
List Dataflow jobs filtered by name and date in Python

What is the Python API equivalent of the following CLI command for listing jobs?

 

gcloud dataflow jobs list --filter="$job_name" --region="${region}" --format=json --created-after=-p1d --sort-by="~stateTime"

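You can call the Dataflow REST API (v1b3) from Python through the Google API client library. The API's filter parameter selects jobs by state only, so the code below applies the name and creation-time filters, and the sort, on the client side. An equivalent of the gcloud dataflow jobs list command would look something like this: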

 

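from googleapiclient.discovery import build
from datetime import datetime, timedelta, timezone
import logging

def list_dataflow_jobs(project_id, region, job_name=None, job_filter='ALL'):
    """
    Lists Dataflow jobs created in the last day, optionally matching an exact job name.

    job_filter selects jobs by state: 'ALL', 'ACTIVE', or 'TERMINATED'. The gcloud
    command in the question does not restrict by state, so 'ALL' is the default here.
    """

    # Initialize the Dataflow client (uses Application Default Credentials)
    dataflow = build('dataflow', 'v1b3')

    # Calculate the creation time cutoff (1 day ago) as an RFC 3339 UTC string.
    # datetime.utcnow() is deprecated since Python 3.12, so use an aware datetime.
    created_after = (datetime.now(timezone.utc) - timedelta(days=1)).isoformat().replace('+00:00', 'Z')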

    try:
        request = dataflow.projects().locations().jobs().list(
            projectId=project_id,
            location=region,
            filter=job_filter  # Filter for job states (e.g., ACTIVE, TERMINATED)
        )

        jobs = []
        while request is not None:
            response = request.execute()

            # Filter jobs by name (if provided) and created time
            filtered_jobs = [
                job for job in response.get('jobs', [])
                if (job_name is None or job['name'] == job_name) and job['createTime'] >= created_after
            ]
            jobs.extend(filtered_jobs)

            # Handle pagination if necessary
            request = dataflow.projects().locations().jobs().list_next(previous_request=request, previous_response=response)

        # Sort by currentStateTime in descending order (like --sort-by="~stateTime");
        # jobs without that field sort last instead of raising a TypeError
        jobs.sort(key=lambda job: job.get('currentStateTime', ''), reverse=True)

        return jobs

    except Exception as e:
        logging.error(f"An error occurred: {e}")
        return []

# Example usage
project_id = 'your-project-id'
region = 'your-region'
job_name = 'your-job-name'

jobs = list_dataflow_jobs(project_id, region, job_name)

for job in jobs:
    print(job)
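
One caveat with the snippet above: it compares createTime values as plain strings. That usually works for RFC 3339 UTC timestamps, but it can misorder two values that fall in the same second and carry a different number of fractional digits. A possible refinement is to parse the timestamps before comparing them. The following is only a sketch under that assumption, not a tested drop-in: parse_rfc3339 and filter_and_sort_jobs are hypothetical helper names, and they operate on the job dictionaries returned by the API, so they can be tried without any network call.

```python
from datetime import datetime, timedelta, timezone
import re

# Hypothetical helper: parses UTC timestamps such as '2024-05-02T11:00:00.123456789Z'.
_RFC3339_UTC = re.compile(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z')

def parse_rfc3339(timestamp):
    """Return an aware UTC datetime; digits beyond microseconds are truncated."""
    match = _RFC3339_UTC.fullmatch(timestamp)
    if match is None:
        raise ValueError(f'Unsupported timestamp: {timestamp!r}')
    base, fraction = match.groups()
    parsed = datetime.strptime(base, '%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone.utc)
    if fraction:
        parsed = parsed.replace(microsecond=int(fraction[:6].ljust(6, '0')))
    return parsed

# Hypothetical helper: the client-side part of the listing logic.
def filter_and_sort_jobs(jobs, job_name=None, max_age=timedelta(days=1), now=None):
    """Keep jobs created within max_age (and matching job_name exactly, if given),
    sorted by currentStateTime, newest first."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - max_age
    oldest = datetime.min.replace(tzinfo=timezone.utc)

    selected = [
        job for job in jobs
        if (job_name is None or job.get('name') == job_name)
        and parse_rfc3339(job['createTime']) >= cutoff
    ]

    def state_time(job):
        # Jobs without currentStateTime sort last in the descending order.
        value = job.get('currentStateTime')
        return parse_rfc3339(value) if value else oldest

    selected.sort(key=state_time, reverse=True)
    return selected
```

Inside the pagination loop you would then collect every job from response.get('jobs', []) and call filter_and_sort_jobs(all_jobs, job_name) once at the end, instead of filtering page by page.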

 

Hi @ramkrishnamI,

In addition to @ms4446's answer, you can also refer to the following documentation:

I hope the above information is helpful.