
csv to dataframe to BigQuery

Hi @ms4446 

I am finding many repeated rows in BigQuery. As you can see below, I am reading several CSVs, converting each one into a DataFrame, concatenating the DataFrames, and then loading the result into BQ. I don't think this process can introduce duplication anywhere. Can you confirm?

 

from google.cloud import bigquery, storage
import pandas as pd


def load_csv_to_dataframe(storage_client, bucket_name, file_name):
    bucket = storage_client.get_bucket(bucket_name)  # not used below; the file is read via its gs:// path
    blob = bucket.blob(file_name)
    file_path = "gs://{}/{}".format(bucket_name, file_name)
    df = pd.read_csv(file_path, encoding='unicode_escape')  # reading gs:// paths requires gcsfs
    df['file_name'] = file_name
    return df


def df_to_bq(df, table_name, client, dataset_name):
    dataset_ref = client.dataset(dataset_name)
    table_ref = dataset_ref.table(table_name)
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  # Replace the existing table
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()  # Wait for the job to complete


storage_client = storage.Client()
bigquery_client = bigquery.Client(project=project_id)

blobs = storage_client.list_blobs(bucket_name)
csv_files = [blob.name for blob in blobs if blob.name.endswith('.csv')]

# Load each CSV into a DataFrame and store them in a list
dataframes = []
for file_name in csv_files:
    df = load_csv_to_dataframe(storage_client, bucket_name, file_name)
    dataframes.append(df)

merged_df = pd.concat(dataframes, ignore_index=True)
df_to_bq(merged_df, table_id, bigquery_client, dataset_id)

 

  

1 ACCEPTED SOLUTION

Hi @ayushmaheshwari ,

Encountering duplicate rows in BigQuery can happen for several reasons, even when your process seems correct at first glance. Here are some potential causes and how to address them:

1. Duplicates in the Source CSV Files

  • Double-check Your CSVs: Carefully review your CSV files to ensure they don't contain duplicate rows before loading them into BigQuery. Duplicates at this stage can propagate through your data pipeline.
  • Deduplicating CSVs: If duplicates are found within the CSVs, it's crucial to clean them up. This can be done before loading or as a preprocessing step within your Python code:
 
def load_csv_to_dataframe(storage_client, bucket_name, file_name): 
    # ... (existing code) ... 
    df = pd.read_csv(file_path, encoding='unicode_escape') 
    df = df.drop_duplicates() # Remove duplicates within the CSV 
    df['file_name'] = file_name 
    return df 

2. Overwriting vs. Appending in BigQuery

  • WRITE_TRUNCATE vs. WRITE_APPEND: Your script uses WRITE_TRUNCATE for write_disposition, which overwrites the existing table each time. This approach prevents accumulation of duplicates across script executions but doesn't address duplicates within your source data or those introduced during data processing.
  • Only consider WRITE_APPEND if your goal is to add new rows to the data already in BigQuery; with that setting it is your pipeline's responsibility to avoid re-loading rows that are already in the table (see the sketch below).
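
As a rough sketch of what that choice looks like in code (reusing the df_to_bq signature from your post; the append flag is just an illustrative parameter, not part of your original function):

def df_to_bq(df, table_name, client, dataset_name, append=False):
    # append=False rebuilds the table each run (WRITE_TRUNCATE);
    # append=True only adds rows, so the caller must make sure they are new.
    table_ref = client.dataset(dataset_name).table(table_name)
    job_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND
        if append
        else bigquery.WriteDisposition.WRITE_TRUNCATE
    )
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()

With your current full-reload pattern, WRITE_TRUNCATE is the safer default.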

3. Issues with the df_to_bq Function

  • Review for Hidden Logic: Ensure there's no unintended logic within your df_to_bq function that might lead to duplication. As written, the function should not introduce duplicates unless the input DataFrame already contains them; a row-count comparison like the sketch below can rule this out.
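
A quick way to verify this is to compare the DataFrame's row count with what the load job and the destination table report afterwards. A minimal sketch, assuming table_ref and job_config are built the same way as inside your df_to_bq function:

rows_in_dataframe = len(merged_df)
job = bigquery_client.load_table_from_dataframe(merged_df, table_ref, job_config=job_config)
job.result()

# With WRITE_TRUNCATE all three numbers should match; a mismatch points at
# where the extra rows are coming from.
print("Rows in DataFrame:  ", rows_in_dataframe)
print("Rows written by job:", job.output_rows)
print("Rows now in table:  ", bigquery_client.get_table(table_ref).num_rows)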

Debugging and Prevention Strategies

  • Inspect Intermediate Data: After merging the DataFrames but before loading to BigQuery, check how many duplicate rows are present (a per-file breakdown is sketched at the end of this section):
 
merged_df = pd.concat(dataframes, ignore_index=True) 
print("Number of duplicate rows:", merged_df.duplicated().sum()) 
  • Deduplicate Before BigQuery: If duplicates are found, remove them before loading to BigQuery:
 
merged_df = pd.concat(dataframes, ignore_index=True) 
merged_df = merged_df.drop_duplicates() # Deduplicate the merged DataFrame 
df_to_bq(merged_df, table_id, bigquery_client, dataset_id) 
  • Use Unique Identifiers for Deduplication: If your dataset has a unique identifier (or a combination of columns that can serve as one), you can also deduplicate in BigQuery after loading by rewriting the table so that only one row per key is kept.

    Example:

     
    CREATE OR REPLACE TABLE `project.dataset.table_name` AS
    SELECT *
    FROM `project.dataset.table_name`
    WHERE TRUE
    QUALIFY ROW_NUMBER() OVER (
      PARTITION BY id_column, column2
      ORDER BY timestamp_column DESC
    ) = 1;

    This keeps only the most recent row (by timestamp_column) for each id_column/column2 combination.
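
Because load_csv_to_dataframe tags every row with its source file, you can also check whether duplicates repeat within a single CSV or across overlapping CSVs. A rough pandas sketch (run before loading; key_cols here is simply every column except file_name, so adjust it to whatever key actually identifies a record):

# Treat rows as duplicates when they match on everything except the source file.
key_cols = [c for c in merged_df.columns if c != 'file_name']
dupes = merged_df[merged_df.duplicated(subset=key_cols, keep=False)]

# More than one distinct file_name per key means the same rows appear in
# several CSVs; a single file_name means that CSV itself contains duplicates.
print(dupes.groupby(key_cols)['file_name'].nunique().sort_values(ascending=False).head(10))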

Additional Considerations

  • Alternative Deduplication Methods: For complex deduplication logic, BigQuery's window functions (e.g., ROW_NUMBER()) provide powerful options.
  • Data Integrity: Decide what constitutes a duplicate for your dataset: an exact full-row match, or a business key such as an ID plus a timestamp.
  • Performance and Cost: Deduplicating in pandas before loading is generally cheaper and faster than rewriting the table in BigQuery afterwards.
  • Regular Data Validation: Schedule periodic checks (for example, a duplicate-count query like the sketch below) so duplicates are caught soon after they appear.
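
As one example of such a check, a small script (or scheduled query) can count keys that appear more than once in the loaded table. A minimal sketch, with id_column and column2 standing in for whatever combination defines a duplicate in your data:

# Reports any key that occurs more than once in the loaded table.
query = """
    SELECT id_column, column2, COUNT(*) AS copies
    FROM `{}.{}.{}`
    GROUP BY id_column, column2
    HAVING COUNT(*) > 1
    ORDER BY copies DESC
    LIMIT 20
""".format(project_id, dataset_id, table_id)

for row in bigquery_client.query(query).result():
    print(dict(row.items()))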
