I have an S3 bucket (Amazon) and I want to write a Composer pipeline to upload CSV files into the S3 bucket on a daily schedule. Currently all my data is in BigQuery; I'll convert it into CSVs and put them into the S3 bucket. Does anyone have any examples of doing this? Which is the easiest way to do it? I'd be grateful if someone can help. CC: @ms4446
Hi @ayushmaheshwari ,
Creating a data pipeline in Google Cloud Composer to upload CSV files from Google BigQuery to an Amazon S3 bucket involves several steps. Google Cloud Composer is a managed Apache Airflow service, which makes it easier to create, schedule, and monitor your workflows. Here's a step-by-step guide to achieve your goal:
Step-by-Step Guide
Google Cloud Composer Environment: Ensure a Cloud Composer environment is set up. Follow Google Cloud's best practices for configuration to align with your project's needs.
Google BigQuery: Prepare your data within BigQuery, ensuring it's correctly formatted (and, where helpful, partitioned or clustered) for efficient extraction.
Amazon S3 Bucket: Set up an S3 bucket for storing your CSV files. Apply best practices for bucket naming, security, and data lifecycle management.
AWS Credentials: Securely store your AWS credentials. Use Airflow's built-in Secrets Backend, such as Google Cloud Secret Manager, for storing the AWS access key ID and secret access key. Configure an Airflow connection of type 'Amazon Web Services' with these credentials (a quick connection check is sketched below).
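As a quick sanity check, here is a minimal sketch (assuming a connection ID of your_aws_connection_id and a bucket named your-s3-bucket, both placeholders) that confirms the stored credentials can reach the bucket; you can run it from a throwaway task or with airflow tasks test before building the full DAG:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def check_s3_access():
    # Credentials are resolved from the Airflow connection (backed by the
    # secrets backend), so nothing sensitive is hard-coded in DAG code.
    hook = S3Hook(aws_conn_id='your_aws_connection_id')
    if not hook.check_for_bucket('your-s3-bucket'):
        raise ValueError('S3 bucket is not reachable with the configured credentials')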
Create an Airflow DAG
Define your DAG to outline the data pipeline's workflow. Incorporate error handling, dynamic file naming, and clear task descriptions for improved maintainability and operability.
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': True,  # Enable email notifications on failure
    'email': ['your_email@example.com'],
    'retries': 2,  # Increase retries for robustness
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'bq_to_s3_pipeline',
    default_args=default_args,
    description='Transfers data from BigQuery to S3 with improved practices',
    schedule_interval='0 0 * * *',  # daily at midnight UTC
    start_date=days_ago(1),
    catchup=False,
) as dag:

    extract_from_bq = BigQueryToGCSOperator(
        task_id='extract_from_bq',
        source_project_dataset_table='your_project.your_dataset.your_table',
        destination_cloud_storage_uris=['gs://your-gcs-bucket/temporary/data.csv'],
        export_format='CSV',
    )

    transfer_to_s3 = GCSToS3Operator(
        task_id='transfer_to_s3',
        bucket='your-gcs-bucket',   # source GCS bucket (parameter names can vary by provider version)
        prefix='temporary/',        # copy the exported CSV(s)
        dest_s3_key='s3://your-s3-bucket/data/{{ ds }}/',  # templated S3 prefix; the GCS object path is appended
        dest_aws_conn_id='your_aws_connection_id',
        gcp_conn_id='google_cloud_default',
        replace=True,
    )

    extract_from_bq >> transfer_to_s3
Key Points and Customization
Error Handling: Utilize Airflow's built-in mechanisms like email_on_failure to alert on task failures. Consider custom error handling logic for more complex scenarios.
Dynamic File Naming: Use Airflow's templating capabilities to dynamically name files based on the DAG run date or other variables, facilitating better data organization.
Data Transformation: For scenarios requiring data transformation between extraction and upload, integrate PythonOperator or BashOperator tasks that perform these transformations (a sketch of such a task follows this list).
Permissions and Security: Ensure that the IAM roles for both Google Cloud and AWS have the minimum necessary permissions, adhering to the principle of least privilege.
Monitoring and Maintenance: Regularly monitor the DAG's performance through Airflow's UI. Set up additional monitoring and alerting through Google Cloud's operations suite for comprehensive visibility.
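To make the data-transformation point concrete, here is a minimal, hypothetical sketch of such a task (not part of the pipeline above); the GCS bucket and object names mirror the DAG example, and the transformation itself is just a placeholder:

from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook

def clean_rows(**context):
    # Pull the exported CSV from GCS, transform it locally, and write it back
    # before the GCS-to-S3 transfer task runs.
    gcs = GCSHook(gcp_conn_id='google_cloud_default')
    local_path = '/tmp/data.csv'
    gcs.download(bucket_name='your-gcs-bucket',
                 object_name='temporary/data.csv',
                 filename=local_path)
    with open(local_path) as f:
        rows = [line.rstrip('\n') for line in f if line.strip()]  # placeholder transformation
    with open(local_path, 'w') as f:
        f.write('\n'.join(rows) + '\n')
    gcs.upload(bucket_name='your-gcs-bucket',
               object_name='temporary/data.csv',
               filename=local_path)

# Inside the `with DAG(...)` block:
transform_csv = PythonOperator(task_id='transform_csv', python_callable=clean_rows)
# extract_from_bq >> transform_csv >> transfer_to_s3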
Thanks @ms4446 , I get all the steps. Instead of using the GCSToS3Operator, why not use the Python boto3 library as below?
from google.cloud import bigquery
import boto3
from datetime import datetime

def export_bigquery_view_to_s3(query, aws_access_key_id, aws_secret_access_key, s3_bucket, s3_key):
    # Initialize BigQuery and S3 clients
    bq_client = bigquery.Client()
    s3_client = boto3.client('s3', aws_access_key_id=aws_access_key_id,
                             aws_secret_access_key=aws_secret_access_key)

    # Execute the query to get the data from the view
    query_job = bq_client.query(query)
    results = query_job.result()

    # Write the results to a temporary CSV file
    temp_csv_file = '/tmp/temp_data.csv'
    with open(temp_csv_file, 'w') as f:
        for row in results:
            f.write(','.join([str(field) for field in row]) + '\n')

    # Define the destination URI for the export
    destination_uri = f's3://{s3_bucket}/{s3_key}'

    try:
        # Upload the file to S3
        s3_client.upload_file(temp_csv_file, s3_bucket, s3_key)
        print(f'Data exported from BigQuery view query to {destination_uri}')
    except Exception as e:
        print(f'Failed to upload data to S3: {e}')

def main():
    # Set the SQL query for your BigQuery view
    query = """
        SELECT *
        FROM `your_project.your_dataset.your_view`
    """

    # Set AWS S3 bucket information
    aws_access_key_id = "your-aws-access-key-id"
    aws_secret_access_key = "your-aws-secret-access-key"
    s3_bucket = "your-s3-bucket"
    s3_key = f"exported_data_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv"

    # Export data from BigQuery view to S3
    export_bigquery_view_to_s3(query, aws_access_key_id, aws_secret_access_key, s3_bucket, s3_key)

if __name__ == "__main__":
    main()
Yes, using the boto3 library is a perfectly valid approach for transferring data from Google BigQuery to Amazon S3, especially when you need more control over the process or when the existing Airflow operators do not meet your specific requirements.
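For example (a sketch, assuming the AWS keys live in the Airflow connection your_aws_connection_id and that export_bigquery_view_to_s3 from your snippet is importable in the DAG file), you can wrap the function in a PythonOperator so the keys never appear in code:

from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator

def run_export(**context):
    # Read the AWS keys from the Airflow connection instead of hard-coding them.
    conn = BaseHook.get_connection('your_aws_connection_id')
    export_bigquery_view_to_s3(
        query="SELECT * FROM `your_project.your_dataset.your_view`",
        aws_access_key_id=conn.login,
        aws_secret_access_key=conn.password,
        s3_bucket='your-s3-bucket',
        s3_key=f"exported_data_{context['ds']}.csv",
    )

# Inside your `with DAG(...)` block:
export_to_s3 = PythonOperator(task_id='export_to_s3', python_callable=run_export)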
Hi @ms4446 , further to our conversations above about transferring files from GCS to AWS S3, I am planning to use temporary AWS credentials as described here.
That post describes spinning up a GCP VM and installing the AWS CLI to request the credentials. Do you suggest any better way to obtain temporary AWS credentials?
The blog post you referenced suggests using a VM on Google Cloud to install the AWS CLI primarily because it's a common scenario where an individual or a system might interact with AWS services from within GCP infrastructure. However, this is not necessary for all use cases, especially when dealing with programmatic access from applications or services like Cloud Composer.
In the context of Google Cloud Composer, you don't necessarily need to set up a separate VM for the sole purpose of installing the AWS CLI, unless your specific workflow requires CLI features that are not available through the AWS SDKs (e.g., boto3 in Python).
For your use case, where you're planning to transfer data from Google BigQuery to Amazon S3, you can indeed work directly through Cloud Composer without setting up a separate VM, by leveraging temporary credentials securely. Here's how you can adapt the workload identity federation approach for Cloud Composer:
Federation Configuration
Setup Workload Identity Federation: Begin by configuring an AWS IAM Identity Provider and IAM Role according to AWS documentation. This setup enables a Google Cloud service account to assume an AWS IAM Role, facilitating secure cross-cloud authentication without the management of long-term AWS access keys.
Assuming the AWS IAM Role
Obtain Temporary AWS Credentials: Utilize the AWS SDK for Python (boto3) within Cloud Composer to programmatically assume the IAM Role. This crucial step grants you temporary credentials (access key ID, secret access key, and session token), necessary for authenticating requests to AWS S3.
Modifying Your Script for Temporary Credentials
Incorporate the process of assuming the AWS IAM Role into your data transfer script with boto3. This involves exchanging a Google-provided identity token for AWS temporary credentials.
Code Example (Python):
import boto3

def get_google_identity_token():
    # Implement using Google's authentication libraries
    # (must return an ID token for the service account trusted by the AWS role)
    raise NotImplementedError

def get_aws_temp_credentials(google_identity_token, aws_role_arn):
    sts_client = boto3.client('sts')
    response = sts_client.assume_role_with_web_identity(
        RoleArn=aws_role_arn,
        RoleSessionName="AssumeRoleSession",
        WebIdentityToken=google_identity_token
    )
    return response['Credentials']

# Example usage
google_identity_token = get_google_identity_token()
temp_credentials = get_aws_temp_credentials(google_identity_token, "arn:aws:iam::123456789012:role/YourAWSS3Role")

s3_client = boto3.client(
    's3',
    aws_access_key_id=temp_credentials['AccessKeyId'],
    aws_secret_access_key=temp_credentials['SecretAccessKey'],
    aws_session_token=temp_credentials['SessionToken'],
)
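If it helps, here is one possible way to fill in the get_google_identity_token stub. It is only a sketch: it assumes the DAG runs with Application Default Credentials available (for example, the Composer worker's default service account) and that the audience value matches what your AWS role's trust policy expects.

import google.auth.transport.requests
import google.oauth2.id_token

def get_google_identity_token(audience='https://s3.amazonaws.com'):
    # Fetch an ID token for the environment's default service account via
    # Application Default Credentials (the metadata server on Composer workers).
    request = google.auth.transport.requests.Request()
    return google.oauth2.id_token.fetch_id_token(request, audience)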
Important Considerations
Security and Credential Management: Prioritize secure credential exchange and handling.
Environment Configuration: Meticulously verify the configuration of your Cloud Composer environment and AWS IAM role.
Error Handling: Implement robust error handling procedures.
Integration with Airflow: Integrate the credential exchange and upload into your DAG using a PythonOperator (a sketch follows this list).
Documentation and Maintenance: Maintain detailed documentation for future reference.
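To illustrate the PythonOperator integration, here is a minimal sketch that reuses the two helper functions above; the role ARN, bucket name, and local file path are placeholders:

from airflow.operators.python import PythonOperator
import boto3

def upload_with_temp_credentials(**context):
    # Exchange the Google ID token for temporary AWS credentials, then upload.
    token = get_google_identity_token()
    creds = get_aws_temp_credentials(token, "arn:aws:iam::123456789012:role/YourAWSS3Role")
    s3 = boto3.client(
        's3',
        aws_access_key_id=creds['AccessKeyId'],
        aws_secret_access_key=creds['SecretAccessKey'],
        aws_session_token=creds['SessionToken'],
    )
    s3.upload_file('/tmp/temp_data.csv', 'your-s3-bucket', f"data/{context['ds']}.csv")

# Inside your `with DAG(...)` block:
upload_to_s3 = PythonOperator(task_id='upload_to_s3', python_callable=upload_with_temp_credentials)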
@ms4446 I am struggling to implement Google's authentication libraries for the get_google_identity_token step.
For your scenario, where you're aiming to upload files to an Amazon S3 bucket from Cloud Composer and you have a single-user access requirement, you don't necessarily need to use a third-party tool like JumpCloud for identity access management. AWS IAM and Google Cloud IAM, along with workload identity federation, can suffice for your needs.
Given your use case, here's a simplified approach without needing third-party identity providers:
AWS can trust Google (https://accounts.google.com) directly as a web identity provider, so no extra identity broker is needed.
Steps to Implement Workload Identity Federation
1. Create an AWS IAM Role
2. Set Up a Google Cloud Service Account
3. Obtain Google Service Account Credentials
4. Exchange Tokens for AWS Credentials
Example
import boto3

# Assuming you have obtained the Google ID token by authenticating with your service account
google_id_token = 'YOUR_GOOGLE_ID_TOKEN'

# Assume the AWS role
sts_client = boto3.client('sts')
assumed_role_object = sts_client.assume_role_with_web_identity(
    RoleArn="arn:aws:iam::AWS_ACCOUNT_ID:role/YOUR_AWS_ROLE",
    RoleSessionName="SessionName",
    WebIdentityToken=google_id_token
)
credentials = assumed_role_object['Credentials']

# Now you can use these temporary credentials to access AWS services
s3_client = boto3.client(
    's3',
    aws_access_key_id=credentials['AccessKeyId'],
    aws_secret_access_key=credentials['SecretAccessKey'],
    aws_session_token=credentials['SessionToken']
)

# Example: List buckets
response = s3_client.list_buckets()
print(response)
Thanks a lot @ms4446 , this is the code I have for get_google_identity_token():
import json

import google.auth
from google.auth.transport.requests import AuthorizedSession

def get_google_identity_token():
    audience = 'https://s3.amazonaws.com'

    # #1 Get the default credentials to generate the access token
    credentials, project_id = google.auth.default(
        scopes=['https://www.googleapis.com/auth/iam'])

    # #2 To use the current service account email
    service_account_email = "my_service_Account_email.iam.gserviceaccount.com"

    # #3 Prepare the call to the Service Account Credentials API
    sa_credentials_url = f'https://iamcredentials.googleapis.com/' \
                         f'v1/projects/-/serviceAccounts/' \
                         f'{service_account_email}:generateIdToken'
    headers = {'Content-Type': 'application/json'}

    # Create an AuthorizedSession that automatically includes
    # the access_token based on your credentials
    authed_session = AuthorizedSession(credentials)
    body = json.dumps({'audience': audience})

    # Make the call
    token_response = authed_session.request('POST', sa_credentials_url,
                                            data=body, headers=headers)
    jwt = token_response.json()
    id_token = jwt['token']
    return id_token
The above problem is solved, please ignore, @ms4446
@ayushmaheshwari Was it a permission issue? Please share how you solved it.
@ms4446 , instead of using
credentials, project_id = google.auth.default(
    scopes=['https://www.googleapis.com/auth/iam'])
I replaced it with:
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    service_account_json_path, scopes=["https://www.googleapis.com/auth/iam"])
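For completeness, service_account_json_path is not defined in the snippet above; a hypothetical definition (assuming the key file was uploaded to the Composer environment's data/ folder, though storing the key in Secret Manager would be safer) could be:

# Hypothetical location; Composer mounts the environment bucket's data/ folder here.
service_account_json_path = '/home/airflow/gcs/data/sa-key.json'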