What backend does Cloud Composer use by default for XComs? Does it just use the database, or does it use a custom GCS-based XCom backend?
What is the max file size for sharing data between tasks via XComs / TaskFlow?
Is adding a custom XCom backend supported?
Please see the answers to your questions inline:
What backend does Cloud Composer use by default for XComs?
By default, Cloud Composer stores XComs (cross-communications) in the Airflow metadata database.
XComs are per-task-instance and designed for communication within a DAG run. They are different from Variables, which are global and used for overall configuration and value sharing.
Does it just use the database, or does it use a custom GCS-based XCom backend?
Cloud Composer uses the metadata database as the default backend for XComs. It can instead be configured with a custom XCom backend based on an external storage system such as GCS.
What is the max file size for sharing data between tasks via XComs / TaskFlow?
No single size limit is defined for sharing data between tasks via XComs/TaskFlow in Cloud Composer; it depends on the backend. With the default backend, values are stored in the metadata database, so XComs should be kept to small amounts of data. With a custom backend, the limit is determined by the capabilities of the external storage system.
Is adding a custom XCom backend supported?
Yes, in Cloud Composer it is possible to add a custom XCom backend. The XCom system in Airflow (the underlying technology of Cloud Composer) supports interchangeable backends, which you select through the xcom_backend option in the [core] section of the Airflow configuration.
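To make the size question a bit more concrete, here is a rough sketch of how you could estimate what a value would occupy as a JSON-encoded XCom before deciding whether to return it from a task or write it to GCS. The helper names and the 48 KiB budget are my own assumptions for illustration; the budget is not an Airflow or Cloud Composer limit.

```python
import json

# Hypothetical budget chosen for illustration only; it is NOT an Airflow or
# Cloud Composer limit. Pick a value that suits your metadata database.
SMALL_XCOM_LIMIT_BYTES = 48 * 1024


def xcom_payload_size(value) -> int:
    """Return the size in bytes of the value once JSON-encoded as UTF-8."""
    return len(json.dumps(value).encode("utf-8"))


def fits_in_xcom(value, limit_bytes: int = SMALL_XCOM_LIMIT_BYTES) -> bool:
    """Return True if the JSON-encoded value stays within the chosen budget."""
    return xcom_payload_size(value) <= limit_bytes


small = {"rows": 10, "status": "ok"}
large = {"blob": "x" * 200_000}

print(fits_in_xcom(small))  # small dict, well under the budget
print(fits_in_xcom(large))  # roughly 200 KB, over the budget
```

A check like this only approximates what the default backend stores, but it is usually enough to tell "a few IDs or a file path" apart from "a whole dataset".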
So by default the metadata database is used?
What is the recommended way to use GCS for XComs? Write a custom backend or is there any prebuilt support / off the shelf opinionated GCS backend for using with Composer?
Yes, by default, Cloud Composer uses the metadata database, which stores XComs along with other metadata related to workflows and tasks. The metadata database is an integral part of Airflow, the underlying technology of Cloud Composer.
In Cloud Composer, the recommended way to use Google Cloud Storage (GCS) for passing data between tasks is to treat it as intermediary data storage. This means saving the data to GCS in one task and then reading it from GCS in the subsequent task.
To do this, you can use the GCSHook provided by the airflow.providers.google.cloud.hooks.gcs module. The GCSHook allows you to interact with GCS and perform operations such as uploading and downloading files. You can save your data as files in GCS, and then use the GCSHook to read and process the data in the next task.
See the example below, which uses GCS to pass data between tasks:
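from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.google.cloud.hooks.gcs import GCSHook

GCS_CONN_ID = "gcs_connection"  # Airflow connection ID with access to the bucket
BUCKET = "your_bucket_name"

@task
def upload_to_gcs(data):
    # Write the payload to a GCS object instead of returning it through XCom
    gcs_hook = GCSHook(gcp_conn_id=GCS_CONN_ID)
    gcs_hook.upload(bucket_name=BUCKET, object_name="data.txt", data=data)

@task
def process_data():
    # With no filename argument, download() returns the object contents as bytes
    gcs_hook = GCSHook(gcp_conn_id=GCS_CONN_ID)
    data = gcs_hook.download(bucket_name=BUCKET, object_name="data.txt")
    # Process the data as needed

# Define the DAG (on Airflow versions before 2.4, use schedule_interval instead of schedule)
@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def my_dag():
    upload_task = upload_to_gcs("Some data")
    process_task = process_data()
    # No value is passed between the tasks, so the ordering is set explicitly
    upload_task >> process_task

dag = my_dag()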
Please note: the upload_to_gcs task uploads the data to GCS using the GCSHook's upload method. The process_data task then downloads the data from GCS using the GCSHook's download method. You can perform your desired data processing operations within the process_data task.
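One thing the example above leaves open is object naming: with a fixed name like data.txt, two DAG runs that overlap could overwrite each other's data. A possible way around that, sketched below, is to derive the object name from the DAG ID, run ID and task ID. The build_object_name helper and the xcom/ prefix are hypothetical names of my own; inside a task you could read the real values from the Airflow context (for example context["run_id"]).

```python
def build_object_name(dag_id: str, run_id: str, task_id: str,
                      suffix: str = "data.txt") -> str:
    """Build a GCS object name that is unique per DAG run and task.

    The "xcom/" prefix and the path layout are illustrative assumptions,
    not a convention defined by Airflow or Cloud Composer.
    """
    return f"xcom/{dag_id}/{run_id}/{task_id}/{suffix}"


# Two runs of the same task get different object names, so they cannot collide.
first = build_object_name("my_dag", "manual__2023-01-01", "upload_to_gcs")
second = build_object_name("my_dag", "manual__2023-01-02", "upload_to_gcs")
print(first)
print(second)
```

The upstream task would then return only this short object name, which is small enough to pass as a regular XCom, and the downstream task would use it to download the data.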
Ok great that makes sense.
Do you foresee any issues with creating a custom GCS backend like the one shown here and having XComs passed automatically? That would allow passing things around with the following semantics:
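from datetime import datetime

from airflow.decorators import dag, task

@task
def get_data():
    large_blob = download_data()  # placeholder for your own download logic
    return large_blob

@task
def process_data(data):
    # Process the data as needed
    ...

# Define the DAG
@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def my_dag():
    process_data(get_data())

dag = my_dag()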
Creating a custom GCS backend for XComs, as described in the link you provided, can be a viable approach for handling large data payloads in Cloud Composer. However, there are a few considerations and potential issues to be aware of:
Data Size and Performance: While using GCS for storing and passing large data payloads allows you to overcome the limitations of XComs, it's important to consider the size and performance implications. Large data transfers can introduce additional latency and impact the overall execution time of your tasks. You should assess the size of your data and ensure that the performance impact is acceptable for your use case.
Serialization and Deserialization: In the example you provided, the get_data task returns a large blob of data, which is then passed to the process_data task. You need to make sure that the custom XCom backend properly handles the serialization and deserialization of the data to and from JSON format. In the custom XCom class defined in the tutorial, you will need to implement the serialize_value and deserialize_value methods to handle the conversion between JSON and the desired data format.
XComs and Task Dependency: In your example, the get_data task returns a value, which is then passed as an argument to the process_data task. With the TaskFlow API this is the intended way to use XComs: Airflow pushes the return value of get_data to XCom, pulls it when process_data runs, and infers the dependency between the two tasks from the call. An explicit >> operator is only needed between tasks that do not exchange data this way.
Here's an updated version of your code that shows the task dependency:
from datetime import datetime

from airflow.decorators import dag, task

@task
def get_data():
    # download_data() stands in for your own download logic
    large_blob = download_data()
    return large_blob

@task
def process_data(data):
    # Process the data as needed
    ...

# Define the DAG
@dag(schedule=None, start_date=datetime(2023, 1, 1), catchup=False)
def my_dag():
    # get_data() returns a reference to the task's XCom, not the data itself
    data = get_data()
    # Passing the reference on makes process_data run after get_data
    process_data(data)

dag = my_dag()
In this modified code, the get_data task returns the large_blob value, and it is passed to the process_data task by calling process_data(data). That call also establishes the dependency between the tasks, ensuring that the output of get_data is automatically passed to process_data.
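To make the serialization point above more concrete, here is a library-free sketch of the pattern a custom backend follows: the payload goes to external storage and only a short reference is kept as the XCom value. Everything in it is an assumption for illustration: a plain dict stands in for the GCS bucket, and the class name and the gcs-ref:// marker are made up. A real backend would subclass airflow.models.xcom.BaseXCom and talk to GCS (for example through GCSHook), and the exact signatures of serialize_value and deserialize_value there depend on your Airflow version.

```python
import json
import uuid

# Stand-in for a GCS bucket so the sketch runs without Airflow or GCP access.
FAKE_BUCKET = {}

# Hypothetical marker used to recognise values that are references to objects.
PREFIX = "gcs-ref://"


class ReferenceXComSketch:
    """Illustrates the store-a-reference pattern; not a real Airflow backend."""

    @staticmethod
    def serialize_value(value) -> str:
        # Write the JSON-encoded payload to "storage" under a unique name...
        object_name = f"xcom/{uuid.uuid4()}.json"
        FAKE_BUCKET[object_name] = json.dumps(value).encode("utf-8")
        # ...and hand back only the short reference for the metadata database.
        return PREFIX + object_name

    @staticmethod
    def deserialize_value(reference: str):
        if not reference.startswith(PREFIX):
            raise ValueError(f"not a storage reference: {reference!r}")
        # Resolve the reference and decode the payload again.
        object_name = reference[len(PREFIX):]
        return json.loads(FAKE_BUCKET[object_name].decode("utf-8"))


payload = {"rows": [1, 2, 3], "source": "example"}
ref = ReferenceXComSketch.serialize_value(payload)
restored = ReferenceXComSketch.deserialize_value(ref)
print(ref.startswith(PREFIX), restored == payload)
```

With a backend along these lines in place, the TaskFlow code does not change at all; the tasks keep returning and receiving values, and only the reference travels through the metadata database.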