Hello Team,
I have one cloud function deployed. Its job is to pick up a file as soon as it lands in the bucket, convert it to CSV, and write the result back to the same bucket under a different folder. There is a single main.py file containing the entry function, which then calls my transformation functions.
I have 6 transformation functions for 6 different entities. All of them run fine except one. For that one entity, the same incoming file keeps generating repeated events, and each event produces another output file.
I have tried to handle the repeated event generation, but I am still facing the same issue.
The deployment command is:
- gcloud run deploy $CLOUD_SERVICE_NAME --source . --function handle_gcs_event --base-image python312 --region $REGION --memory 1Gi --service-account=$SERVICE_ACCOUNT_EMAIL --no-allow-unauthenticated --ingress all
import functions_framework
import json
import os
import tempfile
from google.cloud import storage
from google.api_core.exceptions import TooManyRequests
from mycode import myfunction
from mycode1 import myfunction1
# GCS Folder Mapping
FOLDER_MAPPING = {
"Transformed/raw_incoming1": "Transformed/output1",
"Transformed/raw_incoming2": "Transformed/output2",
<other folder mapping there>
}
@functions_framework.cloud_event
def handle_gcs_event(cloud_event):
    data = cloud_event.data
    bucket_name = data["bucket"]
    file_name = data["name"]

    print(f"Event Triggered: {file_name} in {bucket_name}")
    print(f"data is: {data}")

    # Skip if already processed (look for metadata)
    if file_name.endswith(".json") or file_name.endswith(".csv"):
        if "transformed" in file_name:
            print(f"Skipping already processed file: {file_name}")
            return "Skipped already processed file", 200

    output_folder = map_folder(file_name)
    if not output_folder:
        print(f"No matching folder for {file_name}")
        return "No matching folder", 400

    storage_client = storage.Client()

    with tempfile.TemporaryDirectory() as temp_input_dir, tempfile.TemporaryDirectory() as temp_output_dir:
        local_file_path = os.path.join(temp_input_dir, os.path.basename(file_name))

        try:
            download_from_gcs(storage_client, bucket_name, file_name, local_file_path)
        except TooManyRequests:
            print("Rate limit exceeded, retrying...")
            return "Rate limit exceeded, try again later", 429
        except Exception as e:
            print(f"Error downloading file: {e}")
            return f"Error: {e}", 500

        # Route to correct transformer
        if file_name.startswith("Transformed/raw_incoming"):
            profile_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming"):
            product_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming"):
            appointment_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming"):
            voucher_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming"):
            store_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming"):
            order_trns(temp_input_dir, temp_output_dir, output_folder)
        else:
            print(f"No valid transformer found for: {file_name}")
            return "No transformer matched", 400

        # Upload transformed files and tag as processed
        for output_file in os.listdir(temp_output_dir):
            local_output_path = os.path.join(temp_output_dir, output_file)
            destination_path = os.path.join(output_folder, output_file)
            upload_to_gcs(storage_client, bucket_name, local_output_path, destination_path)
            print(f"Uploaded {output_file} to {destination_path}")

    return "Success", 200
def map_folder(file_name):
    folder_path = os.path.dirname(file_name)
    return FOLDER_MAPPING.get(folder_path, None)


def download_from_gcs(storage_client, bucket_name, source_path, local_path):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_path)
    blob.download_to_filename(local_path)
    print(f"Downloaded {source_path} to {local_path}")


def upload_to_gcs(storage_client, bucket_name, local_file, destination_path):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_path)
    blob.upload_from_filename(local_file)
    blob.metadata = {"goog-reserved-file-type": "processed"}
    blob.patch()
    print(f"Uploaded {local_file} to {destination_path}")
the mycode.py is ----
Hi @ArnabBatabyal,
Welcome to Google Cloud Community!
From your setup, it looks like you’re hitting a classic Cloud Storage + Eventarc behavior. This isn’t a bug, it’s expected: Cloud Run functions events are delivered at least once, meaning the same finalized event for a file can trigger your Cloud Run service multiple times. If your transformation writes the CSV back into the same bucket, that new file (or any metadata update) can also trigger another event, especially if the path naming isn’t carefully separated or if a metadata call like blob.patch() generates an additional notification. Google recommends making Cloud Run functions idempotent to handle this.
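For illustration, here is a minimal sketch of one way to add such an idempotency guard, assuming the same bucket layout and FOLDER_MAPPING as in your code. The "source-generation" metadata key, the derived output file name, and the placeholder result path are illustrative assumptions, not something your transformers already produce:

# A minimal sketch of an idempotent handler, assuming the folder layout from
# the question. Names marked as placeholders are assumptions for illustration.
import os
import functions_framework
from google.cloud import storage

FOLDER_MAPPING = {
    "Transformed/raw_incoming1": "Transformed/output1",
    # ... other mappings as in your code
}

storage_client = storage.Client()

@functions_framework.cloud_event
def handle_gcs_event(cloud_event):
    data = cloud_event.data
    bucket = storage_client.bucket(data["bucket"])
    file_name = data["name"]

    # 1. Only react to files in known input folders; anything written to the
    #    output folders (or elsewhere) is ignored, so uploads never loop back.
    output_folder = FOLDER_MAPPING.get(os.path.dirname(file_name))
    if output_folder is None:
        print(f"Ignoring {file_name}: not an input folder")
        return

    # 2. Idempotency guard: each object version has a unique generation number.
    #    If the output for this exact generation already exists, the event is a
    #    redelivery and can be dropped safely.
    generation = str(data.get("generation", ""))
    expected_output = os.path.join(output_folder,
                                   os.path.basename(file_name) + ".csv")
    existing = bucket.get_blob(expected_output)
    if existing and (existing.metadata or {}).get("source-generation") == generation:
        print(f"Skipping {file_name}: generation {generation} already processed")
        return

    # ... run the transformation here, writing the result to a local temp file ...
    local_result = "/tmp/result.csv"  # placeholder for the transformer output

    # 3. Set metadata *before* uploading, so no separate blob.patch() call (and
    #    no extra metadata-update notification) is needed.
    out_blob = bucket.blob(expected_output)
    out_blob.metadata = {"source-generation": generation}
    out_blob.upload_from_filename(local_result)
    print(f"Uploaded {expected_output} for generation {generation}")

Also worth checking: if your Eventarc trigger is subscribed only to the object-finalized event type, the blob.patch() in upload_to_gcs is harmless; but if it also listens for metadata updates, that patch call will generate one extra notification per upload.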
Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.