Hi @ms4446
I am doing a file transfer from AWS S3 to GCS, for which I have a DAG running through Cloud Composer. It is in fact a two-step process:
1) File transfer from GCS to S3
2) File transfer from S3 to GCS
The GCS to S3 transfer works well.
I have the following snippet to transfer from S3 to GCS:
def transfer_file_from_s3_to_gcs(
    gcs_bucket: str,
    gcs_folder: str,
    s3_bucket: str,
    aws_role_arn: str,
    s3_folder: str,
) -> None:
    """Transfer today's latest files from S3 to GCS.

    Args:
        gcs_bucket (str): GCS destination bucket.
        gcs_folder (str): Folder in the GCS bucket.
        s3_bucket (str): Source S3 bucket.
        aws_role_arn (str): AWS role to assume.
        s3_folder (str): Folder in the S3 bucket.
    """
    gcp_id_token = gcp_idtoken_from_metadata_server("https://s3.amazonaws.com")
    aws_credentials = aws_credentials_from_id_token(gcp_id_token, aws_role_arn)
    # Set up the GCS storage client
    gcs_client = storage.Client()
    # Set up the AWS S3 client
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=aws_credentials["AccessKeyId"],
        aws_secret_access_key=aws_credentials["SecretAccessKey"],
        aws_session_token=aws_credentials["SessionToken"],
    )
    # Get the current date in UTC
    today = datetime.now(timezone.utc).date()
    # List objects in the specified folder
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)
    # Filter objects that were uploaded today
    objs_today = [
        obj
        for obj in response.get("Contents", [])
        if obj["LastModified"].date() == today
    ]
    # If no files were uploaded today, return None
    if not objs_today:
        logging.info("No files were uploaded to the S3 bucket today")
        return
    # Find the latest 2 files uploaded today
    latest_files = sorted(
        objs_today, key=lambda obj: obj["LastModified"], reverse=True
    )[:2]
    if latest_files:
        for latest_file in latest_files:
            # Download the file from S3 directly into an in-memory buffer
            file_stream = io.BytesIO()
            s3_client.download_fileobj(s3_bucket, latest_file["Key"], file_stream)
            file_stream.seek(0)
            # Upload the file to GCS
            gcp_blob_name = os.path.join(
                gcs_folder, os.path.basename(latest_file["Key"])
            )
            gcp_blob = gcs_client.bucket(gcs_bucket).blob(gcp_blob_name)
            gcp_blob.upload_from_file(file_stream, rewind=True)
            logging.info(
                "Successfully transferred '%s' from '%s' in S3 to '%s' in GCS",
                latest_file["Key"],
                s3_bucket,
                gcs_bucket,
            )
I get the following error when I try to list objects. Can you help?
This is the error:
ClientError('An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied')
It happens during the execution of the line:
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)
Am I missing a permission? Is there any simpler way of retrieving the files than the above approach?
The "AccessDenied" error you're encountering typically means that the AWS role you're using within Cloud Composer doesn't have the necessary permissions to list objects in the S3 bucket. Here is how to fix the permissions issue:
IAM Role: Make sure the IAM role your Cloud Composer environment assumes has the following S3 permissions (a minimal policy sketch follows this list):
s3:ListBucket (to list objects within a bucket)
s3:GetObject (to download objects from the bucket)
Policy Attachment: Verify that the policy granting these permissions is correctly attached to the IAM role. You can do this in the AWS IAM console.
Role Trust Relationship: Double-check that the trust relationship of the IAM role allows Cloud Composer to assume the role.
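For reference, here is a minimal sketch of what that policy could look like when attached as an inline policy with boto3. The role name, policy name, and bucket name below are placeholders for illustration; note that s3:ListBucket applies to the bucket ARN, while s3:GetObject applies to the objects under it:

import json

import boto3

# Placeholder names; substitute your own role, policy name, and source bucket.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": "arn:aws:s3:::my-source-bucket",
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::my-source-bucket/*",
        },
    ],
}

iam = boto3.client("iam")
iam.put_role_policy(
    RoleName="my-composer-transfer-role",
    PolicyName="s3-read-for-composer",
    PolicyDocument=json.dumps(policy),
)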
Here are a few alternative methods that might simplify your workflow:
S3ToGCSOperator: Airflow (and therefore Cloud Composer) provides a built-in operator specifically for transferring files from S3 to GCS. This could eliminate the need for custom code:

from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

transfer_operator = S3ToGCSOperator(
    task_id='s3_to_gcs_task',
    bucket=s3_bucket,
    prefix=s3_folder,
    dest_gcs=f'gs://{gcs_bucket}/{gcs_folder}/',  # destination bucket and prefix as a single gs:// URL
    aws_conn_id='aws_default',  # Your Airflow connection ID for AWS
    gcp_conn_id='google_cloud_default',  # Your Airflow connection ID for GCP
    dag=dag,
)
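Note that S3ToGCSOperator copies every object under the given prefix; it has no built-in "latest two files uploaded today" filter. If your S3 keys happen to embed the date (an assumption about your layout, e.g. keys like <s3_folder>/YYYYMMDD/...), you could narrow the copy with a templated prefix instead, something like:

# Assumes date-partitioned keys such as <s3_folder>/20240131/file.csv
transfer_operator = S3ToGCSOperator(
    task_id='s3_to_gcs_today',
    bucket=s3_bucket,
    prefix=s3_folder + '/{{ ds_nodash }}',  # prefix is a templated field
    dest_gcs=f'gs://{gcs_bucket}/{gcs_folder}/',
    aws_conn_id='aws_default',
    gcp_conn_id='google_cloud_default',
    dag=dag,
)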
If you're comfortable with command-line tools, you could use the gsutil command-line interface directly within a Cloud Composer task. This is especially convenient if you don't need complex filtering:
from airflow.operators.bash import BashOperator

transfer_task = BashOperator(
    task_id='gsutil_transfer',
    bash_command=f'gsutil cp -r s3://{s3_bucket}/{s3_folder} gs://{gcs_bucket}/{gcs_folder}',
    dag=dag,
)
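One caveat worth verifying for your setup: gsutil only understands s3:// URLs when the worker it runs on has AWS credentials available in a Boto-style configuration (for example a ~/.boto file, or the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables); that configuration is separate from the aws_default connection Airflow uses.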
When using S3ToGCSOperator or gsutil, make sure your Cloud Composer environment has the appropriate connections to your AWS and GCP accounts configured in Airflow.
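If those connections don't exist yet, one option is to define the AWS one as a connection URI and set it as the AIRFLOW_CONN_AWS_DEFAULT environment variable on the Composer environment (you can also create it in the Airflow UI). A minimal sketch, with a placeholder account ID and role name; the role ARN should be the same one you already assume from Composer:

import json

from airflow.models.connection import Connection

conn = Connection(
    conn_id="aws_default",
    conn_type="aws",
    extra=json.dumps({
        "role_arn": "arn:aws:iam::123456789012:role/my-composer-transfer-role",  # placeholder
        "region_name": "us-east-1",
    }),
)
# Set the printed URI as AIRFLOW_CONN_AWS_DEFAULT, or enter the same
# values when creating the connection in the Airflow UI.
print(conn.get_uri())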
If you prefer to keep your custom Python function, here's a slightly improved version:
# ... (your existing imports, plus: from botocore.exceptions import ClientError)
# ... (your gcp_idtoken_from_metadata_server and aws_credentials_from_id_token helpers)

def transfer_file_from_s3_to_gcs(...):
    # ... (same as your existing code up to listing objects)
    try:
        response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)
    except ClientError as e:
        if e.response['Error']['Code'] == 'AccessDenied':
            logging.error("Insufficient permissions to access S3 bucket.")
        else:
            logging.error(f"Error listing S3 objects: {e}")
        raise  # Re-raise the exception after logging
    # ... (rest of your existing code)
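One more detail worth knowing about list_objects_v2: it returns at most 1,000 keys per call, so if the prefix holds more objects than that, today's files may not appear on the first page. boto3's paginator handles this transparently; a small sketch that slots into the same place in your function:

# Collect every object under the prefix, not just the first 1,000 keys.
paginator = s3_client.get_paginator("list_objects_v2")
objs_today = [
    obj
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_folder)
    for obj in page.get("Contents", [])
    if obj["LastModified"].date() == today
]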
When it comes to transferring from GCS to S3 or from S3 to GCS, tools like CloudFuze and Gs Richcopy 360 can do this job easily, directly, and automatically, with additional filters if you need them.