
Issue in Implementing Datastream and Dataflow for analytics

I was adding a new source in Datastream whose base database is Postgres. I've set up the pipeline so that the Datastream destination is Cloud Storage, and a Dataflow job then sends the data to BigQuery, because we wanted to handle the logs separately. When I backfill the data, or when new updates come in, the data flows into the staging dataset but not into the main dataset: the table is created but has 0 rows, so essentially the merge operation is not happening. Any idea why this could be happening?


The issue with the merge operation not happening as expected could be due to several reasons:

Possible Causes

  1. BigQuery Schema Mismatch: Differences in schema between the PostgreSQL source, the intermediate Cloud Storage representation, and the BigQuery target can lead to merge failures. BigQuery dataset and table names are case-sensitive, and discrepancies in column names, order, or modes (NULLABLE/REQUIRED) can cause issues.

  2. Data Type Errors: Mismatches in data types across the source, Cloud Storage, and BigQuery can prevent successful merges. Pay special attention to date, timestamp, and numeric types, ensuring compatibility across systems.

  3. MERGE Statement Configuration: Errors in the syntax or logic of a custom MERGE statement within your Dataflow job can obstruct the merging process. The statement must accurately reflect your data handling strategy, including how duplicates are managed and when to perform updates versus inserts (a minimal example is sketched right after this list).

  4. Incorrect Permissions: The service account used by Dataflow and/or Datastream must have sufficient permissions in BigQuery. Ensure it has at least bigquery.dataEditor and bigquery.jobUser roles, applied at the correct levels (dataset and project, respectively).

  5. Error Handling: Inadequate error handling or logging within the Dataflow job can obscure the root cause of merge failures. Enhancing logging can illuminate specific issues occurring during the merge process.

  6. Timeouts: Operations that take too long can time out, especially with large datasets or complex merge operations. Adjusting timeout settings may be necessary to accommodate your data's needs.
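
To make cause 3 concrete, here is a minimal sketch of what a deduplicating MERGE run from Python could look like. This is not the template's own logic, just an illustration: the project, dataset, table, and column names (my_project, staging.orders, main.orders, id, status, updated_at) and the _metadata_timestamp ordering column are all hypothetical placeholders; check your actual staging table for the metadata column names Datastream produced.

```python
# Minimal sketch of a deduplicating MERGE from staging into the final table.
# All identifiers below are hypothetical placeholders.
from google.cloud import bigquery

client = bigquery.Client(project="my_project")  # hypothetical project ID

merge_sql = """
MERGE `my_project.main.orders` AS target
USING (
  -- Keep only the newest change event per key from the staging table.
  SELECT * EXCEPT (row_num)
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY _metadata_timestamp DESC) AS row_num
    FROM `my_project.staging.orders`
  )
  WHERE row_num = 1
) AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET status = source.status, updated_at = source.updated_at
WHEN NOT MATCHED THEN
  INSERT (id, status, updated_at) VALUES (source.id, source.status, source.updated_at)
"""

job = client.query(merge_sql)
job.result()  # Raises if the MERGE fails, surfacing schema or type errors directly.
print(f"MERGE affected {job.num_dml_affected_rows} rows")
```

Running a statement like this by hand in the BigQuery console first is often the quickest way to surface whatever schema or type error is silently blocking the merge.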

Below are some troubleshooting steps that might help resolve the issue:

  1. Verify Schemas: Ensure complete alignment in column names, order, and modes between your PostgreSQL source, the Cloud Storage files, and BigQuery, remembering that BigQuery dataset and table names are case-sensitive (a small schema-diff sketch follows this list).

  2. Data Type Check: Confirm that data types are consistent and compatible across your PostgreSQL source, Cloud Storage, and the BigQuery target. Special attention should be given to how numeric, date, and timestamp types are handled.

  3. MERGE Statement Review: Examine your custom MERGE statement for syntax accuracy and logical correctness. The conditions in the ON clause are crucial for identifying matches correctly. Test the logic with a subset of data directly in BigQuery if possible.

  4. Permissions: Double-check the permissions of the service accounts for Dataflow and Datastream. Ensure they have the necessary BigQuery roles (bigquery.dataEditor and bigquery.jobUser) at the appropriate scope.

  5. Error Logging: Implement or enhance logging within your Dataflow job to capture detailed error information. Consider using Cloud Logging and Cloud Monitoring (formerly Stackdriver) for better visibility into the process.

  6. Timeout Settings: If you suspect timeouts are an issue, increase the relevant timeout values for your Datastream and Dataflow jobs. This might involve adjusting settings such as timeoutMs on BigQuery query jobs, or reviewing your Dataflow pipeline options for timeout-related values.
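
As a starting point for step 1, here is a small sketch that diffs the staging and final table schemas with the BigQuery Python client. The table IDs are hypothetical placeholders, and the Datastream metadata columns in the staging table will show up as staging-only differences, which is expected.

```python
# Sketch: diff the staging and final table schemas with the BigQuery Python client.
# The table IDs are hypothetical placeholders; adjust them to your datasets.
from google.cloud import bigquery

client = bigquery.Client(project="my_project")

def schema_map(table):
    # Map lower-cased column name -> (type, mode) for easy comparison.
    return {f.name.lower(): (f.field_type, f.mode) for f in table.schema}

staging = schema_map(client.get_table("my_project.staging.orders"))
final = schema_map(client.get_table("my_project.main.orders"))

for name in sorted(set(staging) | set(final)):
    if staging.get(name) != final.get(name):
        # Datastream metadata columns appear as staging-only; that's expected.
        print(f"{name}: staging={staging.get(name)} final={final.get(name)}")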

Additional Tips

  • Test with a Small Sample: Begin troubleshooting with a smaller data sample to quickly identify and isolate errors (a quick row-count check is sketched after these tips).

  • Validate Source Data: Check a few records from your PostgreSQL source to ensure data quality and format meet your expectations.
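
One quick sanity check that fits the tips above: compare row counts in the staging and final tables to confirm the backfill landed in staging but never reached the main table. The table IDs below are hypothetical placeholders.

```python
# Sketch: compare row counts between the staging and final tables.
# Table IDs are hypothetical placeholders.
from google.cloud import bigquery

client = bigquery.Client(project="my_project")

for table_id in ("my_project.staging.orders", "my_project.main.orders"):
    rows = client.query(f"SELECT COUNT(*) AS n FROM `{table_id}`").result()
    print(table_id, "->", next(iter(rows)).n, "rows")
```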

Hi @NishinThattil, I get where you're coming from. The issue you're describing, where the tables get created but the data MERGE into the final dataset doesn't happen, is actually pretty common when working with Datastream + Dataflow in custom pipelines.

Here are a few possible causes to look into:

Main things to check:

Staging file schema
If the files generated in Cloud Storage don’t have the right _metadata fields (like _metadata_row_id or _metadata_error) that Dataflow expects, the template can silently fail when trying to MERGE the data.
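
If you want to confirm this, a small sketch like the one below can peek at one of the files Datastream wrote to Cloud Storage and list its top-level keys. It assumes the stream writes JSON output (if it writes Avro instead, the same idea applies but you'd read the file with an Avro library such as fastavro), and the bucket name and prefix are hypothetical placeholders.

```python
# Sketch: inspect one Datastream output file in Cloud Storage and list its keys.
# Assumes JSON output format; bucket name and prefix are hypothetical placeholders.
import json
from google.cloud import storage

client = storage.Client(project="my_project")

# Grab the first object under the stream's output prefix.
blob = next(client.list_blobs("my-datastream-bucket", prefix="datastream/public_orders/"))
record = json.loads(blob.download_as_text().splitlines()[0])

# The Dataflow template expects Datastream's metadata fields alongside the row data.
print(sorted(k for k in record if k.startswith("_metadata")))
print(sorted(record))
```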

Primary key not detected
Dataflow needs to properly identify primary keys to do the MERGE correctly.
If your Postgres setup uses composite keys or if the primary key wasn’t correctly configured in Datastream, the pipeline won't be able to match records properly.
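
A quick way to confirm the source side, assuming you can connect to Postgres directly, is to ask the catalog which columns make up the primary key. Connection details and the table name below are hypothetical placeholders.

```python
# Sketch: ask the Postgres catalog which columns form the table's primary key.
# Connection details and the table name are hypothetical placeholders.
import psycopg2

conn = psycopg2.connect(host="db-host", dbname="mydb", user="datastream_user", password="...")
with conn.cursor() as cur:
    cur.execute("""
        SELECT a.attname
        FROM pg_index i
        JOIN pg_attribute a
          ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
        WHERE i.indrelid = 'public.orders'::regclass
          AND i.indisprimary
    """)
    pk_columns = [row[0] for row in cur.fetchall()]

print("Primary key columns:", pk_columns)  # An empty list means no primary key was found.
```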

Issues in the Dataflow template configuration
Make sure the Dataflow job is running the correct template, like the "Datastream to BigQuery" template, and that the Primary Key parameters are set properly.

Write permissions on the destination dataset
It sounds basic, but sometimes the Dataflow worker service account doesn't have a role that lets it insert and update data (for example, BigQuery Data Editor) on the destination dataset.
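
To rule this out, you can list the dataset-level access entries and check that the Dataflow worker service account shows up with a write-capable role. Note that roles granted at the project level through IAM won't appear here (check the IAM page for those), and the dataset ID below is a hypothetical placeholder.

```python
# Sketch: list dataset-level access entries on the destination dataset and look for
# the Dataflow worker service account. Dataset ID is a hypothetical placeholder.
# Note: roles granted at the project level via IAM won't appear in this list.
from google.cloud import bigquery

client = bigquery.Client(project="my_project")
dataset = client.get_dataset("my_project.main")

for entry in dataset.access_entries:
    print(entry.role, entry.entity_type, entry.entity_id)
```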

Some practical suggestions:

  • Check the Dataflow job logs (in Cloud Logging, formerly Stackdriver) for any errors like "Primary key missing" or "Unable to merge" (a log-filter sketch follows this list).

  • Review your Datastream JSON Schema — it should properly define the primaryKeyFields.

  • Make sure the final dataset already has the table created, or confirm that the template is set to auto-create tables if needed.
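
For the first suggestion, a filter like the one sketched below pulls the error-level entries for a specific Dataflow job via the Cloud Logging Python client, so you don't have to scroll through the UI. The project ID and job ID are hypothetical placeholders.

```python
# Sketch: pull recent error-level log entries for a Dataflow job from Cloud Logging.
# Project ID and job ID are hypothetical placeholders.
from google.cloud import logging as cloud_logging

client = cloud_logging.Client(project="my_project")

# Filter on the Dataflow job's monitored resource.
log_filter = (
    'resource.type="dataflow_step" '
    'resource.labels.job_id="<your-dataflow-job-id>" '
    'severity>=ERROR'
)

for entry in client.list_entries(filter_=log_filter, max_results=20):
    print(entry.timestamp, entry.payload)
```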

An easier alternative if you want to avoid the complexity:


If you'd rather not deal with staging datasets, merges, and templates manually, you might want to look into tools like Windsor.ai.
They can replicate data from databases like Postgres directly into BigQuery, automatically handling updates and change data capture (CDC) for you.

It’s a great option if your main goal is to move data reliably without spending too much time fine-tuning low-level pipelines.

Hope this helps!