
Run a Dataform pipeline for a specific tag using dataform_v1beta

I'm trying to run a Dataform pipeline for a specific tag using the client libraries, but I can't find a code example. My tests fail with errors saying the project doesn't exist, or that Dataform doesn't recognize the workflow. I'm using the dataform_v1beta library. I need to run this from Python because, in between, I also do other work such as moving files around in GCS, and I do all of this from Python on my local machine. What do you recommend?


To run a Dataform pipeline for a specific tag with the dataform_v1beta1 library in Python, you can follow these steps:

Step-by-Step Guide

  1. Install the Google Cloud Dataform library: Ensure you have the google-cloud-dataform package installed using pip:

     
    pip install google-cloud-dataform
    
  2. Authenticate Your Application: Set up authentication using a service account key file or Application Default Credentials.

  3. Initialize the Dataform Client: Create a DataformClient to interact with the Dataform service.

  4. Obtain Compilation Result ID: Compile your Dataform code (either using the UI or API) and retrieve the successful compilation result ID.

  5. Run the Pipeline with the Specific Tag: Execute the pipeline using the desired tag and the compilation result ID.

from google.cloud import dataform_v1beta
from google.protobuf.field_mask_pb2 import FieldMask

def run_dataform_pipeline(project_id, location, repository_id, tag):
    # Initialize the Dataform client
    client = dataform_v1beta.DataformClient()

    # Define the parent path
    parent = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

    # List workflows to find the one with the specified tag
    list_workflows_request = dataform_v1beta.ListWorkflowsRequest(parent=parent)
    workflows = client.list_workflows(request=list_workflows_request)
    
    # Filter the workflow with the specified tag
    target_workflow = None
    for workflow in workflows:
        if tag in workflow.tags:
            target_workflow = workflow
            break
    
    if not target_workflow:
        raise ValueError(f"No workflow found with the tag: {tag}")

    # Start the workflow
    start_workflow_request = dataform_v1beta.StartWorkflowRequest(
        name=target_workflow.name
    )
    operation = client.start_workflow(request=start_workflow_request)
    operation.result()  # Wait for the operation to complete

    print(f"Workflow with tag '{tag}' started successfully.")

# Example usage
project_id = "your-project-id"
location = "your-location"
repository_id = "your-repository-id"
tag = "your-specific-tag"

run_dataform_pipeline(project_id, location, repository_id, tag)


  • Ensure Proper Authentication: Before running the script, ensure your environment is authenticated. If using a service account, set the GOOGLE_APPLICATION_CREDENTIALS environment variable:

    export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"
  • Compilation Result: Obtain the compilation_result_id by compiling your Dataform project through the Dataform UI or using the Dataform API:

     
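    compilation_result = client.create_compilation_result(
        parent=parent,
        # "main" is an assumed branch name; use your repository's branch
        compilation_result=dataform_v1beta1.CompilationResult(git_commitish="main"),
    )
    # create_compilation_result returns the CompilationResult directly
    compilation_result_id = compilation_result.name.split('/')[-1]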
    
  • Permissions: Ensure that the service account used has the appropriate permissions to create workflow invocations in your Dataform repository.

  • API Documentation: Refer to the Dataform API documentation for additional details and methods.

  • Error Handling: Wrap the API calls in a try-except block so that exceptions raised during the workflow invocation are caught and reported, which helps identify issues during execution.

  • Environment Configuration: Ensure your Google Cloud environment is configured correctly with all the required setups and permissions.
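Steps 4 and 5 above can be sketched with two calls that the google-cloud-dataform client does provide: create_compilation_result (compile a Git branch) and create_workflow_invocation (run only the actions whose tags are listed in invocation_config.included_tags). This is a minimal sketch under assumptions, not a drop-in replacement: the helper names repository_path, compile_branch and run_tags are hypothetical, the branch name "main" is an assumption, and requests are passed as plain dicts (which the generated client accepts) with the client passed in, so authentication stays up to you.

```python
def repository_path(project_id, location, repository_id):
    """Build the repository resource name used as `parent` in Dataform requests."""
    return f"projects/{project_id}/locations/{location}/repositories/{repository_id}"


def compile_branch(client, parent, branch="main"):
    """Create a compilation result from a Git branch and return its resource name.

    `branch="main"` is an assumption; pass your repository's branch.
    """
    result = client.create_compilation_result(
        request={
            "parent": parent,
            "compilation_result": {"git_commitish": branch},
        }
    )
    # The API call can succeed while the compilation itself still has errors
    if result.compilation_errors:
        messages = "; ".join(e.message for e in result.compilation_errors)
        raise RuntimeError(f"Compilation failed: {messages}")
    return result.name


def run_tags(client, parent, compilation_result_name, tags):
    """Start a workflow invocation limited to the actions carrying `tags`."""
    invocation = client.create_workflow_invocation(
        request={
            "parent": parent,
            "workflow_invocation": {
                "compilation_result": compilation_result_name,
                "invocation_config": {"included_tags": list(tags)},
            },
        }
    )
    # Returns once the invocation is created; the run continues asynchronously
    return invocation.name


# Usage (requires `pip install google-cloud-dataform` and valid credentials):
# from google.cloud import dataform_v1beta1
# client = dataform_v1beta1.DataformClient()
# parent = repository_path("your-project-id", "us-central1", "your-repository-id")
# compiled = compile_branch(client, parent, branch="main")
# print(run_tags(client, parent, compiled, ["your-specific-tag"]))
```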


Hello @ms4446, from what I've seen, Dataform does not have a StartWorkflowRequest function. I'm using the script below to try running everything, just for testing, but the execution doesn't show up in Dataform's execution history. Could it be something in the code, or is there a configuration I need to add in Dataform? Using list_workflow_invocations, I get workflows along with their tags, etc. Is there another way to execute a specific tag without using a for loop? I already have all the tags, but I couldn't insert them into the request.

import os

from google.cloud import dataform_v1beta1
from google.cloud import storage, bigquery
from google.oauth2 import service_account

os.environ['https_proxy'] = 'http://10.105.160.5:8080'
os.environ['http_proxy'] = 'http://10.105.160.5:8080'

credentials_file = "P:/COMUM/FALCAO/30. EBB/09. Analistas ODC+CS+MS/br-apps-atendimentodva-qa-6ef8f7a85d97.json"
credentials = service_account.Credentials.from_service_account_file(credentials_file)
client = dataform_v1beta1.DataformClient(credentials=credentials)
# Example usage
project_id = "br-apps-atendimentodva-qa"
location = "us-central1"
repository_id = "Jasper"
tag = "base_ura_chamadas"
parent = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

# list_workflow_invocations takes the repository path as parent
response = client.list_workflow_invocations(parent=parent)

# get_workflow_invocation needs a full workflow invocation name, not the repository path
request = dataform_v1beta1.GetWorkflowInvocationRequest(
    name=f"{parent}/workflowInvocations/WORKFLOW_INVOCATION_ID"  # placeholder ID
)
response = client.get_workflow_invocation(request=request)
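Two points from this question can be shown in code. First, get_workflow_invocation and list_workflow_invocations only read existing runs; a run appears in the execution history only after create_workflow_invocation is called. Second, included_tags is a list, so several tags can go into one invocation (for example included_tags=["tag_a", "tag_b"]) without looping over them. The sketch below, using a hypothetical helper name summarize_invocations and assuming a client and parent created as in the script above, reads the history back to show which tags each invocation ran and its state.

```python
def summarize_invocations(client, parent, tag=None):
    """Return (name, included_tags, state) for workflow invocations in a repository.

    If `tag` is given, keep only invocations whose included_tags contain it.
    """
    summary = []
    # Iterating the pager from list_workflow_invocations fetches further pages lazily
    for inv in client.list_workflow_invocations(request={"parent": parent}):
        tags = list(inv.invocation_config.included_tags)
        if tag is None or tag in tags:
            # state is an enum; .name gives a string such as "RUNNING" or "SUCCEEDED"
            summary.append((inv.name, tags, inv.state.name))
    return summary


# Usage with the client and parent from the script above:
# for name, tags, state in summarize_invocations(client, parent, tag="base_ura_chamadas"):
#     print(name, tags, state)
```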


To run a Dataform pipeline for a specific tag and have the execution recorded in Dataform's execution history, you need to create a workflow invocation; listing or getting existing invocations does not start a run. The dataform_v1beta1 library provides this through create_workflow_invocation, which needs a compilation result and an invocation config listing the tags to run.

Below are the steps, followed by an example script that runs a Dataform workflow for a specific tag:

  1. Install the Required Libraries: Ensure you have the google-cloud-dataform package installed:

     
    pip install google-cloud-dataform
    
  2. Authenticate Your Application: Set up authentication using a service account key file.

  3. Initialize the Dataform Client: Create a DataformClient to interact with the Dataform API.

  4. Obtain the Compilation Result ID: You must compile the repository to get a compilation_result_id. This compilation result is necessary to create a workflow invocation.

  5. Create and Execute a Workflow Invocation: Use the create_workflow_invocation method to create and start the workflow invocation using the specific tag.

import os
from google.cloud import dataform_v1beta1
from google.oauth2 import service_account

# Set up your proxy if needed
os.environ['https_proxy'] = 'http://10.105.160.5:8080'
os.environ['http_proxy'] = 'http://10.105.160.5:8080'

def run_dataform_pipeline(project_id, location, repository_id, tag):
    # Path to your service account key file
    credentials_file = "P:/COMUM/FALCAO/30. EBB/09. Analistas ODC+CS+MS/br-apps-atendimentodva-qa-6ef8f7a85d97.json"
    credentials = service_account.Credentials.from_service_account_file(credentials_file)

    # Initialize the Dataform client
    client = dataform_v1beta1.DataformClient(credentials=credentials)

    # Define the parent path
    parent = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

    # Compile the repository to obtain the compilation result ID
    compile_request = dataform_v1beta1.CompileRepositoryRequest(name=parent)
    compilation_result = client.compile_repository(request=compile_request)
    compilation_result_id = compilation_result.name.split('/')[-1]

    # Define the workflow invocation with the specific tag
    workflow_invocation = dataform_v1beta1.WorkflowInvocation(
        compilation_result=f"{parent}/compilationResults/{compilation_result_id}",
        # InvocationConfig is a top-level message in dataform_v1beta1
        invocation_config=dataform_v1beta1.InvocationConfig(
            included_tags=[tag]
        )
    )

    # Create the workflow invocation request
    request = dataform_v1beta1.CreateWorkflowInvocationRequest(
        parent=parent,
        workflow_invocation=workflow_invocation
    )

    try:
        # create_workflow_invocation returns the WorkflowInvocation directly (not a
        # long-running operation); the run itself continues asynchronously
        response = client.create_workflow_invocation(request=request)

        print(f"WorkflowInvocation created: {response.name}")
    except Exception as e:
        print(f"Error running Dataform pipeline: {e}")
        raise

# Example usage
project_id = "br-apps-atendimentodva-qa"
location = "us-central1"
repository_id = "Jasper"
tag = "base_ura_chamadas"

run_dataform_pipeline(project_id, location, repository_id, tag)

Additional Notes

  • Authentication: Ensure your environment is properly authenticated and that the service account has the necessary permissions.

  • Permissions: Verify that the service account used in the script has access to the Dataform repository and permissions to create workflow invocations.

  • Compilation and Invocation: Make sure to compile your Dataform repository before creating a workflow invocation.

  • Proxy Configuration: Include proxy settings only if required by your network setup.

Ensure you replace the placeholders (project_id, location, repository_id, tag) with your actual values. The script should then compile the repository and create a workflow invocation that runs the Dataform pipeline using the specified tag.

Feel free to modify the script based on your setup or add more detailed logging or error handling as needed.

Hi, I'm trying to use this code, but it gives me an error: "module google.cloud.dataform_v1beta1 has no attribute CompileRepositoryRequest".

Hi Roger, I'm using the code below. The part that fetches the latest compilation is still pending and I still need to automate it, so for now I do that step manually; apart from that, it already runs.

from google.oauth2 import service_account
from google.cloud import storage, bigquery
from google.cloud import dataform
import time
credentials_file = "key.json"
credentials = service_account.Credentials.from_service_account_file(credentials_file)
client = dataform.DataformClient(credentials=credentials)

project_id = "name_project"
location = "us-central1"
repository_id = "Jasper"

name = f"projects/br-apps-atendimentodva-qa/locations/us-central1/repositories/Jasper/workflowInvocations/1718715402-e390ac61-5bde-4c11-969b-eaa9068e4d45"
parent = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

workflow_invocation = dataform.WorkflowInvocation()
invocationconfi = dataform.InvocationConfig(included_tags=["hugme"])
workflow_invocation.invocation_config = invocationconfi

# THE CODE BELOW NEEDS UPDATING TO FETCH THE LATEST COMPILATION
compilacao_manual = "d437a3a0-551d-435c-bfbd-495e01c2df34"
compilation_res = client.list_compilation_results(parent = parent )
# print(compilation_res.compilation_results )

name_compilation_res = None
resolved_git_commit_sha = None
# Iterating the pager itself (not .compilation_results) also covers later pages
for compilation in compilation_res:
    # Check whether the compilation ID matches compilacao_manual
    if compilation.name.split("/")[-1] == compilacao_manual:

        # Check that there are no compilation errors
        if not compilation.compilation_errors:
            if compilation.resolved_git_commit_sha:
                resolved_git_commit_sha = compilation.resolved_git_commit_sha
                name_compilation_res = compilation.name
        break

if name_compilation_res is None:
    raise RuntimeError(f"No error-free compilation found with ID {compilacao_manual}")
print(name_compilation_res)

print(resolved_git_commit_sha)

workflow_invocation.compilation_result = name_compilation_res
# workflow_invocation.compilation_result = "projects/497423421340/locations/us-central1/repositories/Jasper/compilationResults/4ccdfb9f-1388-4c0c-bc62-5d457cd993fe"
# workflow_invocation.compilation_result = "projects/497423421340/locations/us-central1/repositories/Jasper/compilationResults/e574f4b0-5a24-469e-a606-d547468dc440"

# dir(dataform.CompilationResult_query)

# request = dataform.CreateWorkflowInvocationRequest(
#     parent=parent,
#     workflow_invocation=workflow_invocation,
# )

# response = client.create_workflow_invocation(request=request,)

# response = client.get_workflow_invocation(request=request)

teste = client.create_workflow_invocation(
    parent=parent,
    workflow_invocation=workflow_invocation,
)
# create_workflow_config
# response = client.create_workflow_invocation(request=request)

# Get the ID of the created invocation
invocation_id = teste.name
invoked_workflow = client.get_workflow_invocation(name=invocation_id)
state = invoked_workflow.state
print(state)

# workflow_invocations {
#   name: "projects/br-apps-atendimentodva-qa/locations/us-central1/repositories/Jasper/workflowInvocations/1718715402-e390ac61-5bde-4c11-969b-eaa9068e4d45"
#   compilation_result: "projects/497423421340/locations/us-central1/repositories/Jasper/compilationResults/4ccdfb9f-1388-4c0c-bc62-5d457cd993fe"
#   invocation_config {
#     included_tags: "tods_pr_controleoperacao"
#   }
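Since the compilation ID is already known in this script, here is a sketch of fetching it directly with get_compilation_result instead of scanning list_compilation_results. Assumptions: the helper name load_compilation is hypothetical, and client, parent and compilacao_manual are the ones defined in the script above. An unknown ID is expected to make the client call raise google.api_core.exceptions.NotFound, rather than leaving variables unset.

```python
def load_compilation(client, parent, compilation_id):
    """Fetch one compilation result by ID; return (name, resolved_git_commit_sha).

    Raises RuntimeError if the compilation result contains errors. An unknown ID
    is expected to raise google.api_core.exceptions.NotFound from the client call.
    """
    name = f"{parent}/compilationResults/{compilation_id}"
    result = client.get_compilation_result(request={"name": name})
    if result.compilation_errors:
        count = len(result.compilation_errors)
        raise RuntimeError(f"Compilation {compilation_id} has {count} error(s)")
    return result.name, result.resolved_git_commit_sha


# Usage with the variables from the script above:
# name_compilation_res, resolved_git_commit_sha = load_compilation(
#     client, parent, compilacao_manual
# )
# workflow_invocation.compilation_result = name_compilation_res
```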


Hi Guilherme, thank you very much. It worked here, but I'll also need to automate the compilation so I can make substitutions. Did you have any success with the compilation?
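On automating the compilation with substitutions: create_compilation_result accepts a code_compilation_config whose vars map and suffix/prefix fields override the repository's compilation settings for that one compilation result. A minimal sketch, assuming the substitutions you need can be expressed as compilation variables (read in SQLX through dataform.projectConfig.vars) or a schema suffix; the helper name compile_with_overrides and any values you pass are hypothetical. The returned name can then be assigned to workflow_invocation.compilation_result.

```python
def compile_with_overrides(client, parent, branch, variables, schema_suffix=None):
    """Compile `branch` with compilation variables and an optional schema suffix.

    `variables` maps names to values (converted to strings, as the vars map
    expects); the branch and override values are your choice, not API defaults.
    """
    config = {"vars": {k: str(v) for k, v in variables.items()}}
    if schema_suffix:
        config["schema_suffix"] = schema_suffix
    result = client.create_compilation_result(
        request={
            "parent": parent,
            "compilation_result": {
                "git_commitish": branch,
                "code_compilation_config": config,
            },
        }
    )
    # Fail early instead of invoking a workflow from a broken compilation
    if result.compilation_errors:
        raise RuntimeError("; ".join(e.message for e in result.compilation_errors))
    return result.name


# Usage (hypothetical variable name and branch):
# workflow_invocation.compilation_result = compile_with_overrides(
#     client, parent, "main", {"run_date": "2024-06-18"}
# )
```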

Hi Roger, unfortunately not yet. I'm trying to update this code so the executions can run in threads, because the way it is now, it isn't possible to run it and receive the status.
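On running in threads and getting the status: create_workflow_invocation returns as soon as the invocation is created, so the final status has to be read by polling get_workflow_invocation until the state is SUCCEEDED, FAILED or CANCELLED. Below is a sketch with concurrent.futures that starts one invocation per tag and waits for each. The helper names, the 10-second poll interval and the one-hour timeout are assumptions, and it assumes one client can be shared across threads (if in doubt, create a client per thread). If the tags can run together, a single invocation with all of them in included_tags may be simpler than threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# States after which a workflow invocation no longer changes
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_invocation(client, name, poll_seconds=10.0, timeout_seconds=3600.0):
    """Poll get_workflow_invocation until the run reaches a terminal state."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = client.get_workflow_invocation(request={"name": name}).state.name
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{name} still {state} after {timeout_seconds}s")
        time.sleep(poll_seconds)


def run_and_wait(client, parent, compilation_result_name, tag, poll_seconds=10.0):
    """Start one invocation for `tag`, block until it finishes, return (tag, state)."""
    invocation = client.create_workflow_invocation(
        request={
            "parent": parent,
            "workflow_invocation": {
                "compilation_result": compilation_result_name,
                "invocation_config": {"included_tags": [tag]},
            },
        }
    )
    return tag, wait_for_invocation(client, invocation.name, poll_seconds)


def run_tags_in_threads(client, parent, compilation_result_name, tags, poll_seconds=10.0):
    """Run one invocation per tag concurrently; return {tag: final_state}."""
    with ThreadPoolExecutor(max_workers=max(1, len(tags))) as pool:
        futures = [
            pool.submit(run_and_wait, client, parent, compilation_result_name, t, poll_seconds)
            for t in tags
        ]
        # .result() re-raises any exception raised inside a worker thread
        return dict(f.result() for f in futures)


# Usage with the client, parent and name_compilation_res from the script above:
# states = run_tags_in_threads(client, parent, name_compilation_res, ["hugme", "base_ura_chamadas"])
# print(states)
```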