
Save incremental history

Hello, I am new to Dataform and I need help loading this data. I want to make sure the history is maintained, that is, that each day's run is recorded in a new row without overwriting the previous ones.

config {
    type: "incremental",
    uniqueKey: ["dt"]
}

pre_operations {
  DECLARE event_timestamp_checkpoint DEFAULT (
    ${when(incremental(),
    `SELECT max(Fecha_informe) FROM ${self()}`,
    `SELECT timestamp("2000-01-01")`)}
  )
}

INSERT INTO
  `DEL_INFORME_COBRANZA.historico_kpi` ( Fecha_informe,
    No_vencido,
    Menor_31_dias,
    Entre_31_60_dias,
    Entre_61_90_dias,
    Mayor_90_dias,
    snapshot_date )

SELECT
  dt AS Fecha_informe,
  SUM(CASE
      WHEN Rango_Mora_Comb = "No vencido" THEN Importe_CLP
      ELSE 0
  END
    ) AS No_vencido,
  SUM(CASE
      WHEN Rango_Mora_Comb = "Menor 31 dias" THEN Importe_CLP
      ELSE 0
  END
    ) AS Menor_31_dias,
  SUM(CASE
      WHEN Rango_Mora_Comb = "Entre 31 y 60 dias" THEN Importe_CLP
      ELSE 0
  END
    ) AS Entre_31_60_dias,
  SUM(CASE
      WHEN Rango_Mora_Comb = "Entre 61 y 90 dias" THEN Importe_CLP
      ELSE 0
  END
    ) AS Entre_61_90_dias,
  SUM(CASE
      WHEN Rango_Mora_Comb = "Mayor a 90 dias" THEN Importe_CLP
      ELSE 0
  END
    ) AS Mayor_90_dias,
  CURRENT_DATE() AS snapshot_date
FROM
  ${ref("informe_partidas")}
WHERE
  Estado_Partida = 'Abierta'
GROUP BY
  dt

EDIT: When executing the flow, it gives the following error:
[Screenshot of the error: rhenriquez94_0-1728905737391.png]

 



1 ACCEPTED SOLUTION

Try something like this:

config {
    type: "incremental",
    protected: true
}

SELECT
  CURRENT_DATE() AS snapshot_date,
  SUM(CASE
      WHEN Rango_Mora_Comb = "No vencido" THEN Importe_CLP
      ELSE 0
  END
    ) AS No_vencido,
  SUM(CASE
      WHEN Rango_Mora_Comb = "Menor 31 dias" THEN Importe_CLP
      ELSE 0
  END
    ) AS Menor_31_dias,
  SUM(CASE
      WHEN Rango_Mora_Comb = "Entre 31 y 60 dias" THEN Importe_CLP
      ELSE 0
  END
    ) AS Entre_31_60_dias,
  SUM(CASE
      WHEN Rango_Mora_Comb = "Entre 61 y 90 dias" THEN Importe_CLP
      ELSE 0
  END
    ) AS Entre_61_90_dias,
  SUM(CASE
      WHEN Rango_Mora_Comb = "Mayor a 90 dias" THEN Importe_CLP
      ELSE 0
  END
    ) AS Mayor_90_dias
FROM ${ref("informe_partidas")}

${when(incremental(),
`WHERE Estado_Partida = 'Abierta' AND CURRENT_DATE() > (SELECT MAX(snapshot_date) FROM ${self()})`,
`WHERE Estado_Partida = 'Abierta'`)}
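
Some background on why this shape keeps history, followed by a hedged variant. In Dataform, the body of an incremental table is a plain SELECT: Dataform generates the insert itself, so an explicit INSERT INTO in the file is not needed. Setting uniqueKey makes Dataform merge on that key, which updates matching rows rather than appending new ones, so it is left out here. protected: true stops a full refresh from rebuilding the table and discarding the accumulated rows.

One thing to watch with the query above: an aggregate SELECT without GROUP BY still returns one row when the WHERE clause matches nothing, so a second run on the same day would append a row of NULL totals. The sketch below is one possible way to guard against that. The HAVING clause, the COALESCE fallback date, and the SUM(IF(...)) shorthand are my additions and assumptions, not part of the original answer; table and column names are taken from the question.

```sql
config {
  type: "incremental",
  protected: true
}

-- Dataform wraps this SELECT in its own insert; no INSERT INTO is written here.
SELECT
  CURRENT_DATE() AS snapshot_date,
  SUM(IF(Rango_Mora_Comb = "No vencido", Importe_CLP, 0)) AS No_vencido,
  SUM(IF(Rango_Mora_Comb = "Menor 31 dias", Importe_CLP, 0)) AS Menor_31_dias,
  SUM(IF(Rango_Mora_Comb = "Entre 31 y 60 dias", Importe_CLP, 0)) AS Entre_31_60_dias,
  SUM(IF(Rango_Mora_Comb = "Entre 61 y 90 dias", Importe_CLP, 0)) AS Entre_61_90_dias,
  SUM(IF(Rango_Mora_Comb = "Mayor a 90 dias", Importe_CLP, 0)) AS Mayor_90_dias
FROM ${ref("informe_partidas")}
WHERE Estado_Partida = 'Abierta'
${when(incremental(), `
  -- Only add a snapshot if today's has not been stored yet.
  -- Assumption: COALESCE covers a target table that exists but is still empty.
  AND CURRENT_DATE() > COALESCE(
    (SELECT MAX(snapshot_date) FROM ${self()}),
    DATE "2000-01-01")`,
  ``)}
-- Assumption: drop the all-NULL row that an aggregate without GROUP BY
-- returns when the filter above matches no rows (same-day rerun).
HAVING COUNT(*) > 0
```

With this variant, a rerun on the same day should append nothing, and the first run of a new day should append exactly one row. If more than one snapshot per day is wanted, a timestamp column would be needed in place of snapshot_date.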


3 REPLIES

What is the error message or the specific problem you encountered?

Hello. The problem is that when the data flow runs, it does not preserve the history. For example, when the process runs on 14-10, the result is not saved as a new row. My goal is the following:

Fecha_informe  No_vencido   Menor_31_dias  Entre_31_60_dias  Entre_61_90_dias  Mayor_90_dias
14-10-2024     36216056884  23389173681    1775113359        1239642536        1853381099
11-10-2024     21231231234  23289173681    1675113359        1139642536        1753381099
13-10-2024     12303881411  14361823858    1665728850        1130258027        1743996590



Instead, I am only getting the totals from the latest run:

Fecha_informe  No_vencido   Menor_31_dias  Entre_31_60_dias  Entre_61_90_dias  Mayor_90_dias
14-10-2024     36216056884  23389173681    1775113359        1239642536        1853381099


