Hi @ms4446
I am doing a file transfer between AWS S3 and GCS via a DAG running on Cloud Composer. In fact, I have a two-step process:
1) File transfer from GCS to S3
2) File transfer from S3 to GCS
The GCS to S3 transfer works well.
I use the following snippet to transfer from S3 to GCS:
import io
import logging
import os
from datetime import datetime, timezone

import boto3
from google.cloud import storage


def transfer_file_from_s3_to_gcs(
    gcs_bucket: str,
    gcs_folder: str,
    s3_bucket: str,
    aws_role_arn: str,
    s3_folder: str,
) -> None:
    """Transfer the latest files uploaded today from S3 to GCS.

    Args:
        gcs_bucket (str): Destination GCS bucket.
        gcs_folder (str): Destination folder in the GCS bucket.
        s3_bucket (str): Source S3 bucket.
        aws_role_arn (str): AWS role to assume.
        s3_folder (str): Source folder (prefix) in the S3 bucket.
    """
    # Exchange a GCP ID token for temporary AWS credentials.
    # (These two helpers are defined elsewhere in our codebase.)
    gcp_id_token = gcp_idtoken_from_metadata_server("https://s3.amazonaws.com")
    aws_credentials = aws_credentials_from_id_token(gcp_id_token, aws_role_arn)
    # Set up the GCS storage client
    gcs_client = storage.Client()
    # Set up the AWS S3 client with the temporary credentials
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=aws_credentials["AccessKeyId"],
        aws_secret_access_key=aws_credentials["SecretAccessKey"],
        aws_session_token=aws_credentials["SessionToken"],
    )
    # Get the current date in UTC
    today = datetime.now(timezone.utc).date()
    # List objects under the specified prefix (boto3 expects "Prefix", capitalized)
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)
    # Filter objects that were uploaded today
    objs_today = [
        obj
        for obj in response.get("Contents", [])
        if obj["LastModified"].date() == today
    ]
    # If no files were uploaded today, return early
    if not objs_today:
        logging.info("No files were uploaded to the S3 bucket today")
        return
    # Find the latest two files uploaded today
    latest_files = sorted(
        objs_today, key=lambda obj: obj["LastModified"], reverse=True
    )[:2]
    for latest_file in latest_files:
        # Download the file from S3 directly into an in-memory buffer
        # (download_fileobj takes a file-like object; download_file expects a path)
        file_stream = io.BytesIO()
        s3_client.download_fileobj(s3_bucket, latest_file["Key"], file_stream)
        file_stream.seek(0)
        # Upload the buffer to GCS
        # (upload_from_file takes a file-like object; upload_from_filename expects a path)
        gcp_blob_name = os.path.join(
            gcs_folder, os.path.basename(latest_file["Key"])
        )
        gcp_blob = gcs_client.bucket(gcs_bucket).blob(gcp_blob_name)
        gcp_blob.upload_from_file(file_stream, rewind=True)
        logging.info(
            "Successfully transferred '%s' from '%s' in S3 to '%s' in GCS",
            latest_file["Key"],
            s3_bucket,
            gcs_bucket,
        )
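For context, the function is wired into the DAG roughly like this (a sketch; the task id and op_kwargs values are placeholders, not my real bucket and role names):

from airflow.operators.python import PythonOperator

# Sketch of how the function is invoked from the DAG; all values are placeholders.
transfer_task = PythonOperator(
    task_id="transfer_file_from_s3_to_gcs",
    python_callable=transfer_file_from_s3_to_gcs,
    op_kwargs={
        "gcs_bucket": "my-gcs-bucket",
        "gcs_folder": "my/gcs/folder",
        "s3_bucket": "my-s3-bucket",
        "aws_role_arn": "arn:aws:iam::123456789012:role/my-role",
        "s3_folder": "my/s3/folder",
    },
)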
I get the following error when I try to list objects. Can you help?
This is the error:
ClientError('An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied')
It happens during the execution of the line:
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_folder)
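To rule out assuming the wrong role, I can at least check which AWS principal the temporary credentials resolve to (a minimal sketch reusing the aws_credentials dict from the function above; GetCallerIdentity needs no IAM permissions, so it works even while the S3 calls are denied):

# Minimal sketch: confirm which principal the federated credentials map to.
# aws_credentials is the dict returned by aws_credentials_from_id_token() above.
sts_client = boto3.client(
    "sts",
    aws_access_key_id=aws_credentials["AccessKeyId"],
    aws_secret_access_key=aws_credentials["SecretAccessKey"],
    aws_session_token=aws_credentials["SessionToken"],
)
identity = sts_client.get_caller_identity()
# The returned Arn is the principal the bucket/role policies must allow.
logging.info("Running as account=%s arn=%s", identity["Account"], identity["Arn"])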
Am I missing a permission on the AWS side? (My understanding is that ListObjectsV2 needs s3:ListBucket on the bucket itself, not just s3:GetObject on the objects.) Is there any simpler way of retrieving the files than the approach above?
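For example, I wondered whether Airflow's built-in S3ToGCSOperator could replace the copy part of my custom function. A minimal sketch of what I have in mind, assuming the Amazon provider is installed in the Composer environment, an "aws_default" connection holds credentials for the assumed role, and the bucket/prefix values below are placeholders:

from airflow.providers.amazon.aws.transfers.s3_to_gcs import S3ToGCSOperator

# Sketch: copy everything under a prefix from S3 to GCS; names are placeholders.
transfer_s3_to_gcs = S3ToGCSOperator(
    task_id="transfer_s3_to_gcs",
    bucket="my-s3-bucket",
    prefix="my/s3/folder/",
    dest_gcs="gs://my-gcs-bucket/my/gcs/folder/",
    aws_conn_id="aws_default",
    gcp_conn_id="google_cloud_default",
    replace=False,
)

I realize this wouldn't by itself do my "latest two files uploaded today" filtering, so perhaps it could only replace the download/upload portion. Any guidance would be appreciated.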