How to prevent a Dataform workflow from running when a predefined condition is not met

Hello Community,

I have the following scenario:

1. Ingest multiple .csv files daily into designated BQ tables (each .csv file goes into its own table).

2. After each file is ingested, write the result (success or failure) of the .csv file into a BQ table (validator_table) that has as many columns as the maximum number of .csv files that can be ingested daily, plus a "current_date" column (a rough sketch of such a table is shown after this list).

Example: today, out of a maximum of 10 .csv files I receive only 5; 4 of them are inserted successfully and 1 of them fails. The validation table row for the current day looks like this: 9 columns equal to "success" (5 for the missing .csv files + 4 for the files inserted successfully), 1 column equal to "failure" (for the .csv file that failed to be ingested), and 1 column equal to 2024/03/11 (the current date).

3. Merge the landing tables from BQ (where the .csv files are ingested) daily into a new set of tables that contain all history data. The merge is done table by table (10 "operations" files that correspond to each of the 10 .csv files).
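For illustration only, the validator_table from step 2 could be shaped roughly like this (the column names below are placeholders, not the actual names in my project):

-- Illustrative sketch of the validator_table: one status column per expected daily .csv file
CREATE TABLE IF NOT EXISTS playground.validator_table (
  file_01_status STRING,  -- 'success' or 'failure' for expected .csv file #1
  file_02_status STRING,
  file_03_status STRING,
  file_04_status STRING,
  file_05_status STRING,
  file_06_status STRING,
  file_07_status STRING,
  file_08_status STRING,
  file_09_status STRING,
  file_10_status STRING,
  ingestion_date DATE     -- the "current_date" column
);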

The structure in BQ is the following:

L1 tables (the so-called landing tables where the .csv files land) -> L2 tables (the so-called raw tables that contain all the history data).

 

My question is: in the scenario described above, if at least 1 .csv file fails to be correctly ingested into its BQ L1 table, is there any way to prevent the execution of the merge operations for all 10 tables (including the ones that correspond to the .csv files that were ingested correctly)?

I am thinking of something like this:

Before the merge is executed, maybe add a pre-operation type of validation that does something like the following:

SELECT COUNT(1)
FROM validator_table
WHERE date = CURRENT_DATE
  AND (column1 = 'failure' OR column2 = 'failure' OR ... OR column10 = 'failure')

If the count is > 0, then the merge operation is blocked and does not get executed.
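For illustration, a guard along these lines could be placed after the DECLARE/SET statements and before the MERGE in each operations file (just a sketch: the validator_table name and the file_XX_status columns are placeholders, and I have not verified how Dataform behaves when one statement in a multi-statement operations file fails):

-- Abort the script (and therefore the MERGE that follows) if any ingestion failed today.
-- Table and column names are illustrative only.
ASSERT (
  SELECT COUNT(1)
  FROM validator_table
  WHERE ingestion_date = CURRENT_DATE
    AND 'failure' IN (file_01_status, file_02_status, file_03_status, file_04_status,
                      file_05_status, file_06_status, file_07_status, file_08_status,
                      file_09_status, file_10_status)
) = 0
AS 'At least one .csv file failed to be ingested today';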

For reference, I am adding one of the merge files I have so that you can understand what type of flow I am trying to prevent from running if something goes wrong.

 
config {
    type: "operations",
    tags: ["deltaLoad"]
}

-- Variables holding the timestamp and date of this delta load run
DECLARE
  deltaLoadTimestamp TIMESTAMP

---

DECLARE
  deltaLoadDate DATE

---

SET
  deltaLoadTimestamp = CURRENT_TIMESTAMP

---

SET
  deltaLoadDate = CURRENT_DATE

---

-- Upsert the rows carrying today's latest load_insert_date into the target table, keyed on tz_tk
MERGE INTO
  raw_cbd.ecbd_timezone_t AS tt
USING
  (
  SELECT
    *
  FROM
    raw_cbd.ecbd_timezone_t t1
  WHERE
    t1.load_insert_date = (
    SELECT
      MAX(t2.load_insert_date)
    FROM
      raw_cbd.ecbd_timezone_t t2
    WHERE
      DATE(t2.load_insert_date) = deltaLoadDate) ) AS st
ON
  tt.tz_tk = CAST(st.tz_tk AS NUMERIC)
  WHEN MATCHED THEN UPDATE SET
  tt.tz_tk = CAST(st.tz_tk AS NUMERIC),
  tt.tzt_type = st.tzt_type,
  tt.tz_code = st.tz_code,
  tt.valid_from = CAST(st.valid_from AS TIMESTAMP),
  tt.valid_to = CAST(st.valid_to AS TIMESTAMP),
  tt.delete_date = CAST(st.delete_date AS TIMESTAMP),
  tt.src_name_ins = st.src_name_ins,
  tt.ii_date = CAST(st.ii_date AS TIMESTAMP),
  tt.iu_date = CAST(st.iu_date AS TIMESTAMP),
  tt.iip_insert_date = CAST(st.iip_insert_date AS TIMESTAMP),
  tt.iip_update_date = CAST(st.iip_update_date AS TIMESTAMP),
  tt.load_insert_date = CAST(st.load_insert_date AS TIMESTAMP),
  tt.ecbd_update_date = deltaLoadTimestamp
  WHEN NOT MATCHED
  THEN
INSERT
  ( tz_tk,
    tzt_type,
    tz_code,
    valid_from,
    valid_to,
    delete_date,
    src_name_ins,
    ii_date,
    iu_date,
    iip_insert_date,
    iip_update_date,
    load_insert_date,  
    ecbd_insert_date,
    ecbd_update_date)
VALUES(
   CAST(st.tz_tk AS NUMERIC),
   st.tzt_type,
   st.tz_code,
   CAST(st.valid_from AS TIMESTAMP),
   CAST(st.valid_to AS TIMESTAMP),
   CAST(st.delete_date AS TIMESTAMP),
   st.src_name_ins,
   CAST(st.ii_date AS TIMESTAMP),
   CAST(st.iu_date AS TIMESTAMP),
   CAST(st.iip_insert_date AS TIMESTAMP),
   CAST(st.iip_update_date AS TIMESTAMP),
   CAST(st.load_insert_date AS TIMESTAMP),
   deltaLoadTimestamp,
   deltaLoadTimestamp);
3 REPLIES

To start with, instead of ingesting (loading), what about creating a one-time external table over each set of CSVs stored in GCS?

You can then tell the source of each record using the pseudo-column _FILE_NAME, with no real need for Hive partitioning.

You can then build your queries and filter from there.

It has the benefit of not requiring any preprocessing (hence no need for a Dataform workflow just for that), plus you can leverage materialized views if need be, which refresh when a new CSV is added in GCS.
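For example, something along these lines (the dataset, table name, and bucket path are placeholders; the CSV schema is auto-detected here):

-- One-time external table over all the daily CSVs under a GCS prefix (placeholder names/path).
CREATE EXTERNAL TABLE IF NOT EXISTS playground.csv_landing_ext
OPTIONS (
  format = 'CSV',
  skip_leading_rows = 1,
  uris = ['gs://my-bucket/daily_csv/*.csv']
);

-- The _FILE_NAME pseudo-column tells you which CSV each record came from.
SELECT
  _FILE_NAME AS source_file,
  COUNT(*) AS row_count
FROM playground.csv_landing_ext
GROUP BY source_file;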

Hello,
I really need the flow described above.

Hello Community,
I have found the solution to the question above.

Create a "manual assertion".

config {
  type: "assertion",
  name: "ingestion_validator"
}

SELECT
  *
FROM
  playground.csv_ingestion_validator
WHERE
  file_ingestion_status is FALSE
  AND validation_date = CURRENT_DATE
 
Add the manual assertion as a dependency in the workflow whose execution you want to block if the assertion fails. In Dataform, an assertion fails when its query returns one or more rows, so any failed ingestion recorded for today makes the assertion fail and blocks its dependents.
config {
    type: "operations",
    dependencies: ["ingestion_validator"],
    tags: ["iv"]
}
 
The result is that, if the assertion fails, the dependent workflow will not be executed.
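Applied to the merge file from my original question, the config block just gains the extra dependency, for example:

config {
    type: "operations",
    dependencies: ["ingestion_validator"],
    tags: ["deltaLoad"]
}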