Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Datastream to BQ: partitioning of target tables without an updated_at column

I am using Datastream to ingest data from various MySQL and Postgres data into our BigQuery. It works like a charm except one thing: there is no automatic partitioning of the target tables. This is already addressed in the documentation, where it is suggested to manually create a partitioned table and then configure datastream to use that table.

Well, this works except one thing: it presumes that there is a proper source timestamp column in the source data that I could use for partitioning. Unfortunately, I don't have an updated_at column in the provided data (not my data, I cannot change this) and I would love to be able to use datastream's own metadata: datastream_metadata.source_timestamp, but I'm pulling my hair because they put this into a record (why, oh why?!) and thus this cannot be used as a partition key!

Is there any workaround? Maybe I could I use ingestion time partitioning? Will this give a result similar to datastream's source_timestamp column?

Any thoughs, ideas, or workarounds would be greatly appreciated.

Thank you very much in advance.

1 11 1,139
11 REPLIES 11

What do you want to achieve with partitioning?

If you currently do not have a datetime field that can be used in partitioning, this means that your queries do not also filter the data by a datetime value. Correct?

Ideally you should partition your table by a field that will always be present in your queries, thus limiting the amount of data that bigquery needs to read to process the request.

If you partition your data by a field that is not part of your queries, I imagine that your performance and execution cost will be much worse, as Bigquery will end up executing 1 query for each of the partitions that exist.

For example, you have a table that has 2 fields, ID and datetime. This table is partitioned by day of the datetime field. And there are 365 partitions, one for each day of the last year. If you make the following query `Select * where id > 1000 and id < 5000`, bigquery will not know which of the 365 partitions to access and so it will need to execute 365 queries to discover the data.

Wouldn't it be better in your case to use an integer-range partitioned in a column with a sequential ID? So you could have partitions with fixed amounts of records. But again, this would be of no use if your queries on these tables are not filtering by ID.

You're facing a common challenge when dealing with partitioning in Google Cloud Datastream, especially when the source data lacks a suitable timestamp column. Utilizing ingestion time partitioning in BigQuery is indeed a practical workaround in this scenario. Here's how it can be implemented and what you should consider:

Ingestion Time Partitioning in BigQuery

Ingestion time partitioning in BigQuery utilizes the time at which data is loaded into BigQuery as the partitioning timestamp. This method effectively mimics the behavior of using the datastream_metadata.source_timestamp for partitioning, with some caveats.

Setting Up Ingestion Time Partitioning

Create a Partitioned Table in BigQuery:

CREATE TABLE mydataset.partitioned_table (
  id INT64,
  data STRING,
  other_columns STRING,
  _PARTITIONTIME TIMESTAMP
) PARTITION BY _PARTITIONTIME;

Configure Datastream to Load Data into the Partitioned Table:

  • When setting up your Datastream destination, ensure it points to mydataset.partitioned_table.

  • Datastream will automatically handle the ingestion and populate the _PARTITIONTIME column.

Advantages and Considerations

Advantages:

  • Simplicity: No need to modify the source data or rely on unavailable columns.

  • Automation: BigQuery manages partitioning based on the ingestion timestamp.

Considerations:

  • Data Latency: Ingestion time might slightly differ from the actual event time, especially if there's a delay in data streaming. This discrepancy can be more noticeable with high-volume or real-time data streams.

  • Data Consistency: Ensure the ingestion process is reliable and doesn't introduce significant delays, which might affect the accuracy of your partitions.

Alternative Solutions

If ingestion time partitioning doesn't fully meet your needs, consider these alternatives:

Data Transformation:

Modify your Datastream pipeline to add a timestamp column to your source data before ingestion.

Custom Metadata:

Leverage Datastream's custom metadata feature to include the source timestamp and use it for partitioning in BigQuery.

Example Query

To query data partitioned by ingestion time, use:

SELECT *
FROM mydataset.partitioned_table
WHERE _PARTITIONTIME BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) AND CURRENT_TIMESTAMP();

This query retrieves data ingested in the last 24 hours.

Using ingestion time partitioning in BigQuery is a practical workaround for your situation. It allows you to partition tables based on when data is ingested, serving as a proxy for the source timestamp you intended to use. Thoroughly test this setup to confirm it meets your performance and data latency requirements. Remember that slight discrepancies between ingestion time and actual event time may occur, so consider alternative solutions if precise timestamp alignment is crucial for your use case.

A challenge here is that we might have many tables and you've got got to configure the partitioning on each. A given database might have many tables; and to use this idea effectively you've got to get hold of the schema for each table and set up the partitioning for all of the source tables, and somehow keep this in sync with the source database in the event of changes to the tables.

@ms4446 I never thanked you for your reply. I had asked this a long time ago and in fact just went ahead and implemented the partitioning by ingestion timestamp. It is not the best solution, but it's better than nothing. 

@PauR I also had the same issue. I came up with the script below to overcome it. You can easily combine it with the output of "bq ls" to run it for multiple tables, or automate it via some other means.

 

#!/usr/bin/env bash
set -euo pipefail

###################################
# Default values
###################################
PROJECT_ID="bigquery-338208"
SOURCE_DATASET="user_gtzanakis"
SOURCE_TABLE="reservations_bookings"
TARGET_DATASET="temporary_tables"
TIME_PARTITION_TYPE="MONTH"  # e.g., DAY, MONTH, YEAR
PRIMARY_KEY_COL=""

###################################
# Parse Command-Line Arguments
###################################
usage() {
  echo "Usage: $0 [options]"
  echo
  echo "Options:"
  echo "  --project_id <PROJECT_ID>           GCP project (default: ${PROJECT_ID})"
  echo "  --source_dataset <DATASET>          Source dataset (default: ${SOURCE_DATASET})"
  echo "  --source_table <TABLE>              Source table (default: ${SOURCE_TABLE})"
  echo "  --target_dataset <DATASET>          Dataset for temporary tables (default: ${TARGET_DATASET})"
  echo "  --time_partition_type <PARTITION>   Time partitioning type (default: ${TIME_PARTITION_TYPE})"
  echo "  --primary_key_col <COLUMN>          Primary key column (optional). If not provided, script will prompt."
  echo "  --help                              Show this help message and exit"
  echo
  echo "Valid time partitioning types include DAY, HOUR, MONTH, YEAR."
  exit 1
}

while [[ $# -gt 0 ]]; do
  key="$1"
  case $key in
    --project_id)
      PROJECT_ID="$2"
      shift; shift
      ;;
    --source_dataset)
      SOURCE_DATASET="$2"
      shift; shift
      ;;
    --source_table)
      SOURCE_TABLE="$2"
      shift; shift
      ;;
    --target_dataset)
      TARGET_DATASET="$2"
      shift; shift
      ;;
    --time_partition_type)
      TIME_PARTITION_TYPE="$2"
      shift; shift
      ;;
    --primary_key_col)
      PRIMARY_KEY_COL="$2"
      shift; shift
      ;;
    --help)
      usage
      ;;
    *)
      echo "Unknown option: $1"
      usage
      ;;
  esac
done

error_exit() {
    echo "Error: $1"
    exit 1
}

NEW_TABLE="new_${SOURCE_TABLE}"
BACKUP_TABLE="backup_${SOURCE_TABLE}"

###################################
# Prompt at the very beginning if no primary key is given
###################################
if [[ -z "${PRIMARY_KEY_COL}" ]]; then
  read -r -p "NO PRIMARY KEY IS DESIGNATED!!! proceed without primary key? (y/n) " choice
  case "$choice" in
    y|Y ) echo "Proceeding without primary key."; ;;
    n|N ) error_exit "User chose not to proceed without a primary key. Exiting." ;;
    * )   error_exit "Invalid choice. Exiting." ;;
  esac
fi

###################################
# Ensure no leftover tables exist
###################################
echo "Removing any existing ${NEW_TABLE} and ${BACKUP_TABLE} tables in ${TARGET_DATASET}..."
bq rm -f -t "${PROJECT_ID}:${TARGET_DATASET}.${NEW_TABLE}" || true
bq rm -f -t "${PROJECT_ID}:${TARGET_DATASET}.${BACKUP_TABLE}" || true

###################################
# Create a partitioned copy of the original table
###################################
echo "Creating partitioned copy of ${SOURCE_DATASET}.${SOURCE_TABLE} as ${TARGET_DATASET}.${NEW_TABLE} with ${TIME_PARTITION_TYPE} partitioning..."
bq query \
   --use_legacy_sql=false \
   --destination_table "${PROJECT_ID}:${TARGET_DATASET}.${NEW_TABLE}" \
   --time_partitioning_type="${TIME_PARTITION_TYPE}" \
   "SELECT id, * EXCEPT(id) FROM \`${PROJECT_ID}.${SOURCE_DATASET}.${SOURCE_TABLE}\`"

echo "Partitioned table ${TARGET_DATASET}.${NEW_TABLE} created successfully."

###################################
# Backup original table and remove it
###################################
echo "Backing up and removing the original table ${SOURCE_DATASET}.${SOURCE_TABLE}..."
bq cp \
   "${PROJECT_ID}:${SOURCE_DATASET}.${SOURCE_TABLE}" \
   "${PROJECT_ID}:${TARGET_DATASET}.${BACKUP_TABLE}" \
&& bq rm -f -t "${PROJECT_ID}:${SOURCE_DATASET}.${SOURCE_TABLE}"

echo "Original table ${SOURCE_DATASET}.${SOURCE_TABLE} backed up as ${TARGET_DATASET}.${BACKUP_TABLE} and removed successfully."

###################################
# Copy the new partitioned table back to the original dataset/table name
###################################
echo "Copying the partitioned table back to ${SOURCE_DATASET}.${SOURCE_TABLE}..."
bq cp \
   "${PROJECT_ID}:${TARGET_DATASET}.${NEW_TABLE}" \
   "${PROJECT_ID}:${SOURCE_DATASET}.${SOURCE_TABLE}"

echo "The original table ${SOURCE_DATASET}.${SOURCE_TABLE} is now partitioned by ${TIME_PARTITION_TYPE}."

###################################
# Add Primary Key if specified
###################################
if [[ -n "${PRIMARY_KEY_COL}" ]]; then
  echo "Adding PRIMARY KEY constraint on '${PRIMARY_KEY_COL}' (NOT ENFORCED) to ${SOURCE_DATASET}.${SOURCE_TABLE}..."
  bq query --use_legacy_sql=false \
    "ALTER TABLE \`${PROJECT_ID}.${SOURCE_DATASET}.${SOURCE_TABLE}\` ADD PRIMARY KEY(${PRIMARY_KEY_COL}) NOT ENFORCED;"
  echo "Primary key constraint added successfully."
fi

###################################
# Script Completed
###################################
echo "Script completed successfully."

 

 

It is important to set a primary key if there is one, as datastream uses them when there is one at the source, and it will return an error if it doesn't find one. 

But indeed we are deep on "workaround territory" for sure.

OK - thanks for the script. I'll experiment a bit.

I guess you should try using ingestion time partitioning in BigQuery. This will help in organizing data based on when it is added. Itis not exactly the same as datastream_metadata.source_timestamp, but it could work as an alternative.

I have sometime prefer to use another option as well which is to extract the source timestamp into a separate column during data processing. Hope this helps you!

Thank you @furqanhamid ! Indeed, I went ahead and used ingestion time partitioning as it was by far the most straightforward design choice. Partitioning the existing tables was a bit of a pain but I created a helper script.

Regarding your alternative option, is there a straightforward way to add a cloud function processing layer to a datastream cdc connection from MySQL to BigQuery?

 

Yes there are certain ways that can help us like using pub or sub to capture datastream events . Then we have to trigger a cloud function to extract the source timestamp and write it to bigquery.

This will make sure the minimal latency and keeps our pipeline efficient.  As far as pub/sub and cloud function is concerned, then Google pub/sub and cloud function based integration docs are a great starting point for implementation.

Hope you find these steps helpful!

Thank you! I’m trying to understand how it works. From what I’ve seen, a Pub/Sub topic isn’t listed as one of the possible Datastream destinations. Maybe I should just look it up further, but could you give me some pointers on how to integrate Pub/Sub into this kind of solution?

I guess you right mate!  datastream does not directly support pub/sub as a destination. I try a common workaround that is to to route datastreams output through dataflow. This helps us in processing the data and publish it to a pub/sub topic.  Hope this helps in processing downstream.

Got it. Haven't used dataflow yet, but maybe I ought have a look.

So you more or less define a dataflow with the datastream as "source" and a pubsub as "destination"? Or am I oversimplyfing this?