
Dataflow stateful processing issue: worker failed with OOM

Hi,

I have implemented a Dataflow pipeline to replicate data from BigQuery to Cloud SQL. In BigQuery there are tables with billions of rows, so I read each table's data in a paginated way and emit the pages to the next stage/transformation (a new DoFn) that writes the data in parallel.

Everything was working fine: reading BQ table data dynamically and writing it dynamically to Cloud SQL. The problem starts with status tracking for all the pages/batches of a large BigQuery table that we continuously emit for writing, so that we know whether every batch/page of the table we read has actually been written by the next stage/transformation (the writing DoFn).

I implemented a stateful function in the pipeline to track all the batches of a table, so that I can notify the system when data replication for the table has completed, but that is where the problem begins.

In the BQ reading stage we previously read data in batches and emitted those batches to be written by the next stage/transformation. With stateful processing, however, emitting batches/pages to the next stage stops until all the pages have been read and the corresponding batches created. In fact it does emit, but the next stage never receives anything, and the worker goes out of memory, since large BQ tables can be 10 GB - 100 GB in size.

Is this how Beam/Dataflow is supposed to work with stateful processing? Can someone please suggest other possible/better ways of maintaining this status at the Dataflow job level, shared among workers?

 

1 ACCEPTED SOLUTION

Your understanding of side inputs in Apache Beam is correct. They are indeed immutable and read-only, loaded by each worker before processing a DoFn. This makes them suitable for providing additional input data but not for tracking mutable state or progress across a dynamic, streaming pipeline.

Given your requirement to track the progress of batch writes for multiple tables in a streaming context, here are some suggestions:

  1. External State Management:

    • Cloud Storage or Database: Use an external storage system like Cloud SQL, Cloud Datastore, or even a simple Cloud Storage bucket to track the progress of each batch. Each worker can write to this external system once it completes writing a batch. You can then query this system to determine the overall progress.
    • Pub/Sub for State Updates: Consider using a separate Pub/Sub topic to publish state updates. Each worker can publish a message after completing a batch, and a separate process or pipeline can aggregate these messages to track overall progress.
  2. Data-Driven Triggers in Beam:

    • Custom Trigger Implementation: Apache Beam allows you to implement custom triggers. You could design a trigger that fires based on the data you store in an external system. For example, a trigger could fire when all batches for a table are marked as complete in your external database.
    • Event Time Triggers: If your batches are time-bound, you can use event time triggers to process data. This might not directly solve the state tracking issue but can help manage when data is processed.
    • Combining with Windowing: Triggers are often used in combination with windowing. If your data can be logically divided into windows (e.g., based on arrival time or a custom logic), you can apply triggers on these windows to manage processing.
  3. Stateful DoFn with External Checkpoints:

    • Checkpointing to External Storage: Instead of relying solely on Beam's stateful processing, you can implement a pattern where each DoFn instance checkpoints its progress to an external system. This way, the state is not held entirely in memory but is periodically persisted externally (see the sketch after this list).
  4. Monitoring and Aggregation:

    • Pipeline Monitoring: Use Dataflow's monitoring tools to keep track of which batches have been processed. This could involve logging or metrics that can be queried to understand the pipeline's state.
    • Aggregation Service: Implement a separate service or pipeline stage that aggregates progress data from your external storage or Pub/Sub messages to determine the overall progress.
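
To make options 1 and 3 concrete, here is a minimal sketch using the Beam Python SDK. It assumes a PostgreSQL-flavored Cloud SQL instance, a hypothetical replication_progress tracking table with a unique constraint on (table_name, page_id), and elements shaped as (table_name, page_id, total_pages, rows); adapt the names and the actual batch write to your pipeline.

```python
import apache_beam as beam
import sqlalchemy


class WriteBatchAndTrackProgress(beam.DoFn):
    """Writes one page/batch to Cloud SQL and checkpoints it as done."""

    def __init__(self, db_uri):
        self.db_uri = db_uri

    def setup(self):
        # One connection pool per worker, created once and reused across bundles.
        self.engine = sqlalchemy.create_engine(self.db_uri)

    def process(self, element):
        # Assumed element shape: (table_name, page_id, total_pages, rows).
        table_name, page_id, total_pages, rows = element
        with self.engine.begin() as conn:
            # 1. Write the batch itself -- the actual INSERTs for `rows` go here.
            # 2. Checkpoint the batch; idempotent so Dataflow retries are safe.
            conn.execute(
                sqlalchemy.text(
                    "INSERT INTO replication_progress "
                    "(table_name, page_id, total_pages, status) "
                    "VALUES (:t, :p, :n, 'DONE') "
                    "ON CONFLICT (table_name, page_id) DO NOTHING"
                ),
                {"t": table_name, "p": page_id, "n": total_pages},
            )
        yield table_name, page_id

    def teardown(self):
        self.engine.dispose()
```

Because the progress record is written in the same transaction as the batch, a retried bundle simply re-inserts the same (table_name, page_id) and the ON CONFLICT clause keeps the tracking table consistent.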

For your use case, a combination of external state management (using a database or Cloud Storage) and custom triggers or monitoring might be the most effective approach. This setup allows you to offload the mutable state from the Dataflow workers, thereby avoiding the memory issues associated with large stateful operations in Beam, while still being able to track the progress of your batches dynamically.
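
For the completion check, a separate notifier process (or a small monitoring stage) could query the same hypothetical tracking table to decide when a table has been fully replicated, for example:

```python
import sqlalchemy


def table_replication_complete(engine, table_name):
    """Returns True once every expected page of `table_name` is marked DONE."""
    with engine.connect() as conn:
        row = conn.execute(
            sqlalchemy.text(
                "SELECT COUNT(*) AS done_pages, MAX(total_pages) AS total_pages "
                "FROM replication_progress "
                "WHERE table_name = :t AND status = 'DONE'"
            ),
            {"t": table_name},
        ).one()
    return row.total_pages is not None and row.done_pages >= row.total_pages


# Example usage (connection string is hypothetical):
# engine = sqlalchemy.create_engine("postgresql+pg8000://user:pass@host/db")
# if table_replication_complete(engine, "my_dataset.my_table"):
#     ...  # notify downstream systems that the table is fully replicated
```

This keeps all mutable bookkeeping outside the Dataflow workers: memory usage no longer grows with the number of pages in a table, and the status can be read by any worker or external system.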
