import re
from datetime import datetime
# airflow imports
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context
# google-cloud library imports
from google.cloud import storage, bigquery
DATA_PROJECT = '' # Config, Audits
PROC_PROJECT = '' # Storage, Composer, Files will always be present in this project
# initialize the storage and bigquery clients
storage_client = storage.Client(project=PROC_PROJECT)
bigquery_client = bigquery.Client(project=DATA_PROJECT)
# Initialize static variables
REGION = 'europe-west2'
# Initialize the config dataset and tables
CONFIG_PROJECT = ''
CONFIG_DATASET = 'config_dataset'
CONFIG_TABLE = 'utility_delta_feed_master'
DPN_AUDIT_PROJECT = ''
DPN_AUDIT_DATASET = "audits"
DPN_AUDIT_TABLE = "utility_stg_to_dpn_ingestion_log"
PROCEDURE_PROJECT_ID = ""
PROCEDURE_DATASET_ID = "delta_stored_procedure"
default_args = {
'retries': 0,
'start_date': days_ago(1),
}
def delete_files_from_processing(bucket_name, destination_blob_name):
'''Delete all pre-processed files from processing path'''
global storage_client
source_bucket = storage_client.bucket(bucket_name)
processing_blob = source_bucket.blob(destination_blob_name)
processing_blob.delete()
print(f'Blob {processing_blob.name} deleted successfully')
def move_to_archival(bucket_name, source_file, processed_file_path, rejected_file_path, status):
'''Move the processed source file to archival path (processed/rejected)'''
global storage_client
dest_blob = source_file.split('/')[-1]
if status:
destination_blob_name = f"{'/'.join(processed_file_path.split('/')[3:])}{dest_blob}"
else:
destination_blob_name = f"{'/'.join(rejected_file_path.split('/')[3:])}{dest_blob}"
source_bucket = storage_client.bucket(bucket_name)
source_blob = source_bucket.blob(source_file)
destination_bucket = storage_client.bucket(bucket_name)
blob_copy = source_bucket.copy_blob(
source_blob, destination_bucket, destination_blob_name
)
source_bucket.delete_blob(source_file)
    try:
        # get_blob() returns None if the archived object does not exist, so this check is meaningful
        archived_file = destination_bucket.get_blob(destination_blob_name)
        if archived_file and status:
            print('File successfully moved to processed file archival path.')
        elif archived_file and not status:
            print('File successfully moved to rejected file archival path.')
        else:
            print('Archived file not found after copy; check the archival location.')
    except Exception as e:
        print('Error encountered while moving the file to archival location. Refer to error logs for additional information.')
        print('Moving File to Archival Error', e)
def insert_ingestion_log(file_id, file_name, dpn_dataset, dpn_table, procedure_name, record_count, load_timestamp, status):
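    '''Insert one row into the staging-to-DPN ingestion audit log table'''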
rows_to_insert = [
{
"file_id": file_id,
"file_name": file_name,
"dpn_dataset": dpn_dataset,
"dpn_table": dpn_table,
"procedure_name": procedure_name,
"record_count": record_count,
"load_timestamp": str(load_timestamp),
"status": status
}
]
client = bigquery.Client(DATA_PROJECT)
table_ref = client.dataset(DPN_AUDIT_DATASET).table(DPN_AUDIT_TABLE)
errors = client.insert_rows_json(table_ref, rows_to_insert)
if errors:
print(errors)
else:
print(f"load entry added for {procedure_name}")
def get_max_file_id():
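    '''Return the maximum file_id recorded in the ingestion audit log, or 0 if the log is empty'''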
result = []
query = f"""SELECT ifnull(max(file_id), 0) as file_id FROM `{DPN_AUDIT_PROJECT}.{DPN_AUDIT_DATASET}.{DPN_AUDIT_TABLE}`"""
dpn_load_audit_bq_client = bigquery.Client(project=DPN_AUDIT_PROJECT)
query_job = dpn_load_audit_bq_client.query(query)
results = query_job.result()
for row in results:
result.append(row.file_id)
if len(result):
return result[0]
return 0
def get_column_count(project_id, dataset_id, table_id):
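    '''Return the number of columns defined in the schema of the given BigQuery table'''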
client = bigquery.Client(project=project_id)
table_ref = client.dataset(dataset_id).table(table_id)
table = client.get_table(table_ref)
# Get the column count
column_count = len(table.schema)
return column_count
def extract_key(file_details):
    # Prerequisite: this function only works if all files in one feed follow the same naming convention.
    # Otherwise sorting fails with: TypeError: '<' not supported between instances of 'int' and 'list'
pattern = f"(?:DNH|MNH|MH|DH)_(.*)"
file_name = file_details[3]
match = re.search(pattern, file_name)
if match:
matched_string = match.group(1)
start_end_timestamp_list = matched_string.split('.')[0].split('_')
if len(start_end_timestamp_list) == 1:
file_name_timestamp = int(start_end_timestamp_list[0])
return file_name_timestamp
elif len(start_end_timestamp_list) == 3:
try:
matched_string_list = list(map(int, start_end_timestamp_list))
return [matched_string_list[2], matched_string_list[0], matched_string_list[1]]
except Exception as e:
file_name_timestamp = int(start_end_timestamp_list[2])
return file_name_timestamp
else:
            raise Exception('''Error while extracting the key for sorting files. Check whether the file name follows one of the expected formats:
                1. BPP_GNS_BPA_IF_v1_DNH_20240815013001.dat
                2. BPP_GSPS_BAC-BACMAIN-BEF_v2_MNH_Seq_Comp_20240824120009.dat
                3. BPP_PDK_PUR-PUMAIN-NPIV_v12_DNH_1_1_20240808172349.dat
                If not, identify the new format that has arrived and check with the developer.''')
def filter_files(file_details):
# BPP_GNS_BPA_IF_v1_DNH_20240815013001.dat -> (20240815013001)
# BPP_GSPS_BAC-BACMAIN-BEF_v2_MNH_Seq_Comp_20240824120009.dat -> ('Seq', 'Comp', 20240824120009)
# BPP_PDK_PUR-PUMAIN-NPIV_v12_DNH_1_1_20240808172349.dat -> (1, 1, 20240808172349)
pattern = f"(?:DNH|MNH|MH|DH)_(.*)"
delta_start_date = file_details[10]
match = re.search(pattern, file_details[3])
if match:
matched_string = match.group(1)
start_end_timestamp_list = matched_string.split('.')[0].split('_')
if len(start_end_timestamp_list) == 1:
file_name_timestamp = int(start_end_timestamp_list[0])
elif len(start_end_timestamp_list) == 3:
file_name_timestamp = int(start_end_timestamp_list[2])
if delta_start_date:
if file_name_timestamp >= delta_start_date:
return True
return False
return True
def sort_files(sp_files_map):
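    '''Sort the files mapped to one stored procedure by the key extracted from their file names'''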
return sorted(sp_files_map, key=extract_key)
# dag instantiation
@dag(
default_args= default_args,
schedule_interval='*/30 * * * *',
catchup=False,
max_active_runs=1,
)
def data_ingestion_new_approach_sequentially_v3():
"""
This DAG performs below operations:
1. Scan for all available files in GCS bucket
2. Move the files from landing/ bucket to processing/ path.
3. For all these files, trigger the stored procedures, and store the status
in table (file_id, file_name, procedure_name, status), depending on the status,
move the files from processing/ to processed/ or rejected/
"""
@task
def start():
return 'Start -> Scan for available files -> Move the files to processing -> Group files based on SP and sort the files'
@task
def get_available_files() -> list:
"""
Read the config table, and scan for all available files
"""
delta_metadata_master = list()
# Fetch information for all the active feeds from metadata table
metadata_query = f'''
SELECT src_application_id, src_file_id, src_file_path, file_mask,
file_extension, file_delimiter, trim_header, trim_trailer, staging_project_id, staging_dataset, staging_table, dpn_dataset, dpn_table, stored_procedure_name, processing_file_path, processed_file_path, rejected_file_path, delta_start_date
FROM {CONFIG_PROJECT}.{CONFIG_DATASET}.{CONFIG_TABLE}
WHERE feed_active = 1;
'''
config_bq_client = bigquery.Client(project = CONFIG_PROJECT)
metadata_rows = config_bq_client.query_and_wait(query=metadata_query)
for row in metadata_rows:
delta_metadata_master.append((row.src_application_id, row.src_file_id, row.src_file_path,
row.file_mask, row.file_extension, row.file_delimiter, row.trim_header, row.trim_trailer, row.staging_project_id, row.staging_dataset, row.staging_table, row.dpn_dataset, row.dpn_table, row.stored_procedure_name,
row.processing_file_path, row.processed_file_path, row.rejected_file_path, row.delta_start_date))
# Files which are available under each feed's landing path
available_files = list()
# For each file in metadata_master construct the regular expression and check for files in landing which match this pattern
for metadata in delta_metadata_master:
src_application_id, src_file_id, src_file_path, file_mask, file_extension, file_delimiter, trim_header, trim_trailer, staging_project_id, staging_dataset, staging_table, dpn_dataset, dpn_table, stored_procedure_name, processing_file_path, processed_file_path, rejected_file_path, delta_start_date = metadata
bucket_name = src_file_path.split('/')[2]
prefix = None
if src_file_path.split('/')[3]:
prefix = '/'.join(src_file_path.split('/')[3:])
# Fetch all the blobs at src_file_path - ex. landing/
blobs = storage_client.list_blobs(bucket_name, prefix=prefix, delimiter='/')
# Construct regex pattern to check if the file_mask is part of any of the files available in landing bucket
# This will only scan for files which contain file_mask irrespective of file extension
# TODO: Update the pattern based on file naming standards
pattern = re.compile(f'^.*{file_mask.lower()}.*$')
for blob in blobs:
file_name = blob.name.split('/')[-1]
if pattern.match(file_name.lower()):
if blob.size != 0:
available_files.append(
(src_application_id, src_file_id, src_file_path + file_name, file_name, file_extension, file_delimiter, trim_header, trim_trailer, staging_project_id, staging_dataset, staging_table, dpn_dataset, dpn_table, stored_procedure_name, processing_file_path, processed_file_path, rejected_file_path, delta_start_date))
else:
print(f'Found 0 byte file {file_name}')
move_to_archival(bucket_name, blob.name, processed_file_path, rejected_file_path, False)
else:
continue
return available_files
@task
def move_files_to_processing(available_files) -> list:
"""
Move the available files from landing/ to processing/
"""
import pandas as pd
global storage_client
copied_files = list()
max_file_id = get_max_file_id()
for metadata in available_files:
max_file_id = max_file_id + 1
file_id = max_file_id
src_application_id, src_file_id, gcs_uri, file_name, file_extension, file_delimiter, trim_header, trim_trailer, staging_project_id, staging_dataset, staging_table, dpn_dataset, dpn_table, stored_procedure_name, processing_file_path, processed_file_path, rejected_file_path, delta_start_date = metadata
column_count = get_column_count(staging_project_id, staging_dataset, staging_table)
            data_df = pd.read_csv(filepath_or_buffer=gcs_uri, header=None, skiprows=trim_header, sep=file_delimiter, names=[f'col_{i}' for i in range(column_count)])
row_count = data_df.shape[0]
filtered_df = data_df.iloc[0:row_count - trim_trailer].reset_index(drop=True)
bucket_name = gcs_uri.split('/')[2]
blob_name = '/'.join(gcs_uri.split('/')[3:])
destination_bucket_name = bucket_name
destination_blob_name = '/'.join(processing_file_path.split('/')[3:]) + f'{staging_table}/' + f'pre_processed_{file_name}'
source_bucket = storage_client.bucket(bucket_name)
source_blob = source_bucket.blob(blob_name)
destination_bucket = storage_client.bucket(destination_bucket_name)
print('bucket_name:', bucket_name)
print('blob_name:', blob_name)
print('destination_bucket_name:', destination_bucket_name)
print('destination_blob_name:', destination_blob_name)
print('source_bucket:', source_bucket)
print('source_blob:', source_blob)
print('destination_bucket:', destination_bucket)
try:
# Writing to processing_files/ path
destination_bucket.blob(destination_blob_name).upload_from_string(filtered_df.to_csv(sep='|', index=False, header=False))
copied_files.append((file_id, dpn_dataset, dpn_table, file_name, stored_procedure_name, bucket_name, blob_name, destination_blob_name, processed_file_path, rejected_file_path, delta_start_date))
except Exception as e:
print(f'Error while copying {source_blob.name}', e)
raise
return copied_files
@task
def group_and_sort_feed_wise_files(processing_files):
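        """Group the processing files by stored procedure name and sort each group by file-name timestamp"""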
sp_files_map = dict()
# group the files based on the stored procedure name
        for file_details in processing_files:
            # file_details[4] is the stored procedure name; group each file under its procedure
            sp_files_map.setdefault(file_details[4], []).append(file_details)
# sort the files mapped against each stored procedures
for sp in sp_files_map:
sorted_files = sort_files(sp_files_map[sp])
sp_files_map[sp] = sorted_files
print('Stored_Procedure: Files map after sorting:', sp_files_map)
return sp_files_map
@task
def filter_based_on_delta_start_date(sp_files_map):
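        """Drop files whose file-name timestamp is earlier than the feed's delta_start_date"""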
for sp in sp_files_map:
files_details_list = sp_files_map[sp] # list of tuples
filtered_files = filter(filter_files, files_details_list)
filtered_files_list = [file for file in filtered_files]
sp_files_map[sp] = filtered_files_list
return sp_files_map
# @task
# def write_map_to_json_file(sp_files_map):
# import json
# global storage_client
# temp_bucket = ''
# bucket = storage_client.get_bucket(temp_bucket)
# json_data = json.dumps(sp_files_map)
# blob = bucket.blob('data_ingestion_temp/sp_files_map.json')
# blob.upload_from_string(json_data, content_type='application/json')
# print(f'sp_files_map uploaded successfully as a json to GCS location {blob.name}')
# return blob.name
@task
def end():
return 'End'
def execute_procedure(procedure_name, file_name, file_id, dpn_dataset, dpn_table, bucket_name, source_blob_name, destination_blob_name, processed_file_path, rejected_file_path):
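        '''Call the delta stored procedure for one file, then archive the file, clean up processing/, and record the outcome in the audit log'''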
procedure_bq_client = bigquery.Client(project=PROCEDURE_PROJECT_ID)
load_timestamp = datetime.now()
record_count = 0
try:
print(f"running procedure {PROCEDURE_PROJECT_ID}.{PROCEDURE_DATASET_ID}.{procedure_name}")
sql_query = f"CALL `{PROCEDURE_PROJECT_ID}.{PROCEDURE_DATASET_ID}.{procedure_name}`('{file_name}', {file_id});"
print('SQL Query:', sql_query)
query_job = procedure_bq_client.query(sql_query)
results = query_job.result()
            if query_job.errors:
                # Raise and let the except block below handle archival, cleanup, and audit logging exactly once
                raise RuntimeError(f"query execution failed: {query_job.errors}")
for row in results:
print('Row:', row)
record_count = row['record_count']
break
move_to_archival(bucket_name=bucket_name, source_file=source_blob_name, processed_file_path=processed_file_path, rejected_file_path=rejected_file_path, status=True)
delete_files_from_processing(bucket_name, destination_blob_name)
insert_ingestion_log(file_id, file_name, dpn_dataset, dpn_table, procedure_name, record_count, load_timestamp, "SUCCESS")
print("procedure ran successfully...")
except Exception as e:
print(f'Exception while executing stored procedure {procedure_name}', e)
move_to_archival(bucket_name=bucket_name, source_file=source_blob_name, processed_file_path=processed_file_path, rejected_file_path=rejected_file_path, status=False)
delete_files_from_processing(bucket_name, destination_blob_name)
insert_ingestion_log(file_id, file_name, dpn_dataset, dpn_table, procedure_name, record_count, load_timestamp, "FAILED")
raise RuntimeError(f"procedure execution failed due to following reason..{e}")
# @task(map_index_template="{{ mapped_task_id }}")
@task
def run_procedures_sequentially(sp_files_tuple : list):
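        """For a single stored procedure, execute the procedure for each of its files in sorted order"""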
# global storage_client
# import json
# temp_bucket = ''
# bucket = storage_client.get_bucket(temp_bucket)
# blob = bucket.blob(blob_name)
# sp_files_map = json.loads(blob.download_as_string())
# context = get_current_context()
# context["mapped_task_id"] = sp_files_tuple[0]
count = 1
for file_details in sp_files_tuple[1]:
print(f'###### Iteration {count} for {sp_files_tuple[0]}: Running procedure {sp_files_tuple[0]} for file {file_details[3]} ######')
file_id, dpn_dataset, dpn_table, file_name, procedure_name, bucket_name, source_blob_name, destination_blob_name, processed_file_path, rejected_file_path, delta_start_date = file_details
execute_procedure(procedure_name, file_name, file_id, dpn_dataset, dpn_table, bucket_name, source_blob_name, destination_blob_name, processed_file_path, rejected_file_path)
count += 1
available_files_op = get_available_files()
processing_files_op = move_files_to_processing(available_files=available_files_op)
sp_files_map = group_and_sort_feed_wise_files(processing_files_op)
filtered_sp_files_map = filter_based_on_delta_start_date(sp_files_map=sp_files_map)
# blob_name_op = write_map_to_json_file(sp_files_map=filtered_sp_files_map)
run_procedures_sequentially_op = run_procedures_sequentially.expand(sp_files_tuple = filtered_sp_files_map)
# run_procedures_operator = PythonOperator(
# task_id = 'run_procedures',
# python_callable = run_procedures_sequentially,
# op_kwargs = {"blob_name": blob_name_op}
# )
# start() >> available_files_op >> processing_files_op >> sp_files_map >> filtered_sp_files_map >> blob_name_op >> run_procedures_operator >> end()
start() >> available_files_op >> processing_files_op >> sp_files_map >> filtered_sp_files_map >> run_procedures_sequentially_op >> end()
# dag invocation
data_ingestion_new_approach_sequentially_v3()
The above code performs the below tasks:
1. Scan for files in GCS buckets based on a BigQuery config table.
2. As part of the move_files_to_processing step, I am removing the header and trailer and copying the files to the processing/ path in the same GCS bucket.
3. Sorting and filtering the files grouped by stored procedure.
4. All the files mapped to a stored procedure are then processed sequentially.
The issue I am facing is that the move_files_to_processing stage is getting throttled even though the file sizes are only in KB.
Can you please guide me on how to resolve this issue?
Hi @patilb1997,
Welcome to the Google Cloud Community!
Throttling in Cloud Composer can be the result of various causes. Below are the most likely ones:
Possible solution:
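For example, if the throttling turns out to be GCS-side rate limiting (HTTP 429) on the per-file uploads in move_files_to_processing, one possible mitigation is to let the storage client retry those uploads with exponential backoff. This is only a minimal sketch under that assumption; the deadline and delay values below are placeholders you would need to tune for your environment:

from google.cloud.storage.retry import DEFAULT_RETRY

# Retry transient errors (429/5xx) with exponential backoff for up to ~2 minutes.
# The timing values below are illustrative, not recommendations.
upload_retry = DEFAULT_RETRY.with_deadline(120.0).with_delay(initial=1.0, multiplier=2.0, maximum=30.0)

# Inside move_files_to_processing, the upload call could then become:
destination_bucket.blob(destination_blob_name).upload_from_string(
    filtered_df.to_csv(sep='|', index=False, header=False),
    retry=upload_retry,
)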
Additional Tips:
Utilize the Monitoring and Logging tools for a more detailed understanding of why the throttling occurs, and refer to the relevant Google Cloud documentation if you need further information.
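As a quick illustration, listing recent warning-level Cloud Composer log entries from Python with the google-cloud-logging client might look like the sketch below; the project ID and the log filter are placeholders to adapt to your environment:

from google.cloud import logging as cloud_logging

# NOTE: the project ID and the filter are placeholders; adjust them to your environment.
log_client = cloud_logging.Client(project='your-project-id')
log_filter = (
    'resource.type="cloud_composer_environment" '
    'AND severity>=WARNING '
    'AND textPayload:"429"'
)
for entry in log_client.list_entries(filter_=log_filter, order_by=cloud_logging.DESCENDING, max_results=20):
    print(entry.timestamp, entry.severity, entry.payload)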
I hope the above information is helpful.