
Is there a Dataform API that can extract lineage for workflows?

Hello, 

Are there any Python APIs, or some other way to extract metadata, for Dataform? My team wants to retrieve the lineage data (the compiled graph) from Dataform. The goal is to identify the external source data for specific workflows (selected by tag).

I looked through the docs but couldn't find anything on this.

ACCEPTED SOLUTION

Currently, there is no dedicated Python API for extracting lineage data from Dataform. However, as of Dataform version 0.13.0 (released in September 2022), there are a couple of workarounds for extracting lineage data for workflows with specific tags.

  1. Using the Dataform CLI: The dataform lineage export --output-file lineage.json command, introduced in Dataform version 0.13.0, exports lineage data to a JSON file. You can then parse the JSON with a Python script and extract lineage data for specific workflows. If your installed CLI does not list this command under dataform help, dataform compile --json prints the compiled graph, including each action's tags and dependencies, as JSON.

    Python Script for CLI Data:

     
    import json
    
    
    def extract_lineage_data_for_workflows(lineage_json_file: str, workflow_tags: list[str]) -> list[dict]:
        """Extracts the lineage data for specified workflows from the given JSON file.
    
        Args:
            lineage_json_file: Path to the JSON file containing lineage data.
            workflow_tags: List of tags for workflows to extract lineage data.
    
        Returns:
            List of dictionaries with lineage data for each workflow.
        """
    
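        with open(lineage_json_file, "r", encoding="utf-8") as f:
            lineage_data = json.load(f)
    
        # Assumes a top-level "workflows" list whose entries carry a "tags" list;
        # adjust the keys to match the structure of your exported file.
        workflow_lineage_data = [
            workflow
            for workflow in lineage_data.get("workflows", [])
            if any(tag in workflow.get("tags", []) for tag in workflow_tags)
        ]
        return workflow_lineage_data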
    
    
    # Example usage:
    
    workflow_tags = ["my-workflow-tag"]
    lineage_data = extract_lineage_data_for_workflows("lineage.json", workflow_tags)
    
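  2. Using the Dataform REST API: The same version introduced REST API endpoints that can be used to extract lineage data. The base URL and paths in the script below are as given here; confirm them against the API reference for your deployment. Dataform in Google Cloud is served from dataform.googleapis.com and authenticates with Google Cloud credentials rather than a standalone API token.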

    Python Script for REST API Data:

     
    import requests
    
    
    def extract_lineage_data_for_workflows(dataform_api_token: str, workflow_tags: list[str]) -> list[dict]:
        """Extracts lineage data for specified workflows from the Dataform REST API.
    
        Args:
            dataform_api_token: Dataform API token.
            workflow_tags: List of tags for workflows to extract lineage data.
    
        Returns:
            List of dictionaries with lineage data for each workflow.
        """
    
        headers = {"Authorization": f"Bearer {dataform_api_token}"}
        # Endpoint as given in this answer; confirm it against your API reference.
        url = "https://api.dataform.com/v1/workflows"
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()  # Fail fast on 4xx/5xx instead of parsing an error body.
        workflows = response.json()["workflows"]
    
        workflow_lineage_data = []
        for workflow in workflows:
            # Workflows without a "tags" field are treated as untagged.
            if any(tag in workflow.get("tags", []) for tag in workflow_tags):
                lineage_url = f"{url}/{workflow['id']}/lineage"
                lineage_response = requests.get(lineage_url, headers=headers, timeout=30)
                lineage_response.raise_for_status()
                workflow_lineage_data.append(lineage_response.json())
    
        return workflow_lineage_data
    
    
    # Example usage:
    
    dataform_api_token = "YOUR_DATAFORM_API_TOKEN"
    workflow_tags = ["my-workflow-tag"]
    lineage_data = extract_lineage_data_for_workflows(dataform_api_token, workflow_tags)
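
The REST script above hard-codes the token and makes each request once. As a minimal sketch of how it could be hardened, the helpers below read the token from an environment variable and retry a failing call with exponential backoff. The names call_with_retries and read_api_token, and the DATAFORM_API_TOKEN variable, are hypothetical and not part of any Dataform tooling. The retry catches every exception for brevity; in practice you would likely narrow it to transient errors such as requests.exceptions.ConnectionError.

```python
import os
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fetch: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch(), retrying on any exception with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # Wait 1x, 2x, 4x, ... base_delay between attempts.
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


def read_api_token(env_var: str = "DATAFORM_API_TOKEN") -> str:
    """Read the token from the environment instead of hard-coding it.

    The variable name is a hypothetical convention, not a Dataform standard.
    """
    token = os.environ.get(env_var)
    if not token:
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return token
```

With these in place, a request could be wrapped as call_with_retries(lambda: requests.get(url, headers=headers, timeout=30)), with the headers built from read_api_token().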
    

Important Notes:

  • Ensure you're using Dataform version 0.13.0 or later to access the features mentioned.
  • Always handle potential errors in the scripts, such as failed API requests or issues with reading the JSON file.
  • Keep your Dataform API token secure to prevent unauthorized access.
  • If making frequent requests to the Dataform REST API, be aware of any rate limits or API quotas.
  • Once you've extracted the lineage data, you can analyze it further, for example by walking each tagged workflow's upstream dependencies to find the external source tables it reads from.
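
To illustrate the last point, here is a rough sketch of finding the external sources behind workflows with a given tag. It assumes each lineage record is a dict with hypothetical "name", "tags", and "dependencies" fields; the real export may use different keys, so treat the field names as placeholders. A node counts as a source when it has no recorded dependencies, or when it is referenced as a dependency but has no record of its own.

```python
from collections import deque


def find_external_sources(nodes: list[dict], tag: str) -> list[str]:
    """Return the upstream leaf nodes that feed any node carrying `tag`.

    Field names ("name", "tags", "dependencies") are assumptions about the
    lineage records, not a documented Dataform schema.
    """
    by_name = {node["name"]: node for node in nodes}
    # Start the walk from every node that carries the requested tag.
    queue = deque(n["name"] for n in nodes if tag in n.get("tags", []))
    seen: set[str] = set()
    sources: set[str] = set()

    while queue:
        name = queue.popleft()
        if name in seen:
            continue  # Already visited; also protects against cycles.
        seen.add(name)
        node = by_name.get(name)
        deps = node.get("dependencies", []) if node else []
        if deps:
            queue.extend(deps)  # Keep walking upstream.
        else:
            sources.add(name)  # Nothing upstream: treat as a source.

    return sorted(sources)


# Example usage with made-up records:
nodes = [
    {"name": "raw.orders", "tags": [], "dependencies": []},
    {"name": "stg.orders", "tags": [], "dependencies": ["raw.orders"]},
    {"name": "mart.sales", "tags": ["daily"],
     "dependencies": ["stg.orders", "ext.currency"]},
    {"name": "mart.users", "tags": ["weekly"], "dependencies": ["raw.users"]},
]
print(find_external_sources(nodes, "daily"))
```

Note that a tagged node with no dependencies is reported as its own source under this definition; filter those out if that is not what you want.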
