Hi @ms4446
I am finding many repeated rows in BigQuery. I am reading several CSVs as you see below, converting each into a DataFrame, concatenating the DataFrames, and then loading the result into BQ. I don't think this process can introduce duplication anywhere. Can you confirm?
import pandas as pd
from google.cloud import bigquery, storage

# project_id, bucket_name, dataset_id and table_id are defined elsewhere in the script

def load_csv_to_dataframe(storage_client, bucket_name, file_name):
    bucket = storage_client.get_bucket(bucket_name)  # not used below; pandas reads the gs:// path directly
    blob = bucket.blob(file_name)
    file_path = "gs://{}/{}".format(bucket_name, file_name)
    df = pd.read_csv(file_path, encoding='unicode_escape')
    df['file_name'] = file_name  # tag each row with its source file
    return df

def df_to_bq(df, table_name, client, dataset_name):
    dataset_ref = client.dataset(dataset_name)
    table_ref = dataset_ref.table(table_name)
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  # replace the existing table
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()  # wait for the job to complete

storage_client = storage.Client()
bigquery_client = bigquery.Client(project=project_id)

# List every CSV object in the bucket
blobs = storage_client.list_blobs(bucket_name)
csv_files = [blob.name for blob in blobs if blob.name.endswith('.csv')]

# Load each CSV into a DataFrame and store them in a list
dataframes = []
for file_name in csv_files:
    df = load_csv_to_dataframe(storage_client, bucket_name, file_name)
    dataframes.append(df)

merged_df = pd.concat(dataframes, ignore_index=True)
df_to_bq(merged_df, table_id, bigquery_client, dataset_id)
Hi @ayushmaheshwari ,
Encountering duplicate rows in BigQuery can happen for several reasons, even when your process seems correct at first glance. Here are some potential causes and how to address them:
1. Duplicates in the Source CSV Files
The CSV files themselves may already contain duplicate rows, which your script carries straight through to BigQuery. You can drop them as each file is loaded:

def load_csv_to_dataframe(storage_client, bucket_name, file_name):
    # ... (existing code) ...
    df = pd.read_csv(file_path, encoding='unicode_escape')
    df = df.drop_duplicates()  # remove duplicates within the CSV
    df['file_name'] = file_name
    return df
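Before dropping anything, it can also help to see which source files actually contain duplicates. A quick diagnostic sketch, reusing the bucket_name and csv_files variables from your script (the printed counts are for inspection only):

# Diagnostic only: count fully duplicated rows inside each source CSV
for file_name in csv_files:
    file_path = "gs://{}/{}".format(bucket_name, file_name)
    raw = pd.read_csv(file_path, encoding='unicode_escape')
    dup_count = raw.duplicated().sum()  # rows identical to an earlier row in the same file
    if dup_count:
        print("{}: {} duplicate rows".format(file_name, dup_count))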
2. Overwriting vs. Appending in BigQuery
WRITE_TRUNCATE vs. WRITE_APPEND: Your script uses WRITE_TRUNCATE for write_disposition, which overwrites the existing table each time. This approach prevents accumulation of duplicates across script executions, but it doesn't address duplicates within your source data or those introduced during data processing. Use WRITE_APPEND instead if your goal is to add new data to the existing data in BigQuery, making sure you're not re-adding existing rows (see the sketch below).
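If appending fits your workflow better, the only change needed in df_to_bq is the write disposition. A minimal sketch, assuming the same table_ref setup as in your function:

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND  # add rows to the existing table instead of replacing it
)
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()  # note: appended rows accumulate across runs unless you deduplicate first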
3. Issues with the df_to_bq Function
Nothing stands out in the df_to_bq function that might lead to duplication. The function, as provided, should not inherently cause duplicates unless the input DataFrame already contains them.
Debugging and Prevention Strategies
Check for duplicates after merging: before loading, count how many fully duplicated rows the combined DataFrame contains.

merged_df = pd.concat(dataframes, ignore_index=True)
print("Number of duplicate rows:", merged_df.duplicated().sum())

Deduplicate before loading: if duplicates are expected, drop them from the merged DataFrame before calling df_to_bq.

merged_df = pd.concat(dataframes, ignore_index=True)
merged_df = merged_df.drop_duplicates()  # deduplicate the merged DataFrame
df_to_bq(merged_df, table_id, bigquery_client, dataset_id)
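One caveat worth checking: because load_csv_to_dataframe tags every row with a file_name column, the same data row appearing in two different CSVs will not be flagged by merged_df.duplicated(). If that kind of overlap is possible in your bucket, compare on the data columns only, for example:

data_columns = [c for c in merged_df.columns if c != 'file_name']  # ignore the source-file tag
print("Duplicates across files:", merged_df.duplicated(subset=data_columns).sum())
merged_df = merged_df.drop_duplicates(subset=data_columns)  # keeps the first occurrence of each row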
Use Unique Identifiers for Deduplication: If your dataset has a unique identifier (or a combination of columns that can serve as one), consider deduplicating in BigQuery after loading, for example by rebuilding the table with one row kept per key.
Example (id_column, column2, and timestamp_column are placeholders for your own schema):
-- Keep only the most recent row per (id_column, column2); the table is rewritten in place
CREATE OR REPLACE TABLE `project.dataset.table_name` AS
SELECT * EXCEPT(rn)
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY id_column, column2 ORDER BY timestamp_column DESC) AS rn
  FROM `project.dataset.table_name`
)
WHERE rn = 1;
Additional Considerations
For deduplication beyond the steps above, BigQuery's DML statements and window functions (such as ROW_NUMBER()) provide powerful options.