Hello Google Community, hoping for some help here!
I have a Dataform SQLX script that needs to perform a merge.
I have tried many approaches, but the merge does not work.
I have read the documentation over and over again.
I have also tried the answers posted in this community.
I will try to explain it step by step.
First: Create the table "001_001_test_pm_all"
------
config {
  type: "table",
  name: "001_001_test_pm_all",
  bigquery: {
    partitionBy: "FLD_FECHA_GCP"
  }
}

SELECT DISTINCT
  CAST(C1.FECHA_CARGA_GCP AS DATE) AS FLD_FECHA_GCP,
  CAST(C1.FECHA_CARGA_GCP AS TIMESTAMP) AS FLD_FECHA_HORA_GCP,
  CAST(C1.INVC_SID AS BIGNUMERIC) AS FLD_INVC_SID
FROM ${ref("XXX", "000_rpro_cms_invoice_24h_pm")} AS C1
------
This table has duplicate rows for the field FLD_INVC_SID.
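One way to confirm the duplicates is a grouped count. This is only a sketch, and it assumes the table resolves to `XXXX.XXX.001_001_test_pm_all`, the placeholder path that appears in the generated query further down:

```sql
-- Sketch: list every FLD_INVC_SID that occurs more than once.
-- Assumption: the table path matches the placeholder used in the generated query.
SELECT
  FLD_INVC_SID,
  COUNT(*) AS row_count
FROM `XXXX.XXX.001_001_test_pm_all`
GROUP BY FLD_INVC_SID
HAVING COUNT(*) > 1
ORDER BY row_count DESC;
```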
Second: Create the merge table "001_001_test_pm_merge"
------
config {
  type: "incremental",
  name: "001_001_test_pm_merge",
  uniqueKey: ["FLD_INVC_SID"],
  bigquery: {
    partitionBy: "FLD_FECHA_GCP",
    updatePartitionFilter:
      "FLD_FECHA_GCP >= date_sub(current_date(), interval 3 day)"
  }
}

SELECT
  FLD_FECHA_GCP,
  FLD_FECHA_HORA_GCP,
  FLD_INVC_SID
FROM ${ref("XXX", "001_001_test_pm_all")}
${ when (incremental(),
  `WHERE FLD_FECHA_HORA_GCP > (SELECT MAX(FLD_FECHA_HORA_GCP) FROM ${self()})`
) }
------
Here is the problem:
The first execution returns the same rows, including the duplicate rows for the "FLD_INVC_SID" field. Basically, the result is identical to the table "001_001_test_pm_all".
The second execution shows me this error:
Status: Failed
Duration: 2 seconds
Reason for failure: reason:"invalidQuery" location:"query" message:"Invalid value: Invalid routine name \"001_001_test_pm_merge_procedure\". Routine names must contain only letters, numbers, and underscores, and be at most 256 characters long. at [2:13]": invalid argument
Query:
BEGIN
CREATE SCHEMA IF NOT EXISTS `XXXX.XXX` OPTIONS(location="us-central1");
EXCEPTION WHEN ERROR THEN
IF NOT CONTAINS_SUBSTR(@@error.message, "already exists: dataset") AND
NOT CONTAINS_SUBSTR(@@error.message, "too many dataset metadata update operations") AND
NOT CONTAINS_SUBSTR(@@error.message, "User does not have bigquery.datasets.create permission")
THEN
RAISE USING MESSAGE = @@error.message;
END IF;
END;
BEGIN
DECLARE dataform_table_type DEFAULT (
SELECT ANY_VALUE(table_type)
FROM `XXXX.XXX.INFORMATION_SCHEMA.TABLES`
WHERE table_name = '001_001_test_pm_merge'
);
IF dataform_table_type IS NOT NULL AND dataform_table_type != 'BASE TABLE' THEN
IF dataform_table_type = 'BASE TABLE' THEN
DROP TABLE IF EXISTS `XXXX.XXX.001_001_test_pm_merge`;
ELSEIF dataform_table_type = "VIEW" THEN
DROP VIEW IF EXISTS `XXXX.XXX.001_001_test_pm_merge`;
ELSEIF dataform_table_type = 'MATERIALIZED VIEW' THEN
DROP MATERIALIZED VIEW IF EXISTS `XXXX.XXX.001_001_test_pm_merge`;
END IF;
END IF;
IF dataform_table_type IS NOT NULL THEN
BEGIN
DECLARE dataform_columns ARRAY<STRING>;
DECLARE dataform_columns_list STRING;
DECLARE dataform_columns_merge STRING;
SET dataform_columns = (
SELECT
ARRAY_AGG(DISTINCT "`" || column_name || "`")
FROM `XXXX.XXX.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = '001_001_test_pm_merge'
);
SET dataform_columns_list = (
SELECT
STRING_AGG(column)
FROM UNNEST(dataform_columns) AS column);
SET dataform_columns_merge = (
SELECT
STRING_AGG(column || "= S." || column)
FROM UNNEST(dataform_columns) AS column);
EXECUTE IMMEDIATE
"""
CREATE OR REPLACE PROCEDURE `XXXX.XXX.001_001_test_pm_merge_procedure`()
BEGIN
MERGE `XXXX.XXX.001_001_test_pm_merge` T
USING (
SELECT
FLD_FECHA_GCP,
FLD_FECHA_HORA_GCP,
FLD_INVC_SID,
FROM `XXXX.XXX.001_001_test_pm_all`
WHERE FLD_FECHA_HORA_GCP > (SELECT MAX(FLD_FECHA_HORA_GCP) FROM `XXXX.XXX.001_001_test_pm_merge`)
) S
ON T.FLD_INVC_SID = S.FLD_INVC_SID
AND T.FLD_FECHA_GCP >= date_sub(current_date(), interval 3 day)
WHEN MATCHED THEN
UPDATE SET """ || dataform_columns_merge || """
WHEN NOT MATCHED THEN
INSERT (""" || dataform_columns_list || """)
VALUES (""" || dataform_columns_list || """);
END;
""";
CALL `XXXX.XXX.001_001_test_pm_merge_procedure`();
DROP PROCEDURE IF EXISTS `XXXX.XXX.001_001_test_pm_merge_procedure`;
END;
ELSE
BEGIN
CREATE TABLE IF NOT EXISTS `XXXX.XXX.001_001_test_pm_merge`
PARTITION BY FLD_FECHA_GCP
OPTIONS()
AS (
SELECT
FLD_FECHA_GCP,
FLD_FECHA_HORA_GCP,
FLD_INVC_SID,
FROM `XXXX.XXX.001_001_test_pm_all`
);
END;
END IF;
END;
I would appreciate your help with this.
The only thing I need is for the merge to work with an incremental table in Dataform with SQLX, updating existing rows and adding new ones based on FLD_INVC_SID.
I have encountered a similar error. Can you try adding a letter before the start of the procedure name as a test? For example: "a001_001_test_pm_merge_procedure". I believe it has a problem when the name starts with digits.
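In the generated query above, the routine name looks like the table name plus `_procedure`, so the letter probably has to be added to the table name in the config. There is also a separate issue with the duplicates. The first run is a plain CREATE TABLE ... AS SELECT (the ELSE branch of the generated query), so uniqueKey does not deduplicate anything on that run. On later runs, a BigQuery MERGE raises an error when several source rows match the same target row. Here is a minimal sketch that tries to address both. The name `a001_001_test_pm_merge` is hypothetical, and the sketch assumes that keeping the newest row per FLD_INVC_SID (by FLD_FECHA_HORA_GCP) is the rule you want:

```sql
config {
  type: "incremental",
  name: "a001_001_test_pm_merge",
  uniqueKey: ["FLD_INVC_SID"],
  bigquery: {
    partitionBy: "FLD_FECHA_GCP",
    updatePartitionFilter:
      "FLD_FECHA_GCP >= date_sub(current_date(), interval 3 day)"
  }
}

-- Assumption: the name above gets a leading letter so that the generated
-- "<name>_procedure" routine no longer starts with a digit.
-- Assumption: one row per FLD_INVC_SID is wanted, keeping the newest one.
SELECT
  FLD_FECHA_GCP,
  FLD_FECHA_HORA_GCP,
  FLD_INVC_SID
FROM (
  SELECT
    FLD_FECHA_GCP,
    FLD_FECHA_HORA_GCP,
    FLD_INVC_SID,
    -- Rank the rows of each key, newest first.
    ROW_NUMBER() OVER (
      PARTITION BY FLD_INVC_SID
      ORDER BY FLD_FECHA_HORA_GCP DESC
    ) AS rn
  FROM ${ref("XXX", "001_001_test_pm_all")}
  ${ when (incremental(),
    `WHERE FLD_FECHA_HORA_GCP > (SELECT MAX(FLD_FECHA_HORA_GCP) FROM ${self()})`
  ) }
)
WHERE rn = 1
```

One thing to watch: the updatePartitionFilter ends up in the ON clause of the generated MERGE, so a key whose existing row sits in a partition older than 3 days will not match and will be inserted as a new row. If that can happen in your data, the filter window may need to be wider.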