What is the Python API equivalent of the following gcloud CLI command to list Dataflow jobs?
gcloud dataflow jobs list --filter="$job_name" --region="${region}" --format=json --created-after=-p1d --sort-by="~stateTime"
The equivalent Python API code, with filtering options similar to the gcloud dataflow jobs list command above, would look something like this:
from datetime import datetime, timedelta, timezone
import logging

from googleapiclient.discovery import build


def list_dataflow_jobs(project_id, region, job_name=None, job_filter='ACTIVE'):
    """
    Lists Dataflow jobs matching the given criteria.

    job_filter is the API's server-side state filter: 'ACTIVE', 'TERMINATED',
    or 'ALL'. job_name, when given, must match the job name exactly (gcloud's
    --filter accepts a richer expression language).
    """
    # Initialize the Dataflow client (uses Application Default Credentials)
    dataflow = build('dataflow', 'v1b3')

    # Creation-time cutoff (1 day ago), matching --created-after=-p1d.
    # RFC 3339 UTC timestamps compare chronologically as plain strings.
    created_after = (datetime.now(timezone.utc) - timedelta(days=1)).strftime('%Y-%m-%dT%H:%M:%SZ')

    try:
        request = dataflow.projects().locations().jobs().list(
            projectId=project_id,
            location=region,
            filter=job_filter  # Filter for job states (e.g., ACTIVE, TERMINATED)
        )
        jobs = []
        while request is not None:
            response = request.execute()
            # Filter jobs client-side by name (if provided) and creation time
            jobs.extend(
                job for job in response.get('jobs', [])
                if (job_name is None or job.get('name') == job_name)
                and job.get('createTime', '') >= created_after
            )
            # Handle pagination by following nextPageToken
            request = dataflow.projects().locations().jobs().list_next(
                previous_request=request, previous_response=response
            )
        # Sort by currentStateTime in descending order, matching --sort-by="~stateTime";
        # the '' default keeps jobs without that field from raising a TypeError.
        jobs.sort(key=lambda job: job.get('currentStateTime', ''), reverse=True)
        return jobs
    except Exception as e:
        logging.error(f"An error occurred while listing Dataflow jobs: {e}")
        return []


# Example usage
project_id = 'your-project-id'
region = 'your-region'
job_name = 'your-job-name'

jobs = list_dataflow_jobs(project_id, region, job_name)
for job in jobs:
    print(job)
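As an alternative to the discovery-based client above, Google also publishes a generated client library for Dataflow. Here is a minimal sketch, assuming the google-cloud-dataflow-client package and its v1beta3 surface; verify the field and enum names against the library's current documentation:

from google.cloud import dataflow_v1beta3


def list_active_jobs(project_id, region):
    """Sketch: list ACTIVE jobs via the generated Dataflow client."""
    client = dataflow_v1beta3.JobsV1Beta3Client()
    request = dataflow_v1beta3.ListJobsRequest(
        project_id=project_id,
        location=region,
        # Server-side state filter, analogous to job_filter above
        filter=dataflow_v1beta3.ListJobsRequest.Filter.ACTIVE,
    )
    # list_jobs returns a pager that follows nextPageToken transparently
    return list(client.list_jobs(request=request))

Name matching and the sort by currentStateTime would still need to be done client-side, as in the function above.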
Hi @ramkrishnamI,
In addition to @ms4446's reply, you can also refer to the following documentation:
I hope the above information is helpful.