
File transfer from AWS S3 to GCS

Hi @ms4446 

I am doing a file transfer from AWS S3 to GCS, for which I have a DAG running through Cloud Composer. It is in fact a two-step process:

1) File transfer from GCS to S3

2) File transfer from S3 to GCS

The GCS to S3 transfer works well.

I have the following snippet to transfer from S3 to GCS.

 

import io
import logging
import os
from datetime import datetime, timezone

import boto3
from google.cloud import storage

# (gcp_idtoken_from_metadata_server and aws_credentials_from_id_token are
# helper functions defined elsewhere in the DAG file.)


def transfer_file_from_s3_to_gcs(
    gcs_bucket: str,
    gcs_folder: str,
    s3_bucket: str,
    aws_role_arn: str,
    s3_folder: str,
) -> None:
    """Transfer a file from S3 to GCS.

    Args:
        gcs_bucket (str): GCS Destination bucket.
        gcs_folder (str): GCS Folder in the bucket.
        s3_bucket (str): Source S3 bucket.
        aws_role_arn (str): AWS role to use.
        s3_folder (str): AWS Folder in the bucket.
    """
    gcp_id_token = gcp_idtoken_from_metadata_server("https://s3.amazonaws.com")
    aws_credentials = aws_credentials_from_id_token(gcp_id_token, aws_role_arn)

    # Set up gcs storage client
    gcs_client = storage.Client()

    # Set up AWS S3 client
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=aws_credentials["AccessKeyId"],
        aws_secret_access_key=aws_credentials["SecretAccessKey"],
        aws_session_token=aws_credentials["SessionToken"],
    )

    # Get the current date in UTC
    today = datetime.now(timezone.utc).date()

    # List objects in the specified folder
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)

    # Filter objects that were uploaded today
    objs_today = [
        obj
        for obj in response.get("Contents", [])
        if obj["LastModified"].date() == today
    ]

    # If no files were uploaded today, return None
    if not objs_today:
        logging.info("No files were uploaded to the s3 bucket today")
        return

    # Find the latest 2 files uploaded today
    latest_files = sorted(
        objs_today, key=lambda obj: obj["LastModified"], reverse=True
    )[:2]

    if latest_files:
        for latest_file in latest_files:
            # Download the file from S3 directly into an in-memory buffer
            file_stream = io.BytesIO()
            s3_client.download_fileobj(s3_bucket, latest_file["Key"], file_stream)
            file_stream.seek(0)

            # Upload file to GCP
            gcp_blob_name = os.path.join(
                gcs_folder, os.path.basename(latest_file["Key"])
            )
            gcp_blob = gcs_client.bucket(gcs_bucket).blob(gcp_blob_name)
            gcp_blob.upload_from_file(file_stream, rewind=True)

            logging.info(
                "Sucessfully transfered '%s' from '%s' in S3 to '%s' in GCS",
                latest_file["Key"],
                s3_bucket,
                gcs_bucket,
            )

I get the following error when I try to list objects. Can you help?

This is the error:

 

ClientError('An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied')

 

It happens during the execution of the line:

 

response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)

 

Am I missing a permission? Is there any simpler way of retrieving the file than the above approach?


The "AccessDenied" error you're encountering typically means that the AWS role you're using within Cloud Composer doesn't have the necessary permissions to list objects in the S3 bucket. Here is how to fix the permissions issue:

IAM Role: Make sure the AWS IAM role you assume from Cloud Composer has the following S3 permissions:

  • s3:ListBucket (to list objects within a bucket)
  • s3:GetObject (to download objects from the bucket)

Policy Attachment: Verify that the policy granting these permissions is correctly attached to the IAM role. You can do this in the AWS IAM console; a rough example of attaching such a policy with boto3 is shown after this list.

Role Trust Relationship: Double-check that the trust relationship of the IAM role allows the Google identity used by your Cloud Composer environment to assume the role (with your ID-token exchange, that typically means allowing sts:AssumeRoleWithWebIdentity for that identity).
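
For reference, here is a rough sketch of granting those two permissions with an inline policy through boto3. The role name, policy name, and bucket ARN are placeholders for your own values, and you can of course attach the same statements through the IAM console instead:

import json

import boto3

iam_client = boto3.client("iam")

# Placeholder policy: allow listing the source bucket and reading its objects.
# Note that s3:ListBucket applies to the bucket ARN itself, while s3:GetObject
# applies to the objects inside it (the /* resource).
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::your-source-bucket",
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::your-source-bucket/*",
        },
    ],
}

iam_client.put_role_policy(
    RoleName="your-composer-transfer-role",  # the role behind aws_role_arn
    PolicyName="composer-s3-read",
    PolicyDocument=json.dumps(policy_document),
)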

Here are a few alternative methods that might simplify your workflow:

Airflow S3ToGCSOperator: Airflow's Google provider (available in Cloud Composer) includes an operator specifically for transferring files from S3 to GCS. This could eliminate the need for custom code:

from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

transfer_operator = S3ToGCSOperator(
    task_id='s3_to_gcs_task',
    bucket=s3_bucket,
    prefix=s3_folder,
    dest_gcs=f'gs://{gcs_bucket}/{gcs_folder}/',
    aws_conn_id='aws_default',  # Your Airflow connection ID for AWS
    gcp_conn_id='google_cloud_default',  # Your Airflow connection ID for GCP
    dag=dag
)
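
As far as I can tell, the operator takes the destination as a single dest_gcs URI (bucket plus folder, with a trailing slash) rather than separate bucket and prefix arguments, so it's worth double-checking the parameter names against the Google provider version installed in your environment.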

If you're comfortable with command-line tools, you could use the gsutil command-line interface directly within a Cloud Composer task. This is especially convenient if you don't need complex filtering:

from airflow.operators.bash import BashOperator

transfer_task = BashOperator(
    task_id='gsutil_transfer',
    bash_command=f'gsutil cp -r s3://{s3_bucket}/{s3_folder} gs://{gcs_bucket}/{gcs_folder}',
    dag=dag
)

When using S3ToGCSOperator, make sure your Cloud Composer environment has the appropriate AWS and GCP connections configured in Airflow; for the gsutil approach, the workers also need AWS credentials that gsutil can pick up (typically via a boto configuration file) before s3:// URLs will work.
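
If you'd rather keep custom Python but let Airflow manage the credentials, here is a rough sketch of a task callable that goes through the provider hooks instead of a hand-rolled token exchange; the function name and connection IDs are placeholders:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook


def copy_prefix_s3_to_gcs(s3_bucket, s3_folder, gcs_bucket, gcs_folder):
    # Credentials come from the Airflow connections, not from a manual
    # ID-token-to-AWS-credentials exchange.
    s3_hook = S3Hook(aws_conn_id="aws_default")
    gcs_hook = GCSHook(gcp_conn_id="google_cloud_default")

    # Copy every object under the prefix, one by one, through memory.
    for key in s3_hook.list_keys(bucket_name=s3_bucket, prefix=s3_folder) or []:
        s3_object = s3_hook.get_key(key, bucket_name=s3_bucket)
        gcs_hook.upload(
            bucket_name=gcs_bucket,
            object_name=f"{gcs_folder}/{key.split('/')[-1]}",
            data=s3_object.get()["Body"].read(),
        )

You would still layer your "uploaded today" filtering on top of list_keys if you only want the latest files.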

If you prefer to keep your custom Python function, here's a slightly improved version:

 

from botocore.exceptions import ClientError

# ... (your existing imports and the gcp_idtoken_from_metadata_server and aws_credentials_from_id_token helpers)

def transfer_file_from_s3_to_gcs(...):
    # ... (same as your existing code up to listing objects)
    
    try:
        response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)
    except ClientError as e:
        if e.response['Error']['Code'] == 'AccessDenied':
            logging.error("Insufficient permissions to access S3 bucket.")
        else:
            logging.error(f"Error listing S3 objects: {e}")
        raise # Re-raise the exception after logging
    
    # ... (rest of your existing code)

 

When it comes to transferring from GCS to S3 or from S3 to GCS, tools like CloudFuze and GS Richcopy 360 can do this job easily, directly, automatically, and with additional filters if you need them.