
Dataflow: Operation ongoing for over 2437.47 seconds in state process-msecs in step Move-ptransform-

Hello

I have a dataflow job that:

  1. Reads from a GCS bucket
  2. Decompresses n tar files, each about 2 GB in size and containing m images
  3. Uploads the extracted files to another GCS bucket

The DAG looks like this:

[Screenshot of the job's DAG: Screen Shot 2023-07-10 at 14.16.51.png]

But the job keeps failing because of this error: Operation ongoing for over 2437.47 seconds in state process-msecs in step Move-ptransform-

I've tried with different machine types including: n1-standard-2, n1-standard-8, n1-standard-96, n1-highmem-8, n1-highmem-96.

How can I accomplish this using Cloud Dataflow? 

--
Best regards
David Regalado
Web | Linkedin | Twitter


Here are some suggestions to address this issue:

 

  • Decompress Files in Smaller Chunks: Instead of decompressing the entire tar file at once, consider decompressing it in smaller chunks. This can help manage memory usage and prevent timeouts.

  • Increase Write Speed to GCS: If the uploading process to GCS is slow, it could be causing a bottleneck. Consider increasing the number of threads used to upload files to GCS.

  • Use a Buffer: Consider using a buffer to temporarily store decompressed files before uploading them to GCS. This can help manage memory and disk usage.

  • Adjust the Number of Workers: If the number of workers is too low, it could be causing a bottleneck in your pipeline. Consider increasing the number of workers.

  • Optimize Your Code: Make sure your code is as efficient as possible. Look for any potential bottlenecks or inefficiencies and try to optimize them.
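
The first suggestion, working through an archive piece by piece instead of extracting it all at once, might look like the sketch below. It uses Python's standard tarfile module in streaming mode. The function name iter_tar_members and the choice to pass in an already opened binary file object are assumptions made for illustration; they are not taken from the original job.

```python
import tarfile


def iter_tar_members(fileobj):
    """Yield (name, file-like object) for each regular file in a tar stream.

    `fileobj` is any readable binary file object (hypothetical: it could be
    a local file or a stream opened from a bucket).
    """
    # Mode "r|*" reads the archive strictly forward and detects compression
    # automatically, so the whole archive is never loaded into memory.
    with tarfile.open(fileobj=fileobj, mode="r|*") as archive:
        for member in archive:
            if member.isfile():
                yield member.name, archive.extractfile(member)
```

Because stream mode only reads forward, each yielded file object has to be consumed before the loop advances to the next member.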

 

Memory and disk don't seem to be the problem (look at the machine types I've tried), because:

  1. The CPU utilization (All Workers) is below 30%
  2. The Max worker memory utilization (estimated bytes/sec) is below 25%

But about this:

  • Increase Write Speed to GCS: If the uploading process to GCS is slow, it could be causing a bottleneck. Consider increasing the number of threads used to upload files to GCS.

How do I do that?



Increasing the number of threads used to upload files to Google Cloud Storage (GCS) can help improve the write speed. One way to do this is to upload several files in parallel from a thread pool, with each thread calling the Google Cloud Storage client library.

Here's a general example of how you can do this in Python using the google-cloud-storage library:

from multiprocessing.dummy import Pool  # thread-based Pool, despite the module name

from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {destination_blob_name}.")


# Get a list of all files to upload
files_to_upload = [...]  # replace with your list of files
bucket_name = 'your-bucket-name'

# Create a thread pool
pool = Pool(processes=20)  # replace 20 with the number of threads you want

# Use the thread pool to upload files in parallel
results = [
    pool.apply_async(upload_blob, (bucket_name, file, file))
    for file in files_to_upload
]

# Stop accepting new work and wait for all uploads to finish
pool.close()
pool.join()

# apply_async hides worker errors until get() is called, so surface them here
for result in results:
    result.get()

In this example, a pool of threads is created using the multiprocessing.dummy.Pool class. The Pool.apply_async method is then used to upload each file in a separate thread.

Please note that you should adjust the number of threads based on your specific use case and the resources available on your machine. Too many threads could overwhelm your system, while too few might not fully utilize your resources.

Also, remember to handle exceptions properly in your upload_blob function. If an upload fails, you might want to retry it or log the failure so you can investigate it later.
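
As a rough sketch of that retry-and-log idea, the helper below runs uploads in a thread pool, retries each one a few times with a growing delay, and returns the names that still failed. Everything here is hypothetical and uses only the standard library: upload_one stands in for whatever function performs a single upload, and the names upload_all, attempts and backoff_seconds are made up for this example.

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def upload_all(upload_one, file_names, max_workers=20, attempts=3,
               backoff_seconds=1.0):
    """Call upload_one(name) for every name in parallel; return failed names."""

    def with_retry(name):
        for attempt in range(1, attempts + 1):
            try:
                upload_one(name)
                return
            except Exception:
                # Log the traceback so the failure can be investigated later.
                logging.exception("Upload of %s failed (attempt %d/%d)",
                                  name, attempt, attempts)
                if attempt == attempts:
                    raise
                # Wait longer after each failed attempt.
                time.sleep(backoff_seconds * 2 ** (attempt - 1))

    failed = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(with_retry, name): name
                   for name in file_names}
        for future in as_completed(futures):
            if future.exception() is not None:
                failed.append(futures[future])
    return failed
```

Returning the failed names rather than raising on the first error lets the remaining uploads finish, which may or may not be what you want for your job.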

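This is a very basic example and might need to be adjusted based on your specific use case. For example, Blob.upload_from_filename already reads the file from disk while uploading, so switching methods is unlikely to change memory use on its own. The Blob.upload_from_file method is mainly useful when your data is an open file object rather than a path on disk, for instance a file extracted from an archive that you don't want to write to disk first.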
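
To illustrate the file-object variant, here is a minimal sketch. It assumes bucket is a google.cloud.storage Bucket and that named_files is any iterable of (name, binary file object) pairs; the function name upload_file_objects and the prefix parameter are hypothetical. The only library calls it relies on are bucket.blob() and Blob.upload_from_file().

```python
def upload_file_objects(bucket, named_files, prefix=""):
    """Upload each (name, file object) pair to `bucket`; return blob names."""
    uploaded = []
    for name, file_obj in named_files:
        blob = bucket.blob(prefix + name)
        # The client reads from the object's current position to the end.
        blob.upload_from_file(file_obj)
        uploaded.append(blob.name)
    return uploaded
```

Passing the pairs in lazily (for example from a generator) means each file object is read only when its turn to upload comes.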


Thank you, I'll try it and let you know.
