
Dataflow stateful processing issue: worker failed with OOM

Hi,

I have implemented a Dataflow pipeline to replicate data from BigQuery to Cloud SQL. Some BigQuery tables have billions of rows, so I read the table data in a paginated way and emit each page to the next stage/transformation (a new DoFn that writes the data in parallel).

Everything was working fine: reading BigQuery table data dynamically and writing it dynamically to Cloud SQL. The problem starts with status tracking for all the pages/batches of a large BigQuery table that we continuously emit, so that we know whether every batch/page we read has actually been written by the next stage/transformation (the writing DoFn).

I implemented a stateful function in the pipeline to track all of a table's batches, so that I can notify the system once data replication for the table has completed. But this is where the problem begins.

In the BigQuery reading stage, we previously read data in batches and emitted each batch to the next stage/transformation as soon as it was ready. With stateful processing, however, emission of batches/pages to the next stage stalls until all pages have been read and the corresponding batches created. In fact the DoFn does emit, but the next stage never receives anything, and the worker goes out of memory, since large BigQuery tables can be 10 GB to 100 GB in size.

Is this how stateful processing is supposed to work in Beam/Dataflow? Can someone please suggest other possible/better ways of maintaining status at the Dataflow job level that can be shared among workers?

 

Solved
1 ACCEPTED SOLUTION

Your understanding of side inputs in Apache Beam is correct. They are indeed immutable and read-only, loaded by each worker before processing a DoFn. This makes them suitable for providing additional input data but not for tracking mutable state or progress across a dynamic, streaming pipeline.

Given your requirement to track the progress of batch writes for multiple tables in a streaming context, here are some suggestions:

  1. External State Management:

    • Cloud Storage or Database: Use an external storage system like Cloud SQL, Cloud Datastore, or even a simple Cloud Storage bucket to track the progress of each batch. Each worker can write to this external system once it completes writing a batch. You can then query this system to determine the overall progress.
    • Pub/Sub for State Updates: Consider using a separate Pub/Sub topic to publish state updates. Each worker can publish a message after completing a batch, and a separate process or pipeline can aggregate these messages to track overall progress.
  2. Data-Driven Triggers in Beam:

    • Custom Trigger Implementation: Apache Beam allows you to implement custom triggers. You could design a trigger that fires based on the data you store in an external system. For example, a trigger could fire when all batches for a table are marked as complete in your external database.
    • Event Time Triggers: If your batches are time-bound, you can use event time triggers to process data. This might not directly solve the state tracking issue but can help manage when data is processed.
    • Combining with Windowing: Triggers are often used in combination with windowing. If your data can be logically divided into windows (e.g., based on arrival time or a custom logic), you can apply triggers on these windows to manage processing.
  3. Stateful DoFn with External Checkpoints:

    • Checkpointing to External Storage: Instead of relying solely on Beam's stateful processing, you can implement a pattern where each DoFn instance checkpoints its progress to an external system. This way, the state is not held entirely in memory but is periodically persisted externally.
  4. Monitoring and Aggregation:

    • Pipeline Monitoring: Use Dataflow's monitoring tools to keep track of which batches have been processed. This could involve logging or metrics that can be queried to understand the pipeline's state.
    • Aggregation Service: Implement a separate service or pipeline stage that aggregates progress data from your external storage or Pub/Sub messages to determine the overall progress.
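To make option 3 concrete, here is a minimal, framework-agnostic Python sketch of the checkpointing idea: keep only a small in-memory buffer and flush progress to external storage every N batches so state never grows unbounded. The class, its methods, and the list standing in for Cloud SQL are all hypothetical, not part of any Beam API.

```python
# Sketch of periodic checkpointing to external storage: keep only a
# small in-memory buffer and flush progress every N batches so the
# worker's state never grows unbounded. The `sink` list stands in for
# an external system such as Cloud SQL; all names are hypothetical.

class CheckpointingWriter:
    def __init__(self, flush_every=2, sink=None):
        self.flush_every = flush_every
        self.sink = sink if sink is not None else []  # stands in for external storage
        self._pending = []                            # small in-memory buffer

    def record_batch(self, batch_id):
        """Buffer a completed batch id; flush once the buffer fills."""
        self._pending.append(batch_id)
        if len(self._pending) >= self.flush_every:
            self.flush()

    def flush(self):
        """Persist buffered progress externally, then drop it from memory."""
        self.sink.extend(self._pending)
        self._pending.clear()


w = CheckpointingWriter(flush_every=2)
for b in ["b1", "b2", "b3"]:
    w.record_batch(b)
w.flush()  # final flush, e.g. when a bundle or window closes
print(w.sink)  # ['b1', 'b2', 'b3']
```

In a real pipeline the flush would issue an upsert to the external store, and a final flush would run when the DoFn finishes a bundle, so at most `flush_every` batch ids are ever held in worker memory.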

For your use case, a combination of external state management (using a database or Cloud Storage) and custom triggers or monitoring might be the most effective approach. This setup allows you to offload the mutable state from the Dataflow workers, thereby avoiding the memory issues associated with large stateful operations in Beam, while still being able to track the progress of your batches dynamically.
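As a rough illustration of the external state management approach, here is a minimal Python sketch of a batch-progress tracker. The class name, its methods, and the in-memory dict standing in for Cloud SQL/Datastore are all hypothetical; a real implementation would replace the dict with transactional reads/writes against your external store.

```python
# Minimal sketch of external batch-progress tracking. The in-memory
# dict stands in for Cloud SQL/Datastore; all names are hypothetical.

class BatchProgressTracker:
    def __init__(self):
        # table -> {"expected": set of batch ids, "done": set of batch ids}
        self._progress = {}

    def register_table(self, table, batch_ids):
        """Called once the reader knows which batches a table has."""
        self._progress[table] = {"expected": set(batch_ids), "done": set()}

    def mark_done(self, table, batch_id):
        """Called by a writer worker after it commits a batch."""
        self._progress[table]["done"].add(batch_id)

    def is_complete(self, table):
        """True once every expected batch has been written."""
        entry = self._progress.get(table)
        return entry is not None and entry["expected"] == entry["done"]


tracker = BatchProgressTracker()
tracker.register_table("orders", ["b1", "b2", "b3"])
tracker.mark_done("orders", "b1")
tracker.mark_done("orders", "b2")
print(tracker.is_complete("orders"))  # False: b3 still pending
tracker.mark_done("orders", "b3")
print(tracker.is_complete("orders"))  # True: replication finished
```

The point of the pattern is that `mark_done` is an idempotent write to an external system, so workers can retry safely and a separate process can poll `is_complete` to trigger the "table replicated" notification.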


5 REPLIES

My use case is simple, but I'm having implementation issues with Dataflow, as mentioned:

I need a pipeline to replicate data from BigQuery to Cloud SQL. I used Dataflow, as was also suggested in another discussion: https://www.googlecloudcommunity.com/gc/Data-Analytics/Is-Dataflow-Good-for-bulk-BigQuery-to-Cloud-S... Some BigQuery tables have billions of rows, so I read the table data in a paginated way and emit each page to the next stage/transformation (a new DoFn that writes the data in parallel).

I need status tracking for all the pages/batches of a BigQuery table that we continuously emit, so that we know whether every batch/page of the table has been written by the next stage/transformation (the writing DoFn) or not.

Stateful processing causing your worker to go out of memory in your Google Cloud Dataflow pipeline is indeed a common challenge when dealing with large datasets. Beam tends to hold all the data for a stateful DoFn in memory until the state is processed, which can lead to memory issues, especially with large datasets.

Here are some revised solutions and considerations:

  1. Alternatives to Stateful Processing:

    • Side Inputs: For tracking pages/batches, consider using side inputs. However, be cautious with very large datasets as side inputs are broadcast to all workers and can become impractical.
    • External Storage: Offload state management by writing the status of processed pages/batches to external storage solutions like Cloud Storage or Cloud SQL. This approach can significantly reduce memory load on the workers.
  2. Optimizing Stateful Processing:

    • Increase Worker Memory: Allocating more memory to your worker VMs can be a direct solution, but keep an eye on the cost implications.
    • Utilize Beam's State and Timer APIs: These APIs, such as ValueState and Timers, can help manage state more efficiently, though they require a solid understanding of their mechanisms.
    • Experiment with Different Runners: Different runners like Apache Spark or Flink might offer better memory management for stateful processing in your specific case.
  3. Monitoring and Debugging:

    • Dataflow Monitoring Tools: Use Dataflow's monitoring tools to identify performance bottlenecks and optimize your pipeline.
    • Enable Debug Logging: While debug logging can provide detailed execution information, it should be used carefully due to the potential for generating large amounts of data.
  4. Sharing Information Among Workers:

    • Cloud Storage and Cloud SQL: Both are viable for sharing status information among workers. The choice depends on your pipeline's specific needs.
    • Distributed Data Structures: Consider Beam's distributed data structures like MapState for sharing state, but be aware that they add to the overall state that needs to be managed.
  5. Additional Resources:

    • Explore the provided links for a deeper understanding of stateful processing in Beam and Dataflow's runner configurations and monitoring tools.
  6. Additional Considerations:

    • Cost-Performance Balance: Always weigh the cost against performance benefits when scaling up resources or choosing solutions.
    • Pipeline Architecture Review: In some cases, reevaluating and modifying the pipeline architecture can lead to more efficient processing, especially for handling large-scale data.

Remember, the best approach depends on the specifics of your pipeline, including the size of your data and your processing patterns. Experimentation with different strategies might be necessary to find the most efficient and cost-effective solution for tracking your page processing progress.
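To illustrate the "share status via an external channel" idea in a framework-agnostic way, here is a small Python sketch of the aggregation side: each writer publishes a completion message per batch (to Pub/Sub or a database), and a separate aggregator folds those messages into per-table completeness. The message shape and function name are hypothetical.

```python
# Sketch of aggregating per-batch completion messages into per-table
# progress. In a real pipeline the messages would arrive on a Pub/Sub
# subscription; here they are plain dicts, and all names are hypothetical.

from collections import defaultdict


def aggregate_completions(messages):
    """Fold per-batch completion messages into per-table completeness.

    Each message looks like:
        {"table": "orders", "batch": "b2", "total_batches": 3}
    Returns {table: True/False} indicating whether replication finished.
    """
    seen = defaultdict(set)   # table -> set of completed batch ids
    totals = {}               # table -> expected batch count
    for msg in messages:
        seen[msg["table"]].add(msg["batch"])
        totals[msg["table"]] = msg["total_batches"]
    return {t: len(seen[t]) == totals[t] for t in totals}


messages = [
    {"table": "orders", "batch": "b1", "total_batches": 3},
    {"table": "orders", "batch": "b2", "total_batches": 3},
    {"table": "users", "batch": "b1", "total_batches": 1},
]
print(aggregate_completions(messages))  # {'orders': False, 'users': True}
```

Because completions are folded into sets, duplicate delivery of the same message (which Pub/Sub permits) does not inflate the count, so the check stays correct under at-least-once delivery.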

Thanks a lot @ms4446 for your inputs; these are helpful. I have queries on the alternatives you suggested for stateful processing.

Side inputs share immutable/read-only state among workers, and each worker loads them before processing a DoFn to provide additional input. In our case we send data in batches to be written for a table, and these batches can be written across workers; that is exactly what I need to track, i.e. whether all the batches for a particular table have been written or not.

This is a streaming pipeline, so data will be written for multiple tables in parallel. Each table is resolved at runtime (via a Pub/Sub event) and has its own batches. Let's say I store a table's batch information in a side input (how many batches there are, the batch names), and these are read-only/immutable.
Now what I can do is check whether or not I need to execute a batch using the side input, but I can't conclude whether all the batches of a table have been written, because I still don't have anything that manages the state. Even after using side inputs, I need something that manages the state. Please share your thoughts on this.

The second alternative you suggested, storing the state in external storage, will be useful. Could you please also elaborate on data-driven triggers in a Beam pipeline?


Thanks a lot @ms4446 for your valuable inputs!