
Cloud Composer DAG Storage

Hey all,

I currently have a working Cloud Composer (Airflow) environment with 50 or so DAGs. My code base relies on a lot of settings files (JSON and YML), so I ended up modifying the standard upload script below so that, instead of uploading only the Python DAG files, it also uploads the JSON and YML files (a simplified sketch of that change follows the script).

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")

    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name. Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)
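
For context, the modification I made to also pick up the settings files is essentially just widening the glob in _create_dags_list. A simplified sketch of it (same imports as the script above; the extension list reflects my repo layout, so treat it as illustrative rather than my exact code):

def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # Collect the DAG files plus the JSON/YML settings files they depend on
    # (the extension list here is specific to my repo)
    files: list[str] = []
    for pattern in ("*.py", "*.json", "*.yml", "*.yaml"):
        files.extend(glob.glob(f"{temp_dir}/{pattern}"))

    return (temp_dir, files)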

The problem is that I need access to my git history so I can compare diffs and generate a list of recently changed files. Without that list, the script deletes everything in the Cloud Storage bucket, which causes a buggy 10-15 minute period in my Airflow instance while everything reloads and reconnects. The following function is what I found to get this list:

 

def get_recent_changed_files():
    from git import Repo

    # Open the Git repository
    repo = Repo(".")

    # Get the HEAD commit and the previous commit
    head_commit = repo.head.commit
    previous_commit = head_commit.parents[0]

    # Get the list of files changed between the two commits
    changed_files = head_commit.diff(previous_commit)

    # Extract filenames from the diff objects
    filenames = [f.a_path for f in changed_files]
    return filenames
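
Once I have that list, the plan is to filter the upload down to just the changed files, roughly like this (a sketch only, reusing _create_dags_list and the imports from the script above; the basename matching assumes my flat dags/ layout):

def upload_changed_dags_to_composer(
    dags_directory: str, bucket_name: str, changed_files: list[str]
) -> None:
    # Sketch: same flow as upload_dags_to_composer, but skip any file whose
    # name is not in the changed-files list produced by git
    temp_dir, dags = _create_dags_list(dags_directory)
    changed_names = {os.path.basename(path) for path in changed_files}

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    for dag in dags:
        if os.path.basename(dag) not in changed_names:
            continue
        # Upload the file from the temp dir under its dags/ destination name
        destination = dag.replace(f"{temp_dir}/", "dags/")
        bucket.blob(destination).upload_from_filename(dag)
        print(f"File {destination} uploaded to {bucket_name}/{destination}.")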

However, since my setup uses Cloud Build to create the dags folder in Cloud Storage, the .diff call has to run inside the build, and there it fails; I believe it needs access to the console. Does anyone have any ideas to either fix my function so it works on Cloud Build and I can get the commit differences, or an alternative solution?
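
One workaround I have been sketching out (untested) is to unshallow the checkout before diffing, on the assumption that the build workspace is a shallow git clone. From what I have read, some Cloud Build trigger types only provide a source tarball without a .git directory at all, in which case this would not help:

import os
import subprocess

from git import Repo


def get_recent_changed_files():
    repo = Repo(".")

    # A shallow clone has a .git/shallow file; fetch the full history first
    # (assumes git is on PATH and the remote is reachable from the build)
    if os.path.exists(os.path.join(repo.git_dir, "shallow")):
        subprocess.run(["git", "fetch", "--unshallow"], check=True)

    head_commit = repo.head.commit
    if not head_commit.parents:
        # First commit in the repo, or history still unavailable
        return []

    previous_commit = head_commit.parents[0]
    changed_files = head_commit.diff(previous_commit)
    return [f.a_path for f in changed_files]

I am not sure this is the right approach, though, so other ideas are welcome.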

Sincerely,

Dylan
