CDC with Dataflow into BigQuery: Storage Write API doesn't respect Primary Key

I have a BigQuery table created using the following DDL:

CREATE TABLE mytable
(
  id STRING,
  source STRING,
  PRIMARY KEY (id) NOT ENFORCED
);

As you can see, id is set as the table Primary Key. My Beam pipeline is then defined as follows:

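import json
import struct

import apache_beam as beam
from apache_beam.io import ReadFromPubSub, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions


def process_message(message):
    # Pub/Sub delivers the payload as bytes; decode it into a dict.
    data = json.loads(message.decode("utf-8"))
    # NOTE: `this_message` is not defined anywhere in this snippet as posted.
    if data == {}:
        print(f"Running DELETE operation on row {this_message['Key']}")
        data['_CHANGE_TYPE'] = 'DELETE'
    else:
        print(f"Running UPSERT operation on row {this_message['Key']}")
        data['_CHANGE_TYPE'] = 'UPSERT'
    data['_CHANGE_SEQUENCE_NUMBER'] = str(struct.pack('d', int(round(float(this_message['Value']['updated'])))).hex())
    return [data]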


with beam.Pipeline(options=PipelineOptions([
    f"--project={project_id}",
    "--region=europe-west2",
    "--runner=DataflowRunner",
    "--streaming",
    "--temp_location=gs://tmp/cdc",
    "--staging_location=gs://tmp/cdc",
])) as pipeline:
  data = pipeline | 'ReadFromPubSub' >> ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{bq_table_name}')

  data | 'ProcessMessages' >> beam.ParDo(process_message) | 'WriteToBigQuery' >> WriteToBigQuery(
    f'{project_id}:{bq_dataset}.{bq_table_name}',
    schema="id STRING,source STRING",
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API
  )

However, when I query my table in BigQuery after consuming a handful of records, I have hundreds of duplicates of the id field.

How do I get my pipeline to respect the primary key and perform an UPSERT operation, as it should do?

The core issue you're facing is that while you've defined a primary key (id) in your BigQuery table, the Storage Write API doesn't automatically enforce primary key constraints. Its primary function is efficient data loading, which doesn't include checking for existing primary key conflicts. This results in duplicate id values during standard inserts.

The DDL for your BigQuery table includes NOT ENFORCED, meaning BigQuery won't block duplicate id values. The Storage Write API focuses on raw data writes and doesn't inherently check for primary key conflicts. To achieve the desired UPSERT behavior, you need to modify your Dataflow pipeline.

Here's a refined pipeline design to ensure UPSERT behavior while respecting your primary key:

  1. Buffer Updates: Use a windowing strategy to buffer incoming updates.

  2. Group By Key: Group records by the id field.

  3. Reduce and Deduplicate: For each key, keep only the latest update.

  4. Write to BigQuery: Write the deduplicated records to BigQuery.

import json
import struct

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery

def process_message(message):
    data = json.loads(message.decode("utf-8"))
    if not data:
        data['_CHANGE_TYPE'] = 'DELETE'
    else:
        data['_CHANGE_TYPE'] = 'UPSERT'
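    # `message` is raw bytes and cannot be indexed by key, so read the timestamp
    # from the decoded payload (assumes it carries `updated`, as the schema below does).
    data['_CHANGE_SEQUENCE_NUMBER'] = str(struct.pack('d', int(round(float(data['updated'])))).hex())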
    return [data]

def deduplicate(records):
    latest_record = max(records, key=lambda x: x['updated'])
    return latest_record

with beam.Pipeline(options=PipelineOptions([
    f"--project={project_id}",
    "--region=europe-west2",
    "--runner=DataflowRunner",
    "--streaming",
    "--temp_location=gs://tmp/cdc",
    "--staging_location=gs://tmp/cdc",
])) as pipeline:
  
    data = pipeline | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{bq_table_name}')
  
    windowed_data = (data
                     | 'Window' >> beam.WindowInto(
                         FixedWindows(60),
                         trigger=beam.transforms.trigger.AfterProcessingTime(60),
                         accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING))
  
    processed_data = windowed_data | 'ProcessMessages' >> beam.ParDo(process_message)
  
    grouped_data = (processed_data
                    | 'KeyById' >> beam.Map(lambda row: (row['id'], row))
                    | 'GroupById' >> beam.GroupByKey())
  
    deduplicated_data = (grouped_data
                         | 'Deduplicate' >> beam.Map(lambda kv: deduplicate(kv[1])))
  
    deduplicated_data | 'WriteToBigQuery' >> WriteToBigQuery(
        f'{project_id}:{bq_dataset}.{bq_table_name}',
        schema="id STRING,source STRING, updated TIMESTAMP",
        write_disposition=BigQueryDisposition.WRITE_APPEND,  # Append to the existing table
        method=WriteToBigQuery.Method.STORAGE_WRITE_API,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED
    )

Key Considerations

  1. Data Ordering: For optimal efficiency, ensure your CDC events are delivered in a roughly sorted order based on the primary key.

  2. Data Freshness: BigQuery's automatic duplicate removal might take a short while. For real-time deduplication, consider using a temporary table and MERGE statements.

  3. Primary Key Enforcement: To enforce the primary key, remove NOT ENFORCED from the DDL. Note that this will throw an error if you try to insert a duplicate key, so handle that in the pipeline.

  4. Error Handling: Implement robust error handling to catch cases where duplicate writes fail due to primary key violations.

Troubleshooting and Refinement

  1. Logging: Add detailed logging at various stages of your pipeline to pinpoint where duplicates arise.
  2. Method Change: The STORAGE_WRITE_API is more appropriate for streaming but requires handling duplicates explicitly as shown.

Thank you @ms4446! I'll be honest - it's not a particularly great look to begin with when you respond on a support forum with an AI-generated response using my post as the prompt. It's even worse when you're a Google employee responding to a Google product query on Google's own community platform. And it's even worse still when the response both does not address my issue and manages to be full of errors! Doesn't reflect all that well on either yourself or Google as a Cloud provider, wouldn't you agree?

That being said, on to your response itself:


@ms4446 wrote:

the Storage Write API doesn't automatically enforce primary key constraints


Actually, it can. Using BigQuery CDC, update operations are handled as per the table's Primary Key as long as you set the pseudocolumn `_CHANGE_TYPE`, as you can see I've done in my code. ("To use CDC, set _CHANGE_TYPE when you stream row modifications using the Storage Write API")
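
To make the CDC behaviour described here concrete, below is a minimal pure-Python simulation of how a stream of UPSERT and DELETE rows collapses to one row per primary key. It is only a sketch of the semantics under stated assumptions, not BigQuery's implementation: `seq_key` and `apply_changes` are hypothetical helper names, and the sequence numbers are assumed to be '/'-separated hexadecimal sections compared numerically.

```python
from typing import Dict, Iterable, Tuple

Row = Dict[str, str]


def seq_key(change_sequence_number: str) -> Tuple[int, ...]:
    """Turn a '/'-separated hex sequence number into a sortable tuple."""
    return tuple(int(section, 16) for section in change_sequence_number.split("/"))


def apply_changes(changes: Iterable[Row], primary_key: str = "id") -> Dict[str, Row]:
    """Resolve CDC rows into at most one row per primary key (simulation only)."""
    winners: Dict[str, Row] = {}
    for change in changes:
        key = change[primary_key]
        current = winners.get(key)
        # The change with the highest sequence number wins, whatever the arrival order.
        if current is None or (
            seq_key(change["_CHANGE_SEQUENCE_NUMBER"])
            >= seq_key(current["_CHANGE_SEQUENCE_NUMBER"])
        ):
            winners[key] = change

    table: Dict[str, Row] = {}
    for key, change in winners.items():
        if change["_CHANGE_TYPE"] == "DELETE":
            continue  # a winning DELETE removes the row entirely
        # Pseudocolumns steer the merge; they are not stored as regular columns.
        table[key] = {k: v for k, v in change.items() if not k.startswith("_CHANGE_")}
    return table


changes = [
    {"id": "a", "source": "v1", "_CHANGE_TYPE": "UPSERT", "_CHANGE_SEQUENCE_NUMBER": "1"},
    {"id": "a", "source": "v2", "_CHANGE_TYPE": "UPSERT", "_CHANGE_SEQUENCE_NUMBER": "2"},
    {"id": "b", "source": "v1", "_CHANGE_TYPE": "UPSERT", "_CHANGE_SEQUENCE_NUMBER": "1"},
    {"id": "b", "_CHANGE_TYPE": "DELETE", "_CHANGE_SEQUENCE_NUMBER": "A"},
]
print(apply_changes(changes))  # {'a': {'id': 'a', 'source': 'v2'}}
```

The point of the sketch is that the key plus the change type and sequence number are enough to end up with a single row per id, which is the behaviour expected from the pipeline in the question.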


@ms4446 wrote:

The DDL for your BigQuery table includes NOT ENFORCED, meaning BigQuery won't block duplicate id values

Primary Key Enforcement: To enforce the primary key, remove NOT ENFORCED from the DDL. Note that this will throw an error if you try to insert a duplicate key, so handle that in the pipeline.


BigQuery does not support enforced Primary Keys at all.


@ms4446 wrote:

Buffer Updates: Use a windowing strategy to buffer incoming updates.


How would this help exactly? It's a streaming pipeline that delivers row updates as soon as they are received by the subscriber. Even if I did want to wait X amount of time so as to batch my records and deduplicate, what happens on X+1 when that record is updated in my source data, after I've committed my buffered batch? I'll of course get another duplicate.
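
The objection can be shown with a small pure-Python sketch. It assumes 60-second fixed windows, as in the suggested pipeline, keeps only the latest event per id inside each window, and then appends every survivor. `dedupe_per_window` and the sample events are hypothetical and only stand in for the windowed pipeline.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Event = Tuple[int, str, str]  # (event time in seconds, id, source)


def dedupe_per_window(events: List[Event], window_seconds: int = 60) -> List[Dict[str, str]]:
    """Keep the latest event per id within each fixed window, then append all survivors."""
    windows: Dict[int, Dict[str, Event]] = defaultdict(dict)
    for event in events:
        timestamp, row_id, _source = event
        window = timestamp // window_seconds
        best = windows[window].get(row_id)
        if best is None or timestamp >= best[0]:
            windows[window][row_id] = event

    appended: List[Dict[str, str]] = []
    for window in sorted(windows):
        # Each window is committed independently, like a plain append to the table.
        for _timestamp, row_id, source in windows[window].values():
            appended.append({"id": row_id, "source": source})
    return appended


events = [(5, "a", "v1"), (20, "a", "v2"), (61, "a", "v3")]
print(dedupe_per_window(events))
# [{'id': 'a', 'source': 'v2'}, {'id': 'a', 'source': 'v3'}]
```

Deduplication inside a window removes `v1`, but the update at second 61 falls into the next window and is appended as a second row for the same id.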


@ms4446 wrote:

Data Freshness: BigQuery's automatic duplicate removal might take a short while. For real-time deduplication, consider using a temporary table and MERGE statements.


Absolutely not. If using a temporary table and a MERGE statement was practical or desirable, I would not be using a streaming pipeline in the first place!

In any case, it seems that the Python SDK does not support such operations, whereas the Java implementation does, as per this blog post. So this is a major oversight in your documentation and your client libraries, as it's not clear at all how you're supposed to achieve this in a Python-based pipeline. Adding the _CHANGE_TYPE pseudocolumn to the `schema` argument of the `WriteToBigQuery` function returns an error suggesting that columns starting with underscores are invalid in BigQuery.
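
A side note on the `_CHANGE_SEQUENCE_NUMBER` value built in the pipeline code earlier in the thread: the hex dump of a packed double does not necessarily sort in timestamp order when read as a hexadecimal number. The sketch below compares that encoding with a plain fixed-width hex rendering of the integer timestamp. Both helper names are hypothetical, the byte order is pinned to little-endian (which is what native order is on typical x86 and ARM hosts), and the assumption that sequence numbers are compared as hexadecimal values of at most 16 digits per section should be checked against the BigQuery CDC documentation.

```python
import struct


def packed_double_hex(epoch_seconds: float) -> str:
    """Mirror the pipeline's encoding: hex of a little-endian packed double."""
    return struct.pack("<d", int(round(float(epoch_seconds)))).hex()


def fixed_width_hex(epoch_seconds: float) -> str:
    """Alternative (assumption): the integer timestamp as 16 uppercase hex digits."""
    return format(int(round(float(epoch_seconds))), "016X")


earlier, later = 1, 2  # toy timestamps, chosen only to keep the example small

# Interpreted as hex numbers, the packed encoding puts the later value first.
print(int(packed_double_hex(earlier), 16) < int(packed_double_hex(later), 16))

# The fixed-width rendering keeps numeric order.
print(int(fixed_width_hex(earlier), 16) < int(fixed_width_hex(later), 16))
```

If the ordering of sequence numbers is wrong, an older change could win over a newer one even once CDC writes are working, so it seems worth ruling this out separately from the SDK question.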

Your pain just saved me weeks of pain. I was about to run head first into this same problem. 

Starting with version 2.36.0 of the Beam SDK for Java, you can use the BigQuery Storage Write API from the BigQueryIO connector.

Also, starting with version 2.47.0 of the Beam SDK for Python, the SDK supports the BigQuery Storage Write API.

https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api