I am using the Google-provided template 'Text Files on Cloud Storage to BigQuery' to move some NDJSON files from Cloud Storage into BigQuery tables. Once the files are loaded successfully into BigQuery, I need to trigger a Cloud Function that moves them from the current bucket to an archive bucket (this move must happen only when the Dataflow run is successful).

I tried creating an Eventarc trigger on the Cloud Function based on Dataflow job status change (the only direct option currently available). The problem is that both the Dataflow job and the Cloud Function get triggered almost at the same time, because the job entering the 'running' state already counts as a status change. The bucket move then completes before the Dataflow job does, and the job fails because it can no longer find the source files.
I am using the following call from a Cloud Function to define and start the Dataflow job:
service.projects().locations().templates().launch()
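For reference, this is roughly what that launch looks like with the Python API client. This is only a minimal sketch: the bucket names, table, and UDF paths are placeholders, and the parameter names are the ones documented for the classic GCS_Text_to_BigQuery template.

```python
# Minimal sketch of launching the classic "Text Files on Cloud Storage to BigQuery"
# template from a Cloud Function. All bucket/table/UDF paths are placeholders.
from googleapiclient.discovery import build


def launch_gcs_to_bq_job(project_id: str, region: str) -> dict:
    service = build("dataflow", "v1b3", cache_discovery=False)

    body = {
        "jobName": "gcs-text-to-bq-load",
        "parameters": {
            # Parameter names as documented for the classic GCS_Text_to_BigQuery template.
            "inputFilePattern": "gs://my-source-bucket/incoming/*.json",
            "JSONPath": "gs://my-config-bucket/schema/bq_schema.json",
            "outputTable": f"{project_id}:my_dataset.my_table",
            "javascriptTextTransformGcsPath": "gs://my-config-bucket/udf/transform.js",
            "javascriptTextTransformFunctionName": "transform",
            "bigQueryLoadingTemporaryDirectory": "gs://my-temp-bucket/bq_load_tmp",
        },
        "environment": {"tempLocation": "gs://my-temp-bucket/dataflow_tmp"},
    }

    response = (
        service.projects()
        .locations()
        .templates()
        .launch(
            projectId=project_id,
            location=region,
            gcsPath="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
            body=body,
        )
        .execute()
    )
    # The response includes the job metadata, e.g. response["job"]["id"],
    # which can later be used to query the job's status.
    return response
```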
After some research, I decided to write the Dataflow job status to a Pub/Sub topic and then trigger the bucket-move Cloud Function from that topic. However, the "--notificationsTopic" configuration variable for writing the status to a topic does not seem to be supported by service.projects().templates().launch(). I looked at using projects.locations.jobs.create() instead, but that feels very complex to implement, particularly since we have a separate Python UDF that drops any new fields in the input that are not present in the BigQuery schema. I am new to GCP; can someone please guide me to a simpler way to achieve this (for example, getting the Dataflow job status within the existing trigger options instead of explicitly writing to Pub/Sub)? We are also not allowed to create the Docker containers in our project that the create() method would need.
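For what it's worth, I know the job state can be read back with the same API client, roughly as in the sketch below (the job id would come from the launch response; project and region values are placeholders), but polling from inside the launching function runs into the function timeout, which is why I was looking at trigger options.

```python
# Rough sketch: query the state of a launched Dataflow job with the same
# googleapiclient client. The job id comes from the launch() response
# (response["job"]["id"]); project/region values are placeholders.
from googleapiclient.discovery import build

TERMINAL_STATES = {
    "JOB_STATE_DONE",       # finished successfully
    "JOB_STATE_FAILED",
    "JOB_STATE_CANCELLED",
}


def get_job_state(project_id: str, region: str, job_id: str) -> str:
    service = build("dataflow", "v1b3", cache_discovery=False)
    job = (
        service.projects()
        .locations()
        .jobs()
        .get(projectId=project_id, location=region, jobId=job_id)
        .execute()
    )
    return job["currentState"]
```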
Hi @Chanthar,
I understand that you're looking for a way for a Cloud Run function to be triggered based on the successful outcome of a Dataflow job.
As of now, it's not possible to create a function that is directly triggered by a successful Dataflow run. There is an existing feature request filed for this, although we can't confirm a timetable for its implementation.
One possible workaround, as suggested here, is to use Cloud Logging with the following filter:
resource.type="dataflow_step"
textPayload="Worker pool stopped."
You can then create a sink with a Pub/Sub topic as the destination and link that topic to your function so it performs its intended task when triggered. The downside is that this log entry is not always indicative of success: the worker pool stops for several reasons, including job success, failure, and cancellation, so you still need to verify in your flow that the Dataflow job actually completed successfully. Overall, note that this may still not be the best approach for your use case.
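To guard against that, the function behind the topic can confirm the job's final state before moving anything. Below is a rough sketch of such a function; the bucket names and prefix are placeholders, and it assumes the log sink delivers the full LogEntry as JSON in the Pub/Sub message.

```python
# Sketch of a Pub/Sub-triggered Cloud Function that receives the exported
# "Worker pool stopped." log entry, double-checks that the Dataflow job
# actually ended in JOB_STATE_DONE, and only then archives the source files.
# Bucket names and the object prefix are placeholders.
import base64
import json

import functions_framework
from google.cloud import storage
from googleapiclient.discovery import build

SOURCE_BUCKET = "my-source-bucket"    # placeholder
ARCHIVE_BUCKET = "my-archive-bucket"  # placeholder
SOURCE_PREFIX = "incoming/"           # placeholder


@functions_framework.cloud_event
def archive_on_job_done(cloud_event):
    # The log sink wraps the LogEntry as JSON in the Pub/Sub message data.
    entry = json.loads(base64.b64decode(cloud_event.data["message"]["data"]))
    labels = entry["resource"]["labels"]  # dataflow_step resource labels
    project_id = labels["project_id"]
    region = labels["region"]
    job_id = labels["job_id"]

    # "Worker pool stopped." also appears on failures and cancellations,
    # so confirm the job's final state before moving anything.
    dataflow = build("dataflow", "v1b3", cache_discovery=False)
    job = (
        dataflow.projects()
        .locations()
        .jobs()
        .get(projectId=project_id, location=region, jobId=job_id)
        .execute()
    )
    if job.get("currentState") != "JOB_STATE_DONE":
        print(f"Job {job_id} ended in {job.get('currentState')}; leaving files in place.")
        return

    # Move (copy, then delete) the loaded files into the archive bucket.
    client = storage.Client()
    src = client.bucket(SOURCE_BUCKET)
    dst = client.bucket(ARCHIVE_BUCKET)
    for blob in client.list_blobs(SOURCE_BUCKET, prefix=SOURCE_PREFIX):
        src.copy_blob(blob, dst, blob.name)
        blob.delete()
    print(f"Archived source files for job {job_id}.")
```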
Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.