Trigger Dataproc serverless job using cloud composer

Hey Guys !!
I want to trigger a Dataproc batch job (Dataproc serverless job) via Cloud Composer (Airflow). I am using dataprocbatchoperator, which creates the job and runs it on the first DAG run only. If I trigger that Airflow DAG again, it does not run the Dataproc serverless job again.

Can anyone help me with this issue?? 

To trigger a Dataproc serverless job to run again, you can use the following methods:

  • Use a schedule in Airflow. A schedule runs the DAG at specific times or intervals, so the task that uses DataprocBatchOperator runs again at the desired frequency.
  • Use a Cloud Function to monitor the status of your Dataproc serverless job and trigger it again once the previous run has succeeded.
  • Use a Pub/Sub subscription to receive a notification when your Dataproc serverless job completes, then use a Cloud Function to trigger the job again when the notification arrives.

Example Cloud Function to trigger a Dataproc serverless job

 
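import uuid
from google.cloud import dataproc_v1

def trigger_dataproc_serverless_job(data, context):
    """Submits a new Dataproc serverless batch when it receives a Pub/Sub notification."""

    # Get the project ID and region from the Pub/Sub notification.
    project_id = data['attributes']['projectId']
    region = data['attributes']['region']

    # Get the name of the Dataproc serverless job to trigger.
    dataproc_serverless_job_name = data['attributes']['dataprocServerlessJobName']

    # Submit a new batch with a unique ID. 'mainPythonFileUri' is a hypothetical
    # attribute; see the Dataproc Serverless documentation for the full batch fields.
    client = dataproc_v1.BatchControllerClient(
        client_options={'api_endpoint': f'{region}-dataproc.googleapis.com:443'})
    client.create_batch(
        parent=f'projects/{project_id}/locations/{region}',
        batch={'pyspark_batch': {'main_python_file_uri': data['attributes']['mainPythonFileUri']}},
        batch_id=f'{dataproc_serverless_job_name}-{uuid.uuid4().hex[:8]}')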

Notes:

  • Dataproc serverless is a separate offering from standard Dataproc. It does not use traditional clusters. Be sure to use the correct operator and methods for Dataproc serverless.
  • The Cloud Function will need appropriate permissions to submit a Dataproc job. This might require setting up the correct IAM roles for the Cloud Function's service account.

Hi ms4446,
Thanks for helping me with this!!

I have some other concerns now; if you are able to give me a better idea,
it will be very helpful for me.
My issues are:

1. Do I need any type of cluster for a Dataproc serverless job (batch job)?
2. Can we pass jar, whl and zip files using dataprocbatchoperator?
3. When I trigger my Airflow DAG it succeeds, but I am not able to see my
Dataproc job runs in my GCP account. (I am attaching the Airflow DAG logs in
this mail.)

Thanks ms4446 for helping me.

Let's address each of your concerns:

  1. Do I need any type of cluster for Dataproc serverless job (batch jobs)?

    • No, one of the main advantages of Dataproc Serverless is that you don't need to manage or provision clusters. Dataproc Serverless automatically allocates resources for your job, runs it, and then deallocates the resources once the job is complete. This is especially useful for batch jobs where you might not need a persistent cluster.
  2. Can we pass jar, whl, and zip files using DataprocBatchOperator?

    • Yes for JAR and ZIP files; for wheel (WHL) files, check the current documentation, since the documented types for python_file_uris are .py, .egg, and .zip. These files are not separate operator arguments. They are fields of the batch definition that you pass to the operator (DataprocCreateBatchOperator in the Google provider). The main file goes in main_jar_file_uri (or main_class) for a Java job or main_python_file_uri for a Python job, and additional JARs, Python files, or other files go in jar_file_uris, python_file_uris, and file_uris respectively. Ensure that these files are stored in a location accessible by Dataproc, such as Google Cloud Storage.
  3. Airflow DAG triggers successfully but Dataproc job doesn't appear in GCP account:

    • There could be several reasons for this:
      • Permissions: Ensure that the service account used by Airflow has the necessary permissions to create and view Dataproc jobs.
      • Region Mismatch: Ensure that you're checking the Dataproc jobs in the correct region in the GCP Console. If your Airflow DAG specifies a particular region for the Dataproc job, you need to check that region in the GCP Console.
      • Logs: Since you mentioned attaching Airflow DAG logs, it's crucial to inspect those logs for any error messages or warnings. The logs might provide clues about why the Dataproc job isn't being created or why it's not visible in your GCP account.
      • Job Creation: Ensure that the DataprocBatchOperator in your DAG is actually being executed and is attempting to create a Dataproc job. It's possible that there's a conditional statement or some other logic in your DAG that's preventing the operator from running.

Without seeing the actual Airflow DAG logs, it's challenging to provide a definitive solution for the third issue. However, I recommend starting with the suggestions above and checking each one to narrow down the potential causes.
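
For the second point, here is a minimal sketch of how the file URIs can be collected into a batch definition for a PySpark workload. The field names follow the Dataproc Batch resource as I understand it; the function name and the gs:// paths are hypothetical and only for illustration.

```python
from typing import Dict, List, Optional


def build_pyspark_batch(
    main_python_file_uri: str,
    python_file_uris: Optional[List[str]] = None,
    jar_file_uris: Optional[List[str]] = None,
    file_uris: Optional[List[str]] = None,
    args: Optional[List[str]] = None,
) -> Dict[str, dict]:
    """Return a batch definition dict for a PySpark workload."""
    pyspark = {"main_python_file_uri": main_python_file_uri}
    optional_fields = {
        "python_file_uris": python_file_uris,
        "jar_file_uris": jar_file_uris,
        "file_uris": file_uris,
        "args": args,
    }
    # Only include the optional fields that were actually provided.
    for name, value in optional_fields.items():
        if value:
            pyspark[name] = list(value)
    return {"pyspark_batch": pyspark}


# Hypothetical bucket and object names.
batch = build_pyspark_batch(
    "gs://example-bucket/jobs/main.py",
    python_file_uris=["gs://example-bucket/deps/helpers.zip"],
    jar_file_uris=["gs://example-bucket/jars/connector.jar"],
)
```

The resulting dictionary is the kind of value that would be passed as the batch definition when the batch is submitted, alongside the project, region, and batch ID.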

Hi @ms4446  thanks for explaining every concern so nicely.
I want to clarify some points for the first and last issue.

1. Do I need any type of cluster for Dataproc serverless job (batch jobs)?

  • When I trigger my Airflow DAG without any cluster, it shows an error related to the PHS cluster. After I created a PHS cluster and triggered the DAG again, Airflow was able to create a new batch job and run it successfully, and the run count shown in the GCP UI was one. (I am attaching the Airflow log file which shows the PHS server error.)

3. Airflow DAG triggers successfully but Dataproc job doesn't appear in GCP account:

  • I have permission; the Airflow DAG is able to create the batch job the first time.
  • I have cross-verified all the details related to the region; all are correct.
  • For the Airflow logs, Composer is only providing me with these logs. If you need anything specific, please guide me.
  • I am not writing any custom logic in the Airflow DAG; I am just using the DAG I got from the official GCP documentation. ( https://cloud.google.com/composer/docs/composer-2/run-dataproc-workloads )

One more query related to DataprocBatchOperator: is it able to trigger existing batch jobs or not, and what is the behavior of this operator in a normal use case?

I am attaching airflow logs related to the first issue

[Screenshot: Composer Logs - First Concern]
[Screenshot: Logs after job create - 2nd trigger]

[Screenshot: DataProc | Batch Job | Runs]

Issue 1: Do I need any type of cluster for Dataproc serverless job (batch jobs)?

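Dataproc serverless jobs do not require a cluster to run on. They are run on a managed infrastructure provided by Google Cloud. The PHS error you saw most likely comes from the batch configuration: the example DAG in the Composer documentation attaches a Persistent History Server (PHS) cluster to the batch, and that cluster must exist if the configuration references it. If you don't need the Spark history UI, you can leave that part of the configuration out.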
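
Regarding the PHS cluster error mentioned earlier in the thread, the sketch below shows one way to attach a history server to a batch definition only when a cluster name is supplied. The nested field names follow the Dataproc Batch resource as I understand it; the helper name and all project, region, and cluster values are hypothetical.

```python
from typing import Optional


def with_history_server(
    batch: dict,
    project_id: str,
    region: str,
    phs_cluster: Optional[str] = None,
) -> dict:
    """Return a copy of batch, attaching a history server only when one is named."""
    result = dict(batch)
    if phs_cluster:
        # Note: this replaces any environment_config already present in the batch.
        result["environment_config"] = {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": (
                        f"projects/{project_id}/regions/{region}/clusters/{phs_cluster}"
                    )
                }
            }
        }
    return result


base = {"pyspark_batch": {"main_python_file_uri": "gs://example-bucket/jobs/main.py"}}
without_phs = with_history_server(base, "example-project", "us-central1")
with_phs = with_history_server(base, "example-project", "us-central1", "example-phs")
```

With no cluster name, the batch definition carries no history server reference, so no PHS cluster has to exist.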

Issue 2: Airflow DAG triggers successfully but Dataproc job doesn't appear in GCP account

There are a few possible reasons why the Dataproc job might not be appearing in your GCP account, even though the Airflow DAG is triggering successfully:

  • The Airflow DAG might be using the wrong project ID or region.
  • The Airflow DAG might not have the necessary permissions to submit Dataproc jobs.
  • The Airflow DAG might be passing a fixed batch_id to the operator, and a batch with that ID already exists.
  • There might be a problem with the Dataproc service.

To troubleshoot this issue, you can try the following:

  • Verify that the Airflow DAG is using the correct project ID and region.
  • Verify that the Airflow service account has the necessary permissions to submit Dataproc jobs. You can do this by checking its IAM roles.
  • If the DAG passes a fixed batch_id and a batch with that ID already exists, the operator does not create a new batch. Depending on the provider version, the task can still finish successfully by picking up the existing batch. Since your first run works and later runs do not, this is the most likely cause. To avoid it, make the batch_id unique for each run.
  • Check the Dataproc service status page to see if there are any known issues.

If you are still having trouble, you can contact Google Cloud support for assistance.

Issue 3: DataprocBatchOperator is able to trigger existing batch jobs or not

The operator is designed to submit new batch jobs to Dataproc; it does not re-run an existing one. A Dataproc serverless batch runs once, so running the job again means submitting a new batch with the same configuration and a new batch_id. If you pass a batch_id that already exists, no new batch is created. To ensure that a new batch is created every time, make sure that the batch_id is unique for each run.
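
One way to get an ID that is unique per run is to derive it from the Airflow run ID. The helper below is a minimal sketch with a hypothetical name; it assumes the ID may contain only lowercase letters, digits, and hyphens, is at most 63 characters long, and that the prefix starts with a letter or digit. In the Google provider the corresponding operator argument is named batch_id.

```python
import hashlib
import re


def make_batch_id(prefix: str, run_id: str) -> str:
    """Derive a per-run batch ID made of lowercase letters, digits, and hyphens."""
    # Replace every run of other characters with a single hyphen.
    cleaned = re.sub(r"[^a-z0-9]+", "-", f"{prefix}-{run_id}".lower()).strip("-")
    # A short hash of the raw run ID keeps IDs distinct after cleaning and truncation.
    digest = hashlib.sha1(run_id.encode("utf-8")).hexdigest()[:8]
    # 54 + 1 + 8 = 63 characters at most (the length limit assumed in this sketch).
    return f"{cleaned[:54].rstrip('-')}-{digest}"


first = make_batch_id("etl", "manual__2023-09-01T00:00:00+00:00")
retry = make_batch_id("etl", "manual__2023-09-01T00:00:00+00:00")
later = make_batch_id("etl", "manual__2023-09-02T00:00:00+00:00")
```

Because the ID depends only on the run ID, a retry of the same DAG run produces the same ID, while a new DAG run produces a new one. Inside a DAG, a similar effect can usually be had by templating the ID with a run-specific value such as {{ ts_nodash | lower }}.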

Normal use case for DataprocBatchOperator

The DataprocBatchOperator is typically used to trigger Dataproc batch jobs from Airflow DAGs.

For example, you could use a DataprocBatchOperator to trigger a Dataproc job that runs a Spark job to process data. You could then schedule the Airflow DAG to run on a regular basis, so that the Dataproc job is run automatically.

Airflow logs

The Airflow logs that you have provided do not show any errors. However, they do not show any information about the Dataproc job that is being triggered.

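To get more information about the Dataproc job, you can enable debug logging in Airflow. This is an Airflow configuration option (logging_level in the [logging] section, set to DEBUG) rather than a parameter of the operator; in Cloud Composer you can set it as an Airflow configuration override.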

Once you have enabled debug logging, you can view the Airflow logs to see more information about the Dataproc job, including the job ID and the status of the job.

Hi, @ms4446 Thanks for helping and explaining each issue perfectly !!
But my problem is still the same.
My use case is: I want to create a job on the first Airflow DAG trigger and, after that, rerun the same job using the Airflow DAG only.
Thanks for helping again @ms4446 .

If you want to suggest some other airflow operator  I am open to other operators and architectures too.
Thanks,
dev-sumit

Hi @sumit-dev ,

The Google provider for Airflow supports Dataproc serverless jobs through its batch operators, such as DataprocCreateBatchOperator. For more information, please see the Dataproc operators documentation: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/dataproc.html.

XComArg Usage and Custom Python Operator

To pass the job ID from one task to another with an XComArg, you can use a custom Python operator to fetch the job ID from the create task's output and push it to XCom. Here is an example:

 

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocGetBatchOperator,
)

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 9, 1),
}

# Fetch the job ID from the create task's output and push it to XCom.
def fetch_job_id(task_instance):
    # Assumes the create task returned the batch as a dict whose 'name' is
    # projects/<project>/locations/<region>/batches/<batch_id>.
    job = task_instance.xcom_pull(task_ids='create_job')
    job_id = job['name'].split('/')[-1]
    task_instance.xcom_push(key='job_id', value=job_id)

with DAG('dataproc_serverless_job', default_args=default_args, catchup=False) as dag:
    # Create a new Dataproc serverless batch. The batch ID includes the run
    # timestamp, so every DAG run submits a new batch instead of hitting an
    # existing one. The jar path is a placeholder.
    create_job_task = DataprocCreateBatchOperator(
        task_id='create_job',
        project_id='YOUR_PROJECT_ID',
        region='YOUR_REGION',
        batch_id='my-batch-{{ ts_nodash | lower }}',
        batch={
            'spark_batch': {
                'main_class': 'com.example.MyMainClass',
                'jar_file_uris': ['gs://YOUR_BUCKET/your-job.jar'],
                'args': ['--run_timestamp={{ ds }}'],
            },
        },
    )

    fetch_job_id_task = PythonOperator(
        task_id='fetch_job_id',
        python_callable=fetch_job_id,
    )

    # Look up the batch created above by its ID. An existing batch cannot be
    # re-run, so the ID is used here only to read the batch back.
    get_job_task = DataprocGetBatchOperator(
        task_id='get_job',
        project_id='YOUR_PROJECT_ID',
        region='YOUR_REGION',
        batch_id=fetch_job_id_task.output['job_id'],
    )

    # Set the dependencies between the tasks.
    create_job_task >> fetch_job_id_task >> get_job_task

DAG Task Addition

Explicit dag.add_task(...) calls are not necessary, as tasks defined inside the with DAG(...) block are automatically added to the DAG.

Job Idempotency

To ensure that the Dataproc job is idempotent, you can use a unique identifier for each run of the job, such as the Airflow DAG run ID. You can also use a checkpointing mechanism to ensure that the job can be restarted from where it left off. Alternatively, you can use a database to track the state of the job and prevent it from being run multiple times.
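
As an illustration of the database-tracking idea, here is a minimal sketch that records each run ID in a table and reports whether the run has been seen before. It uses an in-memory SQLite database purely for demonstration; the table name, function name, and run IDs are hypothetical, and a real setup would use a persistent database.

```python
import sqlite3


def claim_run(conn: sqlite3.Connection, run_id: str) -> bool:
    """Record run_id and return True only the first time it is seen."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS submitted_runs (run_id TEXT PRIMARY KEY)"
    )
    try:
        # The connection context manager commits on success and rolls back on error.
        with conn:
            conn.execute(
                "INSERT INTO submitted_runs (run_id) VALUES (?)", (run_id,)
            )
        return True
    except sqlite3.IntegrityError:
        # The primary key already exists, so this run was claimed earlier.
        return False


conn = sqlite3.connect(":memory:")
first = claim_run(conn, "scheduled__2023-09-01")
second = claim_run(conn, "scheduled__2023-09-01")
other = claim_run(conn, "scheduled__2023-09-02")
```

A task could call such a check before submitting the job and skip the submission when the run has already been claimed.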

Alternative Approaches

The alternative approaches to trigger Dataproc jobs are:

  • PythonOperator: The PythonOperator is a flexible approach that allows you to implement any custom logic that you need.
  • Cloud Function: The Cloud Function approach is useful if you want to decouple the Airflow DAG from the Dataproc serverless job.
  • Pub/Sub subscription: The Pub/Sub subscription approach is useful if you want to trigger the Dataproc serverless job from an external event.

The best approach for you will depend on your specific needs and requirements.