
Cloud Composer XComs Backend

What backend does Cloud Composer use by default for XComs? Does it just use the database, or does it use a custom GCS-based XCom backend?

What is the max file size for sharing data between tasks via XComs / TaskFlow?

Is adding a custom XCom backend supported?

1 ACCEPTED SOLUTION

Yes, by default Cloud Composer stores XComs in the Airflow metadata database, alongside other workflow and task metadata. The metadata database is an integral part of Airflow, the technology underlying Cloud Composer.
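Because XComs live in the metadata database, they are meant for small values, and the practical size limit depends on the database backend. As a rough guard, you can check the serialized size of a value before pushing it and fall back to GCS for anything large. This is a plain-Python sketch; the `MAX_XCOM_BYTES` threshold is an assumed illustrative value, not an official Composer limit.

```python
import json

# Illustrative threshold only: the real XCom size limit depends on the
# metadata database backing your environment, so treat XComs as suitable
# for small values and keep large payloads in GCS instead.
MAX_XCOM_BYTES = 48 * 1024  # assumed limit for this sketch

def fits_in_xcom(value) -> bool:
    """Return True if the JSON-serialized value is small enough for XCom."""
    return len(json.dumps(value).encode("utf-8")) <= MAX_XCOM_BYTES

print(fits_in_xcom({"rows": 10}))          # small metadata: fine for XCom
print(fits_in_xcom("x" * (1024 * 1024)))   # ~1 MB string: use GCS instead
```

In practice this means passing lightweight references (object names, row counts) through XComs and keeping the actual data in GCS.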

In Cloud Composer, the recommended way to share larger data between tasks is to use Google Cloud Storage (GCS) as intermediary storage: one task writes the data to GCS, and a later task reads it back.

To do this, you can use the GCSHook from the airflow.providers.google.cloud.hooks.gcs module. The GCSHook lets you interact with GCS and perform operations such as uploading and downloading objects. You save your data as objects in GCS in one task, then use the GCSHook to read and process the data in the next task.

See the example below, which uses GCS in place of XComs:

    from datetime import datetime

    from airflow.decorators import dag, task
    from airflow.providers.google.cloud.hooks.gcs import GCSHook

    GCS_CONN_ID = "gcs_connection"
    BUCKET = "your_bucket_name"

    @task
    def upload_to_gcs(data: str):
        # Save the data as an object in GCS so the next task can read it.
        gcs_hook = GCSHook(gcp_conn_id=GCS_CONN_ID)
        gcs_hook.upload(bucket_name=BUCKET, object_name="data.txt", data=data)

    @task
    def process_data():
        # Read the object written by the previous task.
        gcs_hook = GCSHook(gcp_conn_id=GCS_CONN_ID)
        data = gcs_hook.download(bucket_name=BUCKET, object_name="data.txt")
        # Process the data as needed

    # Define the DAG
    @dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
    def my_dag():
        upload_task = upload_to_gcs("Some data")
        process_task = process_data()
        upload_task >> process_task

    dag = my_dag()

Please note: the upload_to_gcs task uploads the data to GCS using the GCSHook's upload method, and the process_data task then downloads it using the GCSHook's download method. Any further data processing happens inside the process_data task.
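On the custom XCom backend question: in plain Airflow, a custom backend subclasses BaseXCom and is activated via the `xcom_backend` setting in the `[core]` section of the Airflow config; check the Cloud Composer documentation for whether overriding that setting is supported in your Composer version. The sketch below models only the serialize/deserialize round-trip of such a backend, using a dict in place of a GCS bucket so it runs standalone; the names `PREFIX`, `_store`, and both functions are illustrative, not an Airflow API.

```python
import json
import uuid

# In Airflow you would subclass airflow.models.xcom.BaseXCom and point the
# [core] xcom_backend setting at your class. This sketch keeps only the core
# idea: store the payload externally and put a short reference in the DB.
PREFIX = "gcs_xcom://"
_store = {}  # stand-in for a GCS bucket

def serialize_value(value):
    """Write the value to (simulated) GCS and return a small reference string."""
    key = f"xcom/{uuid.uuid4()}.json"
    _store[key] = json.dumps(value)
    return PREFIX + key  # only this short string is stored in the metadata DB

def deserialize_value(ref):
    """Resolve a reference produced by serialize_value back to the value."""
    assert ref.startswith(PREFIX)
    return json.loads(_store[ref[len(PREFIX):]])

ref = serialize_value({"rows": [1, 2, 3]})
print(ref.startswith(PREFIX))    # True
print(deserialize_value(ref))    # {'rows': [1, 2, 3]}
```

A real backend would replace `_store` with GCSHook upload/download calls against a bucket, which keeps the metadata database small regardless of payload size.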
