I'm trying to run a Dataform pipeline for a specific tag using the client libraries, but I can't find a code example. In my tests I get errors saying that the project does not exist, or that Dataform doesn't recognize the workflow. I'm trying to use the dataform_v1beta1 library. I need to run it from Python because I also do other work in between, such as moving files in GCS, and I do all of this with local Python. What do you recommend?
To run a Dataform pipeline with a specific tag using the dataform_v1beta1 library in Python, you can follow these steps:
Step-by-Step Guide
Install the Google Cloud Dataform library: Ensure you have the google-cloud-dataform package installed using pip:
pip install google-cloud-dataform
Authenticate Your Application: Set up authentication using a service account key file or Application Default Credentials.
Initialize the Dataform Client: Create a DataformClient to interact with the Dataform service.
Obtain Compilation Result ID: Compile your Dataform code (either using the UI or API) and retrieve the successful compilation result ID.
Run the Pipeline with the Specific Tag: Execute the pipeline using the desired tag and the compilation result ID.
from google.cloud import dataform_v1beta1

def run_dataform_pipeline(project_id, location, repository_id, tag):
    # Initialize the Dataform client
    client = dataform_v1beta1.DataformClient()

    # Define the parent path
    parent = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

    # List workflows to find the one with the specified tag
    list_workflows_request = dataform_v1beta1.ListWorkflowsRequest(parent=parent)
    workflows = client.list_workflows(request=list_workflows_request)

    # Filter the workflow with the specified tag
    target_workflow = None
    for workflow in workflows:
        if tag in workflow.tags:
            target_workflow = workflow
            break

    if not target_workflow:
        raise ValueError(f"No workflow found with the tag: {tag}")

    # Start the workflow
    start_workflow_request = dataform_v1beta1.StartWorkflowRequest(
        name=target_workflow.name
    )
    operation = client.start_workflow(request=start_workflow_request)
    operation.result()  # Wait for the operation to complete
    print(f"Workflow with tag '{tag}' started successfully.")

# Example usage
project_id = "your-project-id"
location = "your-location"
repository_id = "your-repository-id"
tag = "your-specific-tag"
run_dataform_pipeline(project_id, location, repository_id, tag)
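A minimal sketch of the last two steps that passes the tag directly in the invocation config instead of searching for a workflow first. It assumes you already have the full name of a successful compilation result (created in the Dataform UI or through the API) and that `client` is a `dataform_v1beta1.DataformClient`. The request is passed as a plain dict, which the generated client accepts in place of the request class; `run_tags` is a hypothetical helper name.

```python
from typing import Any, Sequence


def run_tags(client: Any, parent: str, compilation_result_name: str,
             tags: Sequence[str]) -> Any:
    """Start one workflow invocation that runs only the actions carrying `tags`.

    `client` is assumed to be a google.cloud.dataform_v1beta1.DataformClient
    (any object with the same create_workflow_invocation method works).
    """
    if not tags:
        raise ValueError("At least one tag is required")
    request = {
        "parent": parent,  # projects/<project>/locations/<location>/repositories/<repo>
        "workflow_invocation": {
            "compilation_result": compilation_result_name,
            # Several tags can go into a single invocation; no loop is needed
            "invocation_config": {"included_tags": list(tags)},
        },
    }
    # The call returns the created WorkflowInvocation itself (it is not a
    # long-running operation), so there is no .result() to wait on.
    return client.create_workflow_invocation(request=request)
```

With a real client this would be called as `run_tags(dataform_v1beta1.DataformClient(), parent, compilation_result_name, ["base_ura_chamadas"])`. The returned invocation's `name` is what you would later pass to `get_workflow_invocation` to check its state.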
Ensure Proper Authentication: Before running the script, ensure your environment is authenticated. If using a service account, set the GOOGLE_APPLICATION_CREDENTIALS environment variable:
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"
Compilation Result: Obtain the compilation_result_id by compiling your Dataform project through the Dataform UI or using the Dataform API:
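compilation_result = client.create_compilation_result(
    parent=parent,
    # "main" is an assumed Git branch name; use your repository's branch
    compilation_result=dataform_v1beta1.CompilationResult(git_commitish="main"),
)
compilation_result_id = compilation_result.name.split('/')[-1]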
Permissions: Ensure that the service account used has the appropriate permissions to create workflow invocations in your Dataform repository.
API Documentation: Refer to the Dataform API documentation for additional details and methods.
Error Handling: Wrap the workflow invocation call in a try-except block so that exceptions raised during execution are caught and reported, which helps in identifying issues.
Environment Configuration: Ensure your Google Cloud environment is configured correctly with all the required setups and permissions.
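Errors such as "project does not exist" can come from a malformed resource path: an empty value, a slash inside an ID, or stray whitespace. Below is a small pure-Python sketch for building the repository path and pulling IDs back out of names the API returns. The helper names and validation rules are illustrative assumptions, not part of the Dataform library. Names returned by the API may use the project number instead of the project ID (as in the sample invocation output later in this thread), so comparing trailing IDs is safer than comparing full names.

```python
from typing import Dict


def repository_path(project_id: str, location: str, repository_id: str) -> str:
    """Build projects/<p>/locations/<l>/repositories/<r>, rejecting obviously bad IDs."""
    for label, value in (("project_id", project_id),
                         ("location", location),
                         ("repository_id", repository_id)):
        # Empty values, embedded slashes, or leading/trailing whitespace all
        # produce a path that points at a resource that does not exist.
        if not value or "/" in value or value != value.strip():
            raise ValueError(f"Invalid {label}: {value!r}")
    return f"projects/{project_id}/locations/{location}/repositories/{repository_id}"


def parse_resource_name(name: str) -> Dict[str, str]:
    """Split 'collection/id/collection/id/...' into {collection: id}."""
    parts = name.split("/")
    if len(parts) % 2 or not all(parts):
        raise ValueError(f"Malformed resource name: {name!r}")
    return dict(zip(parts[0::2], parts[1::2]))
```

For example, `parse_resource_name(compilation_result.name)["compilationResults"]` yields the compilation result ID regardless of whether the name starts with the project ID or the project number.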
Hello @ms4446, from what I can see, Dataform does not have a StartWorkflowRequest function. I'm using the script below just to test running everything, but the runs it triggers do not show up in Dataform's execution history. Could this be related to the code, or is there some configuration I should set in Dataform? Using list_workflow_invocations, I receive workflows along with their tags, etc. Is there another way to execute a specific tag without using a for loop? I already have all the tags, but I couldn't pass them in the request.
import os

from google.cloud import dataform_v1beta1
from google.cloud import storage, bigquery
from google.oauth2 import service_account

os.environ['https_proxy'] = 'http://10.105.160.5:8080'
os.environ['http_proxy'] = 'http://10.105.160.5:8080'

credentials_file = "P:/COMUM/FALCAO/30. EBB/09. Analistas ODC+CS+MS/br-apps-atendimentodva-qa-6ef8f7a85d97.json"
credentials = service_account.Credentials.from_service_account_file(credentials_file)
client = dataform_v1beta1.DataformClient(credentials=credentials)

# Example usage
project_id = "br-apps-atendimentodva-qa"
location = "us-central1"
repository_id = "Jasper"
tag = "base_ura_chamadas"
parent = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

request = dataform_v1beta1.GetWorkflowInvocationRequest(
    name="projects/br-apps-atendimentodva-qa/locations/us-central1/repositories/Jasper"
    # use when running list_workflow_invocations
)
response = client.list_workflow_invocations(parent=parent)  # use when running list_workflow_invocations
response = client.get_workflow_invocation(request=request)  # use when running get_workflow_invocation
To run a Dataform pipeline using a specific tag and have the execution recorded in Dataform's execution history, you need to create a workflow invocation through the Dataform API; listing or getting existing invocations does not start a run. The dataform_v1beta1 library provides the functionality to create workflow invocations, but some details must be handled carefully.
Below are the steps and an example script to run a Dataform workflow using a specific tag:
Install the Required Libraries: Ensure you have the google-cloud-dataform package installed:
pip install google-cloud-dataform
Authenticate Your Application: Set up authentication using a service account key file.
Initialize the Dataform Client: Create a DataformClient to interact with the Dataform API.
Obtain the Compilation Result ID: You must compile the repository to get a compilation_result_id. This compilation result is necessary to create a workflow invocation.
Create and Execute a Workflow Invocation: Use the create_workflow_invocation method to create and start the workflow invocation using the specific tag.
import os
from google.cloud import dataform_v1beta1
from google.oauth2 import service_account

# Set up your proxy if needed
os.environ['https_proxy'] = 'http://10.105.160.5:8080'
os.environ['http_proxy'] = 'http://10.105.160.5:8080'

def run_dataform_pipeline(project_id, location, repository_id, tag):
    # Path to your service account key file
    credentials_file = "P:/COMUM/FALCAO/30. EBB/09. Analistas ODC+CS+MS/br-apps-atendimentodva-qa-6ef8f7a85d97.json"
    credentials = service_account.Credentials.from_service_account_file(credentials_file)

    # Initialize the Dataform client
    client = dataform_v1beta1.DataformClient(credentials=credentials)

    # Define the parent path
    parent = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

    # Compile the repository to obtain the compilation result ID
    compile_request = dataform_v1beta1.CompileRepositoryRequest(name=parent)
    compilation_result = client.compile_repository(request=compile_request)
    compilation_result_id = compilation_result.name.split('/')[-1]

    # Define the workflow invocation with the specific tag
    workflow_invocation = dataform_v1beta1.WorkflowInvocation(
        compilation_result=f"{parent}/compilationResults/{compilation_result_id}",
        invocation_config=dataform_v1beta1.InvocationConfig(
            included_tags=[tag]
        )
    )

    # Create the workflow invocation request
    request = dataform_v1beta1.CreateWorkflowInvocationRequest(
        parent=parent,
        workflow_invocation=workflow_invocation
    )

    try:
        # Create and start the workflow invocation; the call returns the
        # WorkflowInvocation itself, not a long-running operation
        response = client.create_workflow_invocation(request=request)
        print(f"WorkflowInvocation created: {response.name}")
    except Exception as e:
        print(f"Error running Dataform pipeline: {e}")
        raise

# Example usage
project_id = "br-apps-atendimentodva-qa"
location = "us-central1"
repository_id = "Jasper"
tag = "base_ura_chamadas"
run_dataform_pipeline(project_id, location, repository_id, tag)
Additional Notes
Authentication: Ensure your environment is properly authenticated and that the service account has the necessary permissions.
Permissions: Verify that the service account used in the script has access to the Dataform repository and permissions to create workflow invocations.
Compilation and Invocation: Make sure to compile your Dataform repository before creating a workflow invocation.
Proxy Configuration: Include proxy settings only if required by your network setup.
Ensure you replace the placeholders (project_id, location, repository_id, tag) with your actual values. The script should then compile the repository and create a workflow invocation that runs the Dataform pipeline using the specified tag.
Feel free to modify the script based on your setup or add more detailed logging or error handling as needed.
Hi, I'm trying to use this code, but it gives me an error: "module google.cloud.dataform_v1beta1 has no attribute CompileRepositoryRequest".
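When a class or method from an example is missing, listing what the installed module actually exposes is a quick check. In recent releases of google-cloud-dataform, compilation appears to go through `create_compilation_result` / `CreateCompilationResultRequest` rather than a `CompileRepositoryRequest`, but confirm that against your installed version. A sketch; `exposed_names` and `missing_names` are hypothetical helpers:

```python
from typing import Any, List


def exposed_names(module: Any, suffix: str = "Request") -> List[str]:
    """Return the public attribute names of `module` that end with `suffix`."""
    return sorted(n for n in dir(module)
                  if n.endswith(suffix) and not n.startswith("_"))


def missing_names(module: Any, wanted: List[str]) -> List[str]:
    """Return the names in `wanted` that `module` does not define."""
    return [n for n in wanted if not hasattr(module, n)]
```

For example, after `from google.cloud import dataform_v1beta1`, the call `missing_names(dataform_v1beta1, ["CompileRepositoryRequest", "CreateCompilationResultRequest"])` reports which of the two your installed version lacks, and `exposed_names(dataform_v1beta1.DataformClient, suffix="")` lists the client's public methods.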
Hi Roger, I'm using this code. The part that fetches the latest compilation is still pending (I still need to automate it), so for now I do that step manually; apart from that, it's already running:
from google.oauth2 import service_account
from google.cloud import dataform

credentials_file = "key.json"
credentials = service_account.Credentials.from_service_account_file(credentials_file)
client = dataform.DataformClient(credentials=credentials)

project_id = "name_project"
location = "us-central1"
repository_id = "Jasper"
parent = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

# Run only the actions tagged "hugme"
workflow_invocation = dataform.WorkflowInvocation()
workflow_invocation.invocation_config = dataform.InvocationConfig(included_tags=["hugme"])

# THE CODE BELOW STILL NEEDS TO BE UPDATED TO FETCH THE LATEST COMPILATION
compilacao_manual = "d437a3a0-551d-435c-bfbd-495e01c2df34"

# Iterating the pager walks through every page of compilation results
for compilation in client.list_compilation_results(parent=parent):
    # Check whether the compilation ID matches compilacao_manual (compare the
    # trailing ID: returned names may use the project number, not the project ID)
    if compilation.name.endswith(f"/compilationResults/{compilacao_manual}"):
        # Check that there are no compilation errors
        if not compilation.compilation_errors and compilation.resolved_git_commit_sha:
            print(compilation.name)
            print(compilation.resolved_git_commit_sha)
            workflow_invocation.compilation_result = compilation.name

if not workflow_invocation.compilation_result:
    raise RuntimeError(f"Compilation result {compilacao_manual} not found or has errors")

invocation = client.create_workflow_invocation(
    parent=parent,
    workflow_invocation=workflow_invocation,
)

# Get the name of the created invocation and check its state
invoked_workflow = client.get_workflow_invocation(name=invocation.name)
print(invoked_workflow.state)

# Sample list_workflow_invocations output:
# workflow_invocations {
#   name: "projects/br-apps-atendimentodva-qa/locations/us-central1/repositories/Jasper/workflowInvocations/1718715402-e390ac61-5bde-4c11-969b-eaa9068e4d45"
#   compilation_result: "projects/497423421340/locations/us-central1/repositories/Jasper/compilationResults/4ccdfb9f-1388-4c0c-bc62-5d457cd993fe"
#   invocation_config {
#     included_tags: "tods_pr_controleoperacao"
#   }
# }
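One way to replace the manual compilation ID is to create a fresh compilation result on each run and use its name directly. This sketch assumes the repository is connected to a Git remote and compiles the branch given by `git_commitish` ("main" is an assumed default); to compile a development workspace instead, the `compilation_result` dict would take a `workspace` path in place of `git_commitish`. The optional `overrides` dict is sent as the compilation result's `code_compilation_config`, where fields such as `schema_suffix`, `table_prefix`, or `vars` go. `compile_latest` is a hypothetical helper name.

```python
from typing import Any, Dict, Optional


def compile_latest(client: Any, parent: str, git_commitish: str = "main",
                   overrides: Optional[Dict[str, Any]] = None) -> str:
    """Create a new compilation result and return its full resource name.

    `client` is assumed to be a dataform (v1beta1) DataformClient; the request
    is sent as a dict. Raises RuntimeError if the compilation reports errors.
    """
    compilation_result: Dict[str, Any] = {"git_commitish": git_commitish}
    if overrides:
        # e.g. {"schema_suffix": "qa"} or {"vars": {"env": "qa"}}
        compilation_result["code_compilation_config"] = dict(overrides)

    result = client.create_compilation_result(
        request={"parent": parent, "compilation_result": compilation_result}
    )
    if result.compilation_errors:
        messages = "; ".join(err.message for err in result.compilation_errors)
        raise RuntimeError(f"Compilation failed: {messages}")
    return result.name
```

With this, `workflow_invocation.compilation_result = compile_latest(client, parent)` would take the place of the loop over `list_compilation_results`.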
Hi Guilherme, thank you very much. It worked here, but I'll also need to automate the compilation so I can make substitutions. Did you have any success with the compilation?
Hi Roger, unfortunately I haven't managed it yet. I'm trying to update this code so it can run executions in threads, because as it is, I can't run an execution and also receive its status.
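A sketch of running several tags concurrently and collecting their final states. Each thread creates one invocation and then polls `get_workflow_invocation` until the state is terminal (SUCCEEDED, FAILED or CANCELLED). It assumes the same client can be shared across threads (gRPC-based Google Cloud clients are generally used this way; one client per thread is the conservative alternative) and that `compilation_result_name` comes from an earlier compilation. `wait_for_invocation` and `run_tags_concurrently` are hypothetical helpers, and the 30-second poll interval and one-hour timeout are arbitrary defaults.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Sequence, Tuple

# WorkflowInvocation states after which polling can stop
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_invocation(client: Any, name: str, poll_seconds: float = 30.0,
                        timeout_seconds: float = 3600.0,
                        sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll an invocation until it reaches a terminal state; return the state name."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        invocation = client.get_workflow_invocation(request={"name": name})
        state = invocation.state.name  # enum member name, e.g. "RUNNING"
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{name} still {state} after {timeout_seconds}s")
        sleep(poll_seconds)


def run_tags_concurrently(client: Any, parent: str, compilation_result_name: str,
                          tags: Sequence[str], max_workers: int = 4,
                          **wait_kwargs: Any) -> Dict[str, str]:
    """Start one invocation per tag in a thread pool; map each tag to its final state."""
    def run_one(tag: str) -> Tuple[str, str]:
        invocation = client.create_workflow_invocation(request={
            "parent": parent,
            "workflow_invocation": {
                "compilation_result": compilation_result_name,
                "invocation_config": {"included_tags": [tag]},
            },
        })
        return tag, wait_for_invocation(client, invocation.name, **wait_kwargs)

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map re-raises the first exception from any worker
        return dict(pool.map(run_one, tags))
```

Called as `run_tags_concurrently(client, parent, compilation_result_name, ["hugme", "base_ura_chamadas"])`, it blocks until every tag's invocation finishes; submitting that call itself to a separate thread would leave the main thread free for other work such as the GCS file moves.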