File transfer from AWS S3 to GCS

Hi @ms4446 

I am transferring files between AWS S3 and GCS with a DAG running in Cloud Composer. In fact, it is a two-step process:

1) File transfer from GCS to S3

2) File transfer from S3 to GCS

The GCS-to-S3 transfer works fine.

I have the following snippet to transfer from S3 to GCS:


import io
import logging
import os
from datetime import datetime, timezone

import boto3
from google.cloud import storage

# Note: gcp_idtoken_from_metadata_server and aws_credentials_from_id_token
# are helper functions defined elsewhere in the DAG; they implement the
# workload identity federation exchange (GCP ID token -> AWS credentials).


def transfer_file_from_s3_to_gcs(
    gcs_bucket: str,
    gcs_folder: str,
    s3_bucket: str,
    aws_role_arn: str,
    s3_folder: str,
) -> None:
    """Transfer a file from S3 to GCS.

    Args:
        gcs_bucket (str): GCS Destination bucket.
        gcs_folder (str): GCS Folder in the bucket.
        s3_bucket (str): Source S3 bucket.
        aws_role_arn (str): AWS role to use.
        s3_folder (str): AWS Folder in the bucket.
    """
    gcp_id_token = gcp_idtoken_from_metadata_server("https://s3.amazonaws.com")
    aws_credentials = aws_credentials_from_id_token(gcp_id_token, aws_role_arn)

    # Set up the GCS storage client
    gcs_client = storage.Client()

    # Set up AWS S3 client
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=aws_credentials["AccessKeyId"],
        aws_secret_access_key=aws_credentials["SecretAccessKey"],
        aws_session_token=aws_credentials["SessionToken"],
    )

    # Get the current date in UTC
    today = datetime.now(timezone.utc).date()

    # List objects in the specified folder
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)

    # Filter objects that were uploaded today
    objs_today = [
        obj
        for obj in response.get("Contents", [])
        if obj["LastModified"].date() == today
    ]

    # If no files were uploaded today, return None
    if not objs_today:
        logging.info("No files were uploaded to the S3 bucket today")
        return

    # Find the latest 2 files uploaded today
    latest_files = sorted(
        objs_today, key=lambda obj: obj["LastModified"], reverse=True
    )[:2]

    for latest_file in latest_files:
        # Download the object from S3 directly into an in-memory buffer
        # (download_fileobj takes a file-like object; download_file expects
        # a filename on disk)
        file_stream = io.BytesIO()
        s3_client.download_fileobj(s3_bucket, latest_file["Key"], file_stream)
        file_stream.seek(0)

        # Upload the buffer to GCS (upload_from_file takes a file-like
        # object; upload_from_filename expects a path)
        gcp_blob_name = os.path.join(
            gcs_folder, os.path.basename(latest_file["Key"])
        )
        gcp_blob = gcs_client.bucket(gcs_bucket).blob(gcp_blob_name)
        gcp_blob.upload_from_file(file_stream)

        logging.info(
            "Successfully transferred '%s' from '%s' in S3 to '%s' in GCS",
            latest_file["Key"],
            s3_bucket,
            gcs_bucket,
        )

I get the following error when I try to list objects. Can you help?

This is the error:

ClientError('An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied')

It happens during the execution of the line:

response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)
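
To rule out a failed role assumption, the temporary credentials can be checked with STS before calling S3. A minimal sketch reusing the aws_credentials dict from the function above (get_caller_identity is a standard STS call that reports which identity AWS actually sees):

import boto3

sts_client = boto3.client(
    "sts",
    aws_access_key_id=aws_credentials["AccessKeyId"],
    aws_secret_access_key=aws_credentials["SecretAccessKey"],
    aws_session_token=aws_credentials["SessionToken"],
)

# Should print the assumed-role ARN derived from aws_role_arn; if it does
# not, the problem is in the credential exchange rather than in S3.
print(sts_client.get_caller_identity()["Arn"])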

Am I missing a permission? Is there any simpler way of retrieving the files than the above approach?
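
(For reference on the permission question: AccessDenied from ListObjectsV2 usually means the assumed role lacks s3:ListBucket on the bucket ARN itself, i.e. arn:aws:s3:::<bucket>, since s3:GetObject on arn:aws:s3:::<bucket>/* only covers reads of individual objects.)

On the simpler approach: the Google provider package that ships with Cloud Composer includes an S3ToGCSOperator that does the listing and copying in a single task. A minimal sketch with placeholder bucket, prefix, and connection names; note it copies every object under the prefix, so the "latest two files uploaded today" filtering would still need custom logic:

from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

transfer_s3_to_gcs = S3ToGCSOperator(
    task_id="transfer_s3_to_gcs",
    bucket="my-s3-bucket",  # placeholder: source S3 bucket
    prefix="my/s3/folder/",  # placeholder: source prefix
    dest_gcs="gs://my-gcs-bucket/my/gcs/folder/",  # placeholder: destination
    aws_conn_id="aws_default",  # Airflow connection holding the AWS credentials
    replace=False,  # skip objects that already exist in GCS
)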
