Hey all,
I currently have a working Cloud Composer (Airflow) environment with 50 or so DAGs. My code base requires a lot of settings files (JSON and YML), so I ended up modifying this base script so that, instead of only uploading Python DAG files, it also uploads the JSON and YML files:
from __future__ import annotations
import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile
# Imports the Google Cloud client library
from google.cloud import storage
def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files,
    # so we can exclude all non-Python files
    dags = glob.glob(f"{temp_dir}/*.py")

    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given Cloud Storage bucket.

    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when
            constructing the temporary directory path name. Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise
    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)
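For reference, the JSON/YML modification I mentioned lives in _create_dags_list and looks roughly like this (a sketch; the extra glob patterns are an assumption about which settings files sit alongside the DAGs, and the rest of the function is unchanged):

def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # Collect DAG files plus their settings files instead of only *.py
    dags: list[str] = []
    for pattern in ("*.py", "*.json", "*.yml", "*.yaml"):
        dags.extend(glob.glob(f"{temp_dir}/{pattern}"))
    return (temp_dir, dags)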
The problem is that I need access to my git history so I can compare diffs and generate a list of recently changed files. Without doing this, the script will delete everything in the Cloud Storage bucket, which causes a long 10-15 minute buggy period in my Airflow instance while everything reloads and reconnects. The following function is what I found to get this list:
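It is roughly along these lines (a sketch, assuming GitPython and a diff against the previous commit; the helper name and the file-type filter are placeholders):

from git import Repo


def list_changed_files(repo_path: str = ".", base_ref: str = "HEAD~1") -> list[str]:
    """Return the files changed between base_ref and HEAD
    (equivalent to `git diff --name-only <base_ref> HEAD`)."""
    repo = Repo(repo_path)
    changed = repo.git.diff(base_ref, "HEAD", name_only=True).splitlines()
    # Only keep the file types the uploader cares about
    return [f for f in changed if f.endswith((".py", ".json", ".yml", ".yaml"))]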
However, since it uses Cloud Build to create the DAG files in Cloud Storage, Cloud Build is unable to run the .diff function, since I believe it needs access to the console. Does anyone have any ideas on how to either fix my function to work on Cloud Build so I can get the commit differences, or an alternative solution?
Sincerely,
Dylan
Hello,
Thank you for contacting the Google Cloud Community.
I have gone through your reported issue; however, it seems like this is an issue observed specifically at your end, and it would need more specific debugging and analysis. To ensure a faster resolution and dedicated support for your issue, I kindly request you to file a support ticket by clicking here [1]. Our support team will prioritize your request and provide you with the assistance you need.
For individual support issues, it is best to utilize the support ticketing system. We appreciate your cooperation!
[1]: https://cloud.google.com/support/docs/manage-cases#creating_cases
Hi Jaia,
No need for support from a support center; I was just seeing if there are any devs out there who have had a similar use case and maybe could come up with a better solution.
Sincerely,
Dylan