Hello,
Are there any Python APIs or other ways to extract metadata from Dataform? My team is looking to grab the lineage data (Compiled Graph) within Dataform. The goal is to extract the external source data for specific workflows (selected by tags).
I looked through the docs but couldn't find anything on this.
Currently, there is no dedicated Python API for extracting lineage data from Dataform. However, since the release of Dataform version 0.13.0 in September 2022, there are a few workarounds for extracting lineage data for specific workflows by tag.
Using the Dataform CLI: The dataform lineage export --output-file lineage.json command was introduced in Dataform version 0.13.0. It exports lineage data to a JSON file, which you can then parse with a Python script to extract the lineage data for specific workflows.
Python Script for CLI Data:
import json


def extract_lineage_data_for_workflows(lineage_json_file: str, workflow_tags: list[str]) -> list[dict]:
    """Extracts the lineage data for specified workflows from the given JSON file.

    Args:
        lineage_json_file: Path to the JSON file containing lineage data.
        workflow_tags: List of tags for workflows to extract lineage data.

    Returns:
        List of dictionaries with lineage data for each workflow.
    """
    with open(lineage_json_file, "r") as f:
        lineage_data = json.load(f)
    # Keep only the workflows that carry at least one of the requested tags.
    workflow_lineage_data = [
        workflow
        for workflow in lineage_data["workflows"]
        if any(tag in workflow["tags"] for tag in workflow_tags)
    ]
    return workflow_lineage_data


# Example usage:
workflow_tags = ["my-workflow-tag"]
lineage_data = extract_lineage_data_for_workflows("lineage.json", workflow_tags)
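Since the original goal was the external sources behind tagged workflows, here is a rough sketch of how that lookup could work once a compiled graph is loaded as a dict. It is not an official API. The key names (tables, operations, assertions, declarations, target, tags, dependencyTargets) are assumptions based on the shape that the open-source CLI's dataform compile --json output usually has, so check them against your own output first. The function name external_sources_for_tags and the sample graph are made up for illustration.

```python
from collections import deque


def target_key(target: dict) -> str:
    """Join the (assumed) database/schema/name fields into one comparable name."""
    parts = [target.get("database"), target.get("schema"), target.get("name")]
    return ".".join(p for p in parts if p)


def external_sources_for_tags(graph: dict, tags: list[str]) -> dict[str, list[str]]:
    """Map each tagged action to the declared sources it depends on, directly or transitively."""
    # Index every compiled action by its target name (section names are assumptions).
    actions = {}
    for section in ("tables", "operations", "assertions"):
        for action in graph.get(section, []):
            actions[target_key(action["target"])] = action
    # Declarations are treated as the external sources.
    declared = {target_key(d["target"]) for d in graph.get("declarations", [])}

    wanted = set(tags)
    result = {}
    for name, action in actions.items():
        if not wanted & set(action.get("tags", [])):
            continue
        # Breadth-first walk upstream until declarations are reached.
        seen, queue, sources = {name}, deque([name]), set()
        while queue:
            current = queue.popleft()
            for dep in actions[current].get("dependencyTargets", []):
                dep_name = target_key(dep)
                if dep_name in seen:
                    continue
                seen.add(dep_name)
                if dep_name in declared:
                    sources.add(dep_name)
                elif dep_name in actions:
                    queue.append(dep_name)
        result[name] = sorted(sources)
    return result


# Hypothetical compiled graph, small enough to trace by hand.
sample_graph = {
    "declarations": [
        {"target": {"schema": "raw", "name": "orders"}},
        {"target": {"schema": "raw", "name": "customers"}},
    ],
    "tables": [
        {"target": {"schema": "staging", "name": "orders"},
         "dependencyTargets": [{"schema": "raw", "name": "orders"}]},
        {"target": {"schema": "marts", "name": "daily_sales"},
         "tags": ["daily"],
         "dependencyTargets": [{"schema": "staging", "name": "orders"}]},
        {"target": {"schema": "marts", "name": "customers"},
         "tags": ["weekly"],
         "dependencyTargets": [{"schema": "raw", "name": "customers"}]},
    ],
}

daily_sources = external_sources_for_tags(sample_graph, ["daily"])
```

With the sample graph, daily_sources maps marts.daily_sales to raw.orders, found through the untagged staging table. To run this on a real project, you would load the JSON produced by the compile step with json.load and pass the resulting dict in place of sample_graph.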
Using the Dataform REST API: In the same version, Dataform introduced new REST API endpoints that can be utilized to extract lineage data.
Python Script for REST API Data:
import requests


def extract_lineage_data_for_workflows(dataform_api_token: str, workflow_tags: list[str]) -> list[dict]:
    """Extracts lineage data for specified workflows from the Dataform REST API.

    Args:
        dataform_api_token: Dataform API token.
        workflow_tags: List of tags for workflows to extract lineage data.

    Returns:
        List of dictionaries with lineage data for each workflow.
    """
    headers = {"Authorization": f"Bearer {dataform_api_token}"}
    url = "https://api.dataform.com/v1/workflows"
    response = requests.get(url, headers=headers)
    # Fail loudly on an HTTP error instead of parsing an error body as data.
    response.raise_for_status()
    workflows = response.json()["workflows"]
    workflow_lineage_data = []
    for workflow in workflows:
        # Fetch lineage only for workflows carrying at least one requested tag.
        if any(tag in workflow["tags"] for tag in workflow_tags):
            lineage_url = f"https://api.dataform.com/v1/workflows/{workflow['id']}/lineage"
            lineage_response = requests.get(lineage_url, headers=headers)
            lineage_response.raise_for_status()
            workflow_lineage_data.append(lineage_response.json())
    return workflow_lineage_data


# Example usage:
dataform_api_token = "YOUR_DATAFORM_API_TOKEN"
workflow_tags = ["my-workflow-tag"]
lineage_data = extract_lineage_data_for_workflows(dataform_api_token, workflow_tags)
Important Notes: