I have a BigQuery table created using the following DDL:
CREATE TABLE mytable (
  id STRING,
  source STRING,
  PRIMARY KEY (id) NOT ENFORCED
);
As you can see, id is set as the table Primary Key. My Beam pipeline is then defined as follows:
def process_message(message):
    data = json.loads(message.decode("utf-8"))
    if data == {}:
        print(f"Running DELETE operation on row {this_message['Key']}")
        data['_CHANGE_TYPE'] = 'DELETE'
    else:
        print(f"Running UPSERT operation on row {this_message['Key']}")
        data['_CHANGE_TYPE'] = 'UPSERT'
    data['_CHANGE_SEQUENCE_NUMBER'] = str(struct.pack('d', int(round(float(this_message['Value']['updated'])))).hex())
    return [data]

with beam.Pipeline(options=PipelineOptions([
    f"--project={project_id}",
    "--region=europe-west2",
    "--runner=DataflowRunner",
    "--streaming",
    "--temp_location=gs://tmp/cdc",
    "--staging_location=gs://tmp/cdc",
])) as pipeline:
    data = pipeline | 'ReadFromPubSub' >> ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{bq_table_name}')
    data | 'ProcessMessages' >> beam.ParDo(process_message) | 'WriteToBigQuery' >> WriteToBigQuery(
        f'{project_id}:{bq_dataset}.{bq_table_name}',
        schema="id STRING,source STRING",
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API
    )
However, when I query my table in BigQuery after consuming a handful of records, I have hundreds of duplicates of the id field.
How do I get my pipeline to respect the primary key and perform an UPSERT operation, as it should do?
Thank you @ms4446! I'll be honest: it's not a particularly great look to begin with when you respond on a support forum with an AI-generated response using my post as the prompt. It's even worse when you're a Google employee responding to a Google product query on Google's own community platform. And it's even worse still when the response both fails to address my issue and manages to be full of errors! Doesn't reflect all that well on either yourself or Google as a Cloud provider, wouldn't you agree?
That being said, on to your response itself:
@ms4446 wrote: The Storage Write API doesn't automatically enforce primary key constraints
Actually, it can. With BigQuery CDC, update operations are applied according to the table's primary key as long as you set the pseudocolumn `_CHANGE_TYPE`, as you can see I've done in my code. ("To use CDC, set _CHANGE_TYPE when you stream row modifications using the Storage Write API.")
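To make the intent concrete, here is a minimal sketch in plain Python (no Beam) of tagging a row with the CDC pseudocolumns. The helper names `encode_sequence_number` and `to_change_record` are hypothetical, the message shape (a key plus a JSON payload with `source` and `updated`) is an assumption for illustration, and the plain hexadecimal encoding assumes `_CHANGE_SEQUENCE_NUMBER` is interpreted as a hexadecimal number, so check the current CDC documentation for the exact format.

```python
import json


def encode_sequence_number(updated: float) -> str:
    """Encode an epoch timestamp as a hexadecimal string.

    Assumption: the sequence number is read as a hex number, so a later
    timestamp yields a numerically larger value.
    """
    return format(int(round(updated)), "x")


def to_change_record(key: str, payload: bytes) -> dict:
    """Turn a (key, JSON payload) pair into a row tagged for CDC.

    An empty JSON object is treated as a delete of the row with that key;
    anything else is treated as an upsert.
    """
    data = json.loads(payload.decode("utf-8"))
    if not data:
        return {"id": key, "_CHANGE_TYPE": "DELETE"}
    return {
        "id": key,
        "source": data.get("source"),
        "_CHANGE_TYPE": "UPSERT",
        "_CHANGE_SEQUENCE_NUMBER": encode_sequence_number(data["updated"]),
    }
```

This only builds the row dictionaries; it does not show how to get the Python SDK to pass the pseudocolumns through to the Storage Write API.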
@ms4446 wrote: The DDL for your BigQuery table includes NOT ENFORCED, meaning BigQuery won't block duplicate id values
Primary Key Enforcement: To enforce the primary key, remove NOT ENFORCED from the DDL. Note that this will throw an error if you try to insert a duplicate key, so handle that in the pipeline.
BigQuery does not support enforced Primary Keys at all.
@ms4446 wrote: Buffer Updates: Use a windowing strategy to buffer incoming updates.
How would this help exactly? It's a streaming pipeline that delivers row updates as soon as they are received by the subscriber. Even if I did want to wait X amount of time so as to batch my records and deduplicate, what happens on X+1 when that record is updated in my source data, after I've committed my buffered batch? I'll of course get another duplicate.
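A toy simulation of that argument, in plain Python rather than Beam. The function names are hypothetical, and windows are modelled as fixed-size batches of events instead of time intervals, which is a simplification. Deduplicating inside each window still appends one row per window, whereas a keyed upsert leaves one row per id.

```python
def append_with_window_dedup(events, window_size):
    """Dedupe by id inside each window, then append the survivors."""
    table = []
    for start in range(0, len(events), window_size):
        window = events[start:start + window_size]
        latest = {}
        for event in window:
            latest[event["id"]] = event  # last event in the window wins
        table.extend(latest.values())  # append-only write per window
    return table


def upsert_by_key(events):
    """Keep only the most recent event per id, as an upsert would."""
    table = {}
    for event in events:
        table[event["id"]] = event
    return list(table.values())


events = [
    {"id": "a", "source": "v1"},
    {"id": "a", "source": "v2"},
    {"id": "a", "source": "v3"},  # arrives after the first window closes
]

windowed = append_with_window_dedup(events, window_size=2)
upserted = upsert_by_key(events)
```

Here `windowed` ends up with two rows for id `a` (one per window), while `upserted` holds a single row.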
@ms4446 wrote: Data Freshness: BigQuery's automatic duplicate removal might take a short while. For real-time deduplication, consider using a temporary table and MERGE statements.
Absolutely not. If using a temporary table and a MERGE statement was practical or desirable, I would not be using a streaming pipeline in the first place!
In any case, it seems that the Python SDK does not support such operations, whereas the Java implementation does, as per this blog post. So this is a major oversight in your documentation and your client libraries, as it's not at all clear how you're supposed to achieve this in a Python-based pipeline. Adding the `_CHANGE_TYPE` pseudocolumn to the `schema` argument of the `WriteToBigQuery` function returns an error suggesting that column names starting with an underscore are invalid in BigQuery.