
Dataproc Serverless trigger using Cloud Composer

This is my equivalent command line for Dataproc Serverless:

gcloud dataproc batches submit --project xyz2022 --region asia-south1 pyspark \
  --batch dummy-1 gs://bkt-1/abc.py \
  --version 2.0.40 \
  --container-image "asia-south1-docker.pkg.dev/xyz2022/v1:25" \
  --jars gs://bkt-1/jars/abc42.2.24.jar,gs://bkt-1/jars/abc2.13-0.32.2.jar \
  --py-files gs://bkt/demo.zip,gs://bkt-1/Config.properties \
  --subnet composer-subnet \
  --tags terraform-ssh \
  --service-account batch-job-sa@rnd2022.iam.gserviceaccount.com \
  --history-server-cluster projects/xyz2022/regions/asia-south1/clusters/cluster-4567 \
  --properties spark.app.name=projects/xyz2022/locations/asia-south1/batches/batch-21fcd,spark.driver.cores=4,spark.driver.memory=9600m,spark.dynamicAllocation.executorAllocationRatio=0.3,spark.executor.cores=4,spark.executor.instances=2,spark.executor.memory=9600m,spark.eventLog.dir=gs://spark-dataproc-jobs/history/ \
  --labels goog-dataproc-batch-id=dummy-1,goog-dataproc-batch-uuid=c217ec74-2a4m-40bc-bc18-b6b823a3341c,goog-dataproc-location=asia-south1 \
  -- dummy-1

I want to create the same job using Cloud Composer; I am using DataprocCreateBatchOperator.

Please provide me with an Airflow DAG that creates the same job.


To create the same batch from an Airflow DAG, use DataprocCreateBatchOperator and pass the job definition as a single batch dictionary that mirrors the Dataproc Batches API resource; each gcloud flag from your command maps to a field in that dictionary:
 
from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
import airflow.utils.dates

default_args = {
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 1
}

dag = DAG(
    'dataproc_serverless_batch_job_creation',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)

# The batch definition mirrors the Dataproc Batches API resource.
BATCH_CONFIG = {
    'pyspark_batch': {
        'main_python_file_uri': 'gs://bkt-1/abc.py',
        # --jars
        'jar_file_uris': [
            'gs://bkt-1/jars/abc42.2.24.jar',
            'gs://bkt-1/jars/abc2.13-0.32.2.jar',
        ],
        # --py-files
        'python_file_uris': [
            'gs://bkt/demo.zip',
            'gs://bkt-1/Config.properties',
        ],
        # The trailing "-- dummy-1" in the gcloud command is a job argument.
        'args': ['dummy-1'],
    },
    'runtime_config': {
        'version': '2.0.40',
        'container_image': 'asia-south1-docker.pkg.dev/xyz2022/v1:25',
        # Property values must be strings in the API.
        'properties': {
            'spark.app.name': 'projects/xyz2022/locations/asia-south1/batches/batch-21fcd',
            'spark.driver.cores': '4',
            'spark.driver.memory': '9600m',
            'spark.dynamicAllocation.executorAllocationRatio': '0.3',
            'spark.executor.cores': '4',
            'spark.executor.instances': '2',
            'spark.executor.memory': '9600m',
            'spark.eventLog.dir': 'gs://spark-dataproc-jobs/history/'
        },
    },
    'environment_config': {
        'execution_config': {
            'subnetwork_uri': 'composer-subnet',
            'network_tags': ['terraform-ssh'],
            'service_account': 'batch-job-sa@rnd2022.iam.gserviceaccount.com',
        },
        'peripherals_config': {
            'spark_history_server_config': {
                'dataproc_cluster': 'projects/xyz2022/regions/asia-south1/clusters/cluster-4567',
            },
        },
    },
    # The goog-dataproc-* labels from the gcloud command (batch id, batch uuid,
    # location) are attached automatically by Dataproc, so they are not set here.
}

create_batch_operator = DataprocCreateBatchOperator(
    task_id='create_batch',
    project_id='xyz2022',
    region='asia-south1',
    # Batch IDs must be unique per project/region; template this value
    # (e.g. with the run date) if the DAG runs on a schedule.
    batch_id='dummy-1',
    batch=BATCH_CONFIG,
    dag=dag
)
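If you also want to pull the finished batch's details (state, runtime info) into the same DAG, the Google provider also ships DataprocGetBatchOperator. A minimal sketch, assuming the same project, region, and batch_id as above:

from airflow.providers.google.cloud.operators.dataproc import DataprocGetBatchOperator

# Optional: fetch the created batch's metadata after creation.
get_batch_operator = DataprocGetBatchOperator(
    task_id='get_batch',
    project_id='xyz2022',
    region='asia-south1',
    batch_id='dummy-1',
    dag=dag
)

create_batch_operator >> get_batch_operator

Since DataprocCreateBatchOperator waits for the batch to complete by default, the get task will see the batch in its terminal state.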

To trigger the DAG manually, you can use the following command:

airflow dags trigger dataproc_serverless_batch_job_creation

Note: Always ensure the required Airflow provider (apache-airflow-providers-google) and its dependencies are installed and properly configured in your environment before running the DAG.
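A quick, minimal check you can run inside the environment (assuming the provider is installed under its usual PyPI name, apache-airflow-providers-google, as it is by default in Cloud Composer 2) to confirm the package shipping DataprocCreateBatchOperator is present:

# Minimal provider check, run inside the Composer/Airflow environment.
from importlib.metadata import PackageNotFoundError, version

try:
    print('apache-airflow-providers-google', version('apache-airflow-providers-google'))
except PackageNotFoundError:
    print('Provider missing: install apache-airflow-providers-google in the environment.')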

Sorry for the late reply, @ms4446!!
I am going to test this DAG now. If I need further assistance I will ask you; otherwise I will accept this as the solution.
Thanks for helping me!