Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Cloud Composer - Task is throttled

 

 

 

 

import re
from datetime import datetime

# airflow imports
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context

# google-cloud library imports
from google.cloud import storage, bigquery

# Project identifiers. They are blank in this listing and must be filled in
# before the DAG can run.
DATA_PROJECT = '' # Config, Audits
PROC_PROJECT = '' # Storage, Composer, Files will always be present in this project

# initialize the storage and bigquery clients
# NOTE(review): both clients are constructed at module import time, so they are
# created on every parse of this DAG file, not only when a task runs.
# `storage_client` is shared by the GCS helpers and tasks below;
# `bigquery_client` is not referenced elsewhere in the visible code (each
# BigQuery helper builds its own client).
storage_client = storage.Client(project=PROC_PROJECT)
bigquery_client = bigquery.Client(project=DATA_PROJECT)

# Initialize static variables
# NOTE(review): REGION is not referenced elsewhere in the visible code.
REGION = 'europe-west2'

# Initialize the config dataset and tables
# The config table lists the feeds to ingest; only rows with feed_active = 1
# are read by get_available_files.
CONFIG_PROJECT = ''
CONFIG_DATASET = 'config_dataset'
CONFIG_TABLE = 'utility_delta_feed_master'

# Audit table: one row is inserted per processed file (insert_ingestion_log)
# and max(file_id) is read from it to allocate new file ids (get_max_file_id).
DPN_AUDIT_PROJECT = ''
DPN_AUDIT_DATASET = "audits"
DPN_AUDIT_TABLE = "utility_stg_to_dpn_ingestion_log"

# Location of the stored procedures invoked by execute_procedure.
PROCEDURE_PROJECT_ID = ""
PROCEDURE_DATASET_ID = "delta_stored_procedure" 

# Defaults applied to every task of the DAG: no automatic retries.
default_args = {
    'retries': 0,
    'start_date': days_ago(1),
}

def delete_files_from_processing(bucket_name, destination_blob_name):
    """Delete one pre-processed object from the processing path.

    Args:
        bucket_name: Name of the bucket that holds the object.
        destination_blob_name: Object name (path inside the bucket) to delete.
    """
    stale_blob = storage_client.bucket(bucket_name).blob(destination_blob_name)
    stale_blob.delete()
    print(f'Blob {stale_blob.name} deleted successfully')


def move_to_archival(bucket_name, source_file, processed_file_path, rejected_file_path, status):
    """Move a source object to its archival location inside the same bucket.

    The object is copied under the processed or rejected archival prefix
    (keeping its base name) and the original is deleted only after the copy
    has succeeded.

    Args:
        bucket_name: Bucket that holds both the source and the archival paths.
        source_file: Object name of the file to archive.
        processed_file_path: Archival path used when ``status`` is truthy.
        rejected_file_path: Archival path used when ``status`` is falsy.
        status: Truthy to archive as processed, falsy to archive as rejected.

    Raises:
        Exception: Whatever the storage client raises while copying or
            deleting; the error is logged and then re-raised unchanged.
    """
    archive_root = processed_file_path if status else rejected_file_path
    archive_label = 'processed' if status else 'rejected'

    # The first three '/'-separated segments of the archival path are dropped
    # (assumes a gs://<bucket>/<prefix>/ style path -- TODO confirm).
    archive_prefix = '/'.join(archive_root.split('/')[3:])
    destination_blob_name = f"{archive_prefix}{source_file.split('/')[-1]}"

    bucket = storage_client.bucket(bucket_name)

    # The copy and delete are the calls that can actually fail, so they are
    # the ones guarded by the error logging.
    try:
        bucket.copy_blob(bucket.blob(source_file), bucket, destination_blob_name)
        bucket.delete_blob(source_file)
    except Exception as e:
        print('Error encountered while moving the file to archival location. Refer to error logs for additional information.')
        print("Moving File to Archival Error", e)
        raise

    print(f'File successfully moved to {archive_label} file archival path.')


def insert_ingestion_log(file_id, file_name, dpn_dataset, dpn_table, procedure_name, record_count, load_timestamp, status):
    """Insert one row describing a file load into the ingestion audit table.

    Args:
        file_id: Identifier assigned to the file for this run.
        file_name: Name of the ingested file.
        dpn_dataset: Target dataset recorded in the audit row.
        dpn_table: Target table recorded in the audit row.
        procedure_name: Stored procedure that processed the file.
        record_count: Record count to store with the audit row.
        load_timestamp: Load time; stored as ``str(load_timestamp)``.
        status: Outcome label stored in the ``status`` column.

    Insert errors reported by BigQuery are printed, not raised.
    """
    audit_row = {
        "file_id": file_id,
        "file_name": file_name,
        "dpn_dataset": dpn_dataset,
        "dpn_table": dpn_table,
        "procedure_name": procedure_name,
        "record_count": record_count,
        "load_timestamp": str(load_timestamp),
        "status": status,
    }

    # NOTE(review): get_max_file_id reads this table through DPN_AUDIT_PROJECT
    # while the row is written through DATA_PROJECT -- confirm both resolve to
    # the same project.
    client = bigquery.Client(project=DATA_PROJECT)

    # Client.dataset() is deprecated; build the reference explicitly against
    # the client's own project, which is what Client.dataset() defaulted to.
    table_ref = bigquery.DatasetReference(client.project, DPN_AUDIT_DATASET).table(DPN_AUDIT_TABLE)

    errors = client.insert_rows_json(table_ref, [audit_row])
    if errors:
        print(errors)
    else:
        print(f"load entry added for {procedure_name}")


def get_max_file_id():
    """Return the largest file_id stored in the ingestion audit table.

    Returns:
        The ``file_id`` value of the first row returned by the query, or 0
        when the query yields no rows.
    """
    query = f"""SELECT ifnull(max(file_id), 0) as file_id FROM `{DPN_AUDIT_PROJECT}.{DPN_AUDIT_DATASET}.{DPN_AUDIT_TABLE}`"""
    audit_client = bigquery.Client(project=DPN_AUDIT_PROJECT)
    rows = audit_client.query(query).result()

    file_ids = [row.file_id for row in rows]
    return file_ids[0] if file_ids else 0


def get_column_count(project_id, dataset_id, table_id):
    """Return the number of columns in a BigQuery table's schema.

    Args:
        project_id: Project used to create the BigQuery client.
        dataset_id: Dataset that contains the table.
        table_id: Table whose schema is inspected.

    Returns:
        ``len(table.schema)`` for the fetched table.
    """
    client = bigquery.Client(project=project_id)

    # Client.dataset() is deprecated; build the reference explicitly against
    # the client's own project, which is what Client.dataset() defaulted to.
    table_ref = bigquery.DatasetReference(client.project, dataset_id).table(table_id)

    return len(client.get_table(table_ref).schema)


def extract_key(file_details):
    """Build the sort key for a file from the suffix of its name.

    The part of the file name after the first ``DNH_``/``MNH_``/``MH_``/``DH_``
    marker (extension removed) is split on ``_``:

    * one segment  -> ``int(timestamp)``
    * three numeric segments ``a_b_timestamp`` -> ``[timestamp, a, b]``
    * three segments where any is non-numeric -> ``int(timestamp)``

    Prerequisite: all files of one feed must follow the same naming
    convention, otherwise the keys are of mixed type (int and list) and
    sorting fails with
    ``TypeError: '<' not supported between instances of 'int' and 'list'``.

    Args:
        file_details: Sequence whose element at index 3 is the file name.

    Returns:
        An ``int`` or a three-element ``list`` of ints, as described above.

    Raises:
        Exception: If the file name contains none of the expected markers.
        ValueError: If the suffix has an unsupported number of segments, or
            the timestamp segment is not numeric.
    """
    file_name = file_details[3]
    match = re.search(r"(?:DNH|MNH|MH|DH)_(.*)", file_name)
    if not match:
        raise Exception('''Error while extracting key for sorting files. Check if the file format is as expected. 
                        Check if the file format follows one of the following format'
                            1. BPP_GNS_BPA_IF_v1_DNH_20240815013001.dat
                            2. BPP_GSPS_BAC-BACMAIN-BEF_v2_MNH_Seq_Comp_20240824120009.dat
                            3. BPP_PDK_PUR-PUMAIN-NPIV_v12_DNH_1_1_20240808172349.dat 
                        If not, identify the new format which has arrived and check with the developer''')

    segments = match.group(1).split('.')[0].split('_')

    if len(segments) == 1:
        return int(segments[0])

    if len(segments) == 3:
        try:
            first, second, timestamp = map(int, segments)
        except ValueError:
            # Non-numeric leading segments (e.g. 'Seq', 'Comp'): fall back to
            # the trailing timestamp only.
            return int(segments[2])
        return [timestamp, first, second]

    # Fail loudly here instead of returning None, which would only surface
    # later as an opaque TypeError inside sorted().
    raise ValueError(
        f"Unsupported file name format for sorting: '{file_name}' "
        f"(found {len(segments)} '_'-separated segments after the marker, expected 1 or 3)"
    )


def filter_files(file_details):
    """Tell whether a file is at or after its feed's delta start date.

    The timestamp is the last ``_``-separated segment of the file-name suffix
    that follows the ``DNH_``/``MNH_``/``MH_``/``DH_`` marker:

    * BPP_GNS_BPA_IF_v1_DNH_20240815013001.dat -> 20240815013001
    * BPP_GSPS_BAC-BACMAIN-BEF_v2_MNH_Seq_Comp_20240824120009.dat -> 20240824120009
    * BPP_PDK_PUR-PUMAIN-NPIV_v12_DNH_1_1_20240808172349.dat -> 20240808172349

    Args:
        file_details: Sequence whose element at index 3 is the file name and
            whose element at index 10 is the feed's ``delta_start_date``
            (compared directly against the integer timestamp -- assumes it is
            numeric in the same YYYYMMDDHHMMSS form; TODO confirm).

    Returns:
        ``True`` when no delta start date is configured, or when the file
        timestamp is greater than or equal to it; ``False`` otherwise.

    Raises:
        ValueError: If a delta start date is configured but no timestamp can
            be extracted from the file name.
    """
    delta_start_date = file_details[10]
    if not delta_start_date:
        # Nothing to compare against: every file of the feed is kept, and the
        # file name does not need to be parsed at all.
        return True

    file_name = file_details[3]
    match = re.search(r"(?:DNH|MNH|MH|DH)_(.*)", file_name)
    segments = match.group(1).split('.')[0].split('_') if match else []

    if len(segments) not in (1, 3):
        # Previously this path hit an unbound local variable
        # (UnboundLocalError); raise a descriptive error instead.
        raise ValueError(
            f"Cannot determine the timestamp of file '{file_name}' "
            f"for delta start date filtering"
        )

    # For both supported layouts the timestamp is the last segment.
    file_name_timestamp = int(segments[-1])
    return bool(file_name_timestamp >= delta_start_date)


def sort_files(sp_files_map):
    """Return a new list with the given file entries ordered by extract_key."""
    ordered_files = list(sp_files_map)
    ordered_files.sort(key=extract_key)
    return ordered_files


# dag instantiation
@dag(
        default_args= default_args,
        schedule_interval='*/30 * * * *',
        catchup=False,
        max_active_runs=1,
)
def data_ingestion_new_approach_sequentially_v3():
    """
    This DAG performs below operations:
    
    1. Scan for all available files in GCS bucket
    2. Move the files from landing/ bucket to processing/ path.
    3. For all these files, trigger the stored procedures, and store the status 
       in table (file_id, file_name, procedure_name, status), depending on the status, 
       move the files from processing/ to processed/ or rejected/ 
    """

    @task
    def start():
        """Marker task at the head of the pipeline; returns a summary string."""
        pipeline_outline = 'Start -> Scan for available files -> Move the files to processing -> Group files based on SP and sort the files'
        return pipeline_outline


    @task
    def get_available_files() -> list:
        """
        Read the config table, and scan for all available files.

        For every active feed (feed_active = 1) the feed's source path is
        listed in GCS and each object whose lower-cased base name matches the
        feed's lower-cased file mask is considered:

        * non-empty objects are returned as 18-field tuples (feed metadata,
          with the third field being src_file_path + file name and the fourth
          the file name);
        * zero-byte objects are moved to the rejected archival path.

        Returns:
            List of tuples, one per non-empty matching file.
        """
        # Backticks keep the table path valid for any project id and match the
        # quoting used by the other queries in this file.
        metadata_query = f'''
            SELECT src_application_id, src_file_id, src_file_path, file_mask, 
                file_extension, file_delimiter, trim_header, trim_trailer, staging_project_id, staging_dataset, staging_table, dpn_dataset, dpn_table, stored_procedure_name, processing_file_path, processed_file_path, rejected_file_path, delta_start_date
            FROM `{CONFIG_PROJECT}.{CONFIG_DATASET}.{CONFIG_TABLE}`
            WHERE feed_active = 1;
        '''

        config_bq_client = bigquery.Client(project=CONFIG_PROJECT)
        # Materialise the feed rows before touching GCS, as before.
        feeds = list(config_bq_client.query_and_wait(query=metadata_query))

        # Files which are available under each feed's landing path
        available_files = []

        for feed in feeds:
            path_parts = feed.src_file_path.split('/')
            bucket_name = path_parts[2]

            # No prefix when the source path points at the bucket root; the
            # length check avoids an IndexError for a path without a trailing '/'.
            has_prefix = len(path_parts) > 3 and path_parts[3]
            prefix = '/'.join(path_parts[3:]) if has_prefix else None

            # Fetch all the blobs at src_file_path - ex. landing/
            blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter='/')

            # This only checks that the file name contains file_mask, irrespective of file extension.
            # NOTE(review): file_mask is interpolated unescaped, so regex
            # metacharacters in the mask are interpreted as regex syntax --
            # confirm this is intended.
            # TODO: Update the pattern based on file naming standards
            pattern = re.compile(f'^.*{feed.file_mask.lower()}.*$')

            for blob in blobs:
                file_name = blob.name.split('/')[-1]
                if not pattern.match(file_name.lower()):
                    continue

                if blob.size == 0:
                    print(f'Found 0 byte file {file_name}')
                    move_to_archival(bucket_name, blob.name, feed.processed_file_path, feed.rejected_file_path, False)
                    continue

                available_files.append((
                    feed.src_application_id,
                    feed.src_file_id,
                    feed.src_file_path + file_name,
                    file_name,
                    feed.file_extension,
                    feed.file_delimiter,
                    feed.trim_header,
                    feed.trim_trailer,
                    feed.staging_project_id,
                    feed.staging_dataset,
                    feed.staging_table,
                    feed.dpn_dataset,
                    feed.dpn_table,
                    feed.stored_procedure_name,
                    feed.processing_file_path,
                    feed.processed_file_path,
                    feed.rejected_file_path,
                    feed.delta_start_date,
                ))

        return available_files


    @task
    def move_files_to_processing(available_files) -> list:
        """
        Write a pre-processed copy of every available file to processing/.

        Each file is read from its `gcs_uri` with pandas, the configured
        number of header rows is skipped and the configured number of trailer
        rows is dropped, and the result is uploaded pipe-delimited as
        `<processing path><staging_table>/pre_processed_<file_name>` in the
        same bucket. The original object is left in place.

        File ids are allocated sequentially, starting after the current
        maximum file_id found in the audit table.

        Args:
            available_files: 18-field entries as produced by get_available_files.

        Returns:
            List of 11-field tuples (file_id, dpn_dataset, dpn_table,
            file_name, stored_procedure_name, bucket_name, blob_name,
            destination_blob_name, processed_file_path, rejected_file_path,
            delta_start_date), one per uploaded file.
        """
        import pandas as pd

        copied_files = []

        # Column counts per staging table. Files of the same feed share a
        # staging table, so the BigQuery metadata lookup is done once per
        # table instead of once per file.
        column_counts = {}

        next_file_id = get_max_file_id() + 1

        for file_id, metadata in enumerate(available_files, start=next_file_id):
            (src_application_id, src_file_id, gcs_uri, file_name, file_extension, file_delimiter,
             trim_header, trim_trailer, staging_project_id, staging_dataset, staging_table,
             dpn_dataset, dpn_table, stored_procedure_name, processing_file_path,
             processed_file_path, rejected_file_path, delta_start_date) = metadata

            staging_key = (staging_project_id, staging_dataset, staging_table)
            if staging_key not in column_counts:
                column_counts[staging_key] = get_column_count(*staging_key)
            column_count = column_counts[staging_key]

            data_df = pd.read_csv(
                filepath_or_buffer=gcs_uri,
                header=None,
                skiprows=trim_header,
                sep=f'{file_delimiter}',
                names=[f'col_{i}' for i in range(column_count)],
            )

            # Drop the trailer rows; clamp at zero so a trailer longer than
            # the file yields an empty frame instead of a negative slice.
            keep_rows = max(data_df.shape[0] - trim_trailer, 0)
            filtered_df = data_df.iloc[0:keep_rows].reset_index(drop=True)

            uri_parts = gcs_uri.split('/')
            bucket_name = uri_parts[2]
            blob_name = '/'.join(uri_parts[3:])
            destination_bucket_name = bucket_name
            destination_blob_name = '/'.join(processing_file_path.split('/')[3:]) + f'{staging_table}/' + f'pre_processed_{file_name}'

            print('bucket_name:', bucket_name)
            print('blob_name:', blob_name)
            print('destination_bucket_name:', destination_bucket_name)
            print('destination_blob_name:', destination_blob_name)

            try:
                # Writing to processing_files/ path
                destination_blob = storage_client.bucket(destination_bucket_name).blob(destination_blob_name)
                destination_blob.upload_from_string(filtered_df.to_csv(sep='|', index=False, header=False))
            except Exception as e:
                print(f'Error while copying {blob_name}', e)
                raise

            copied_files.append((file_id, dpn_dataset, dpn_table, file_name, stored_procedure_name, bucket_name, blob_name, destination_blob_name, processed_file_path, rejected_file_path, delta_start_date))

        return copied_files


    @task
    def group_and_sort_feed_wise_files(processing_files):
        """
        Group the files by stored procedure name and sort each group.

        The stored procedure name is read from index 4 of every entry; each
        group is ordered with sort_files.

        Returns:
            Dict mapping stored procedure name to its sorted list of entries.
        """
        files_by_procedure = {}
        for entry in processing_files:
            # First sighting of a procedure creates its list; later ones append.
            files_by_procedure.setdefault(entry[4], []).append(entry)

        sp_files_map = {
            procedure: sort_files(entries)
            for procedure, entries in files_by_procedure.items()
        }

        print('Stored_Procedure: Files map after sorting:', sp_files_map)

        return sp_files_map
    
    @task
    def filter_based_on_delta_start_date(sp_files_map):
        """
        Keep, for every stored procedure, only the files accepted by filter_files.

        The mapping is updated in place and returned.
        """
        for procedure, entries in sp_files_map.items():
            sp_files_map[procedure] = [entry for entry in entries if filter_files(entry)]

        return sp_files_map


    # @task
    # def write_map_to_json_file(sp_files_map):
    #     import json
    #     global storage_client
    #     temp_bucket = ''
    #     bucket = storage_client.get_bucket(temp_bucket)
    #     json_data = json.dumps(sp_files_map)
    #     blob = bucket.blob('data_ingestion_temp/sp_files_map.json')
    #     blob.upload_from_string(json_data, content_type='application/json')
    #     print(f'sp_files_map uploaded successfully as a json to GCS location {blob.name}')
    #     return blob.name


    @task
    def end():
        """Marker task at the tail of the pipeline."""
        final_marker = 'End'
        return final_marker
    

    def execute_procedure(procedure_name, file_name, file_id, dpn_dataset, dpn_table, bucket_name, source_blob_name, destination_blob_name, processed_file_path, rejected_file_path):
        """
        Run the stored procedure for one file, then archive, clean up and audit it.

        The procedure is called with the file name and file id. Afterwards,
        exactly once and regardless of the outcome:

        1. the source file is moved to the processed (success) or rejected
           (failure) archival path,
        2. the pre-processed copy is deleted from the processing path,
        3. a SUCCESS/FAILED row is written to the ingestion audit table.

        Args:
            procedure_name: Stored procedure to call (inside
                PROCEDURE_PROJECT_ID.PROCEDURE_DATASET_ID).
            file_name: File name passed to the procedure and to the audit row.
            file_id: File id passed to the procedure and to the audit row.
            dpn_dataset: Target dataset recorded in the audit row.
            dpn_table: Target table recorded in the audit row.
            bucket_name: Bucket holding the source and pre-processed objects.
            source_blob_name: Object name of the original file.
            destination_blob_name: Object name of the pre-processed copy.
            processed_file_path: Archival path for successfully loaded files.
            rejected_file_path: Archival path for failed files.

        Raises:
            RuntimeError: If calling the procedure or reading its result fails
                (chained to the original exception).
        """
        procedure_bq_client = bigquery.Client(project=PROCEDURE_PROJECT_ID)
        load_timestamp = datetime.now()
        record_count = 0
        failure = None

        try:
            print(f"running procedure {PROCEDURE_PROJECT_ID}.{PROCEDURE_DATASET_ID}.{procedure_name}")

            # The file name originates from object names in the bucket, so it
            # is bound as a query parameter instead of being spliced into the
            # SQL text (a quote in a file name would otherwise break the call).
            # NOTE(review): file_id is bound as INT64 -- confirm this matches
            # the procedures' signature.
            sql_query = f"CALL `{PROCEDURE_PROJECT_ID}.{PROCEDURE_DATASET_ID}.{procedure_name}`(@file_name, @file_id);"
            job_config = bigquery.QueryJobConfig(
                query_parameters=[
                    bigquery.ScalarQueryParameter("file_name", "STRING", file_name),
                    bigquery.ScalarQueryParameter("file_id", "INT64", file_id),
                ]
            )
            print('SQL Query:', sql_query, '| file_name:', file_name, '| file_id:', file_id)

            query_job = procedure_bq_client.query(sql_query, job_config=job_config)
            results = query_job.result()
            if query_job.errors:
                raise RuntimeError(f"query execution failed: {query_job.errors}")

            # Only the first returned row is used; its 'record_count' column
            # is stored with the audit entry.
            for row in results:
                print('Row:', row)
                record_count = row['record_count']
                break
        except Exception as e:
            print(f'Exception while executing stored procedure {procedure_name}', e)
            failure = e

        succeeded = failure is None

        # Housekeeping runs once, outside the try block: repeating it from an
        # exception handler would try to move a source file that has already
        # been moved and mask the original error.
        move_to_archival(bucket_name=bucket_name, source_file=source_blob_name, processed_file_path=processed_file_path, rejected_file_path=rejected_file_path, status=succeeded)
        delete_files_from_processing(bucket_name, destination_blob_name)
        insert_ingestion_log(file_id, file_name, dpn_dataset, dpn_table, procedure_name, record_count, load_timestamp, "SUCCESS" if succeeded else "FAILED")

        if failure is not None:
            raise RuntimeError(f"procedure execution failed due to following reason..{failure}") from failure

        print("procedure ran successfully...")

    # @task(map_index_template="{{ mapped_task_id }}")
    @task
    def run_procedures_sequentially(sp_files_tuple : list):
        """
        Run one stored procedure for each of its files, one file at a time.

        Args:
            sp_files_tuple: Pair whose first element is the stored procedure
                name and whose second element is the list of 11-field file
                entries mapped to that procedure.
        """
        for count, file_details in enumerate(sp_files_tuple[1], start=1):
            print(f'###### Iteration {count} for {sp_files_tuple[0]}: Running procedure {sp_files_tuple[0]} for file {file_details[3]} ######')

            (file_id, dpn_dataset, dpn_table, file_name, procedure_name, bucket_name,
             source_blob_name, destination_blob_name, processed_file_path,
             rejected_file_path, delta_start_date) = file_details

            execute_procedure(
                procedure_name=procedure_name,
                file_name=file_name,
                file_id=file_id,
                dpn_dataset=dpn_dataset,
                dpn_table=dpn_table,
                bucket_name=bucket_name,
                source_blob_name=source_blob_name,
                destination_blob_name=destination_blob_name,
                processed_file_path=processed_file_path,
                rejected_file_path=rejected_file_path,
            )


    # Task wiring: scan -> pre-process -> group/sort -> filter -> run procedures.
    available_files_op = get_available_files()
    processing_files_op = move_files_to_processing(available_files=available_files_op)
    sp_files_map = group_and_sort_feed_wise_files(processing_files_op)
    filtered_sp_files_map = filter_based_on_delta_start_date(sp_files_map=sp_files_map)
    # blob_name_op = write_map_to_json_file(sp_files_map=filtered_sp_files_map)
    # Dynamic task mapping over the procedure -> files dict; each mapped task
    # reads the procedure name from index 0 and its file list from index 1
    # (presumably one (key, value) pair per mapped instance -- verify against
    # the Airflow dynamic task mapping documentation).
    run_procedures_sequentially_op = run_procedures_sequentially.expand(sp_files_tuple = filtered_sp_files_map)

    # Earlier PythonOperator-based variant, kept for reference:
    # run_procedures_operator = PythonOperator( 
    #     task_id = 'run_procedures',
    #     python_callable = run_procedures_sequentially,
    #     op_kwargs = {"blob_name": blob_name_op}
    # )


    # start() >> available_files_op >> processing_files_op >> sp_files_map >> filtered_sp_files_map >> blob_name_op >> run_procedures_operator >> end()
    # Explicit ordering, bracketed by the start/end marker tasks.
    start() >> available_files_op >> processing_files_op >> sp_files_map >> filtered_sp_files_map >> run_procedures_sequentially_op >> end()


# dag invocation
# Calling the @dag-decorated function at module level instantiates the DAG
# when this file is imported.
data_ingestion_new_approach_sequentially_v3()

 

 

 

 

The code above performs the following tasks:

1. Scan the files from GCS buckets based on a BigQuery table.

2. As part of the move_files_to_processing step, I remove the header and trailer and copy the files to the processing/ path in the same GCS bucket.

3. Sorting and filtering the files based on stored procedures.

4. All the files mapped to a stored procedure are then executed sequentially.

The issue I am facing is that the move_files_to_processing stage is getting throttled even though the files are only a few KB in size.

Can you please guide me on how to resolve this issue?

0 1 226
1 REPLY 1

Hi @patilb1997,

Welcome to Google Cloud Community!

The throttling you are seeing in Cloud Composer can have several causes. Below are the most likely ones:

  • API Rate Limits: If there is a massive number of API calls in a short span of time, then the rate limits in Google Cloud Storage (GCS) will be exceeded.
  • Network bandwidth: The transfer speed between your Composer environment and Cloud Storage is limited by the available network bandwidth.
  • Resource constraints: If Cloud Composer is overloaded with many tasks or processes running in parallel, CPU, memory, or the number of available worker slots may be exhausted.

Possible solution: 

  • Consider batching files to reduce the number of API calls and to improve performance. 
  • Implement exponential backoff retry logic; this gives Cloud Storage more time to recover from temporary issues.
  • Optimize your network bandwidth to improve transfer speed.
  • Verify if your worker nodes have adequate CPU and memory resources.

Additional Tips:

Utilize Monitoring and Logging tools for a more detailed understanding of why throttling occurs. In case you need further information, please refer to the following documentation:

I hope the above information is helpful.