
Cloud Run Functions with Eventarc

Hello Team,

I have one Cloud Run function deployed. Its main purpose is to pick up a file as soon as it lands in the bucket, convert it to CSV, and write the result back to the same bucket under a different folder. There is one main.py file that contains the entry function and calls my transformation functions.

I have 6 transformation functions for 6 different entities. All of them run fine except one. For that one entity I keep getting repetitive events generated from the same incoming file, and an output file is produced every time.

I have tried to handle the repetitive event generation (roughly along the lines of the guard sketched after main.py below), but I am still facing the same issue.

The deployment is:

- gcloud run deploy $CLOUD_SERVICE_NAME --source . --function handle_gcs_event --base-image python312 --region $REGION --memory 1Gi --service-account=$SERVICE_ACCOUNT_EMAIL --no-allow-unauthenticated --ingress all

- gcloud eventarc triggers create $EVENT_SERVICE_NAME --location=eu --destination-run-service=$CLOUD_SERVICE_NAME --destination-run-region=$REGION --event-filters="type=google.cloud.storage.object.v1.finalized" --event-filters="bucket=$BUCKET_NAME" --service-account=$SERVICE_ACCOUNT_EMAIL || echo "Eventarc is already set up"
 
A sample main.py is:

import functions_framework
import json
import os
import tempfile
from google.cloud import storage
from google.api_core.exceptions import TooManyRequests
from mycode import profile_trns
from mycode1 import product_trns
# ...imports for the remaining transformer functions...

# GCS Folder Mapping
FOLDER_MAPPING = {
    "Transformed/raw_incoming1": "Transformed/output1",
    "Transformed/raw_incoming2": "Transformed/output2",
    # <other folder mappings here>
}

@functions_framework.cloud_event
def handle_gcs_event(cloud_event):

    data = cloud_event.data
    bucket_name = data["bucket"]
    file_name = data["name"]

    print(f"Event Triggered: {file_name} in {bucket_name}")
    print(f"data is: {data}")

    # Skip if already processed (look for metadata)
    if file_name.endswith(".json") or file_name.endswith(".csv"):
        if "transformed" in file_name:
            print(f"Skipping already processed file: {file_name}")
            return "Skipped already processed file", 200

    output_folder = map_folder(file_name)
    if not output_folder:
        print(f"No matching folder for {file_name}")
        return "No matching folder", 400

    storage_client = storage.Client()

    with tempfile.TemporaryDirectory() as temp_input_dir, tempfile.TemporaryDirectory() as temp_output_dir:
        local_file_path = os.path.join(temp_input_dir, os.path.basename(file_name))

        try:
            download_from_gcs(storage_client, bucket_name, file_name, local_file_path)
        except TooManyRequests:
            print("Rate limit exceeded, retrying...")
            return "Rate limit exceeded, try again later", 429
        except Exception as e:
            print(f"Error downloading file: {e}")
            return f"Error: {e}", 500
        # Route to the correct transformer (one incoming prefix per entity)
        if file_name.startswith("Transformed/raw_incoming1"):
            profile_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming2"):
            product_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming3"):
            appointment_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming4"):
            voucher_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming5"):
            store_trns(temp_input_dir, temp_output_dir, output_folder)
        elif file_name.startswith("Transformed/raw_incoming6"):
            order_trns(temp_input_dir, temp_output_dir, output_folder)
        else:
            print(f"No valid transformer found for: {file_name}")
            return "No transformer matched", 400

        # Upload transformed files and tag as processed
        for output_file in os.listdir(temp_output_dir):
            local_output_path = os.path.join(temp_output_dir, output_file)
            destination_path = os.path.join(output_folder, output_file)
            upload_to_gcs(storage_client, bucket_name, local_output_path, destination_path)
            print(f"Uploaded {output_file} to {destination_path}")

    return "Success", 200

def map_folder(file_name):
    folder_path = os.path.dirname(file_name)
    return FOLDER_MAPPING.get(folder_path, None)


def download_from_gcs(storage_client, bucket_name, source_path, local_path):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_path)
    blob.download_to_filename(local_path)
    print(f"Downloaded {source_path} to {local_path}")


def upload_to_gcs(storage_client, bucket_name, local_file, destination_path):
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_path)
    blob.upload_from_filename(local_file)
    blob.metadata = {"goog-reserved-file-type": "processed"}
    blob.patch()
    print(f"Uploaded {local_file} to {destination_path}")

The mycode.py is:

import json
import csv
import os
from datetime import datetime, timezone

# Function to transform required value
def transform_data(value):
    return [value] if value else [""]


# Header mapping (JSON keys -> CSV column names)
HEADER_MAPPING = {
    # <JSON key>: <CSV column name>
}


# Columns requiring transformation
TRANSFORM_COLUMNS = []

# Final CSV headers
CSV_HEADERS = list(HEADER_MAPPING.values()) + ["Source"]

# Function to convert JSON to CSV
def profile_trns(input_dir, output_dir, output_folder):

    json_files = [f for f in os.listdir(input_dir) if f.endswith('.json')]

    if not json_files:
        print(f"No JSON files found in {input_dir}. Exiting...")
        return
    json_file = json_files[0]
    final_csv_name = os.path.splitext(json_file)[0] + ".csv"

    final_csv_path = os.path.join(output_dir, final_csv_name)

    with open(final_csv_path, mode='w', newline='', encoding='utf-8') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(CSV_HEADERS)  # Write CSV header

        # Process each JSON file
        for json_file in json_files:
            process_json_file(input_dir, json_file, writer)

    print(f"CSV file created successfully: {final_csv_path}")

# Function to process a single JSON file
def process_json_file(input_dir, json_file, writer):

   
    with open(os.path.join(input_dir, json_file), 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if not line:
                continue  # Skip empty lines

            try:
                data = json.loads(line)  # Load each JSON object separately
            except json.JSONDecodeError:
                print(f"Skipping malformed JSON in file {json_file}: {line}")
                continue  # Skip invalid JSON lines

            if isinstance(data, dict):
                data = [data]

            for record in data:
                # Add extra column

                # Prepare CSV row
                row = []
                for key in HEADER_MAPPING.keys():
                    value = record.get(key, "")
                    if key in TRANSFORM_COLUMNS:
                        value = transform_data(value)
                    row.append(value)

                # Append extra fields and source filename
                row.append(json_file)

                # Write row to CSV
                writer.writerow(row)

    print(f"Processed {json_file}")
 
 
In the logs I can see the event for the same JSON file repeating, and the number of CSV files keeps growing.
 
 
 

Hi @ArnabBatabyal,

Welcome to Google Cloud Community!

From your setup, it looks like you're hitting classic Cloud Storage + Eventarc behavior. This isn't a bug; it's expected: Eventarc delivers events at least once, meaning the same finalized event for a file can trigger your Cloud Run service multiple times. And because your transformation writes the CSV back into the same bucket, that new file (or a metadata update) can also trigger another event, especially if the path naming isn't carefully separated or if calls like blob.patch() cause unintended updates. Google recommends making Cloud Run functions idempotent to handle this.
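
As a rough illustration of what idempotent means here, the handler could skip work whenever the expected output already exists, along these lines (a minimal sketch; the already_processed name and the exact paths are assumptions based on your layout):

import os
from google.cloud import storage

def already_processed(bucket_name, file_name, output_folder):
    """Return True if the expected transformed CSV is already in the bucket."""
    expected_csv = os.path.splitext(os.path.basename(file_name))[0] + ".csv"
    blob = storage.Client().bucket(bucket_name).blob(f"{output_folder}/{expected_csv}")
    return blob.exists()

# In handle_gcs_event, before downloading:
#     if already_processed(bucket_name, file_name, output_folder):
#         print(f"Skipping duplicate event for {file_name}")
#         return "Already processed", 200

Alternatively, recording the CloudEvent id in a small state store gives stronger de-duplication, but an existence check like the one above is usually enough for a one-file-in, one-file-out pipeline.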

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.