
Management of the business glossary from Composer and Python

I am trying to automate the creation and management of a business glossary: terms, synonyms, and so on. The process should run automatically from Cloud Composer, which collects the information from a BigQuery table, interprets it, creates the terms, and assigns them to the entries. I have been looking for a Python API or Airflow operators that can do this, but I have not found any way to do it. Is there an alternative process that resembles this?

1 REPLY

Hi @raprieto,

Welcome to Google Cloud Community!

For your automation project, you can try the following options:

    • Custom Python script with the BigQuery and Data Catalog APIs:
      • Install the necessary Python libraries:
        pip install google-cloud-bigquery google-cloud-datacatalog
      • Set up authentication with a service account key that has the necessary permissions for BigQuery and Data Catalog (on Cloud Composer, the environment's service account is used automatically, so this export is mainly for local testing):
        export GOOGLE_APPLICATION_CREDENTIALS="path/to/your/service-account-key.json"
      • The exact code will depend on how you compose it in your project; here is a sample script to automate glossary management that might help you:

from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from google.cloud import datacatalog_v1
from google.protobuf import field_mask_pb2

# Initialize BigQuery and Data Catalog clients
bq_client = bigquery.Client()
datacatalog_client = datacatalog_v1.DataCatalogClient()

# Define parameters
project_id = 'your-project-id'
dataset_id = 'your-dataset-id'
table_id = 'your-table-id'
entry_group_id = 'your-entry-group-id'
location = 'us-central1'

# BigQuery SQL to fetch terms
sql_query = f"""
SELECT term, definition
FROM `{project_id}.{dataset_id}.{table_id}`
"""

# Execute BigQuery query
query_job = bq_client.query(sql_query)
results = query_job.result()

# Define the parent entry group path
parent = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}"

# Process each row from BigQuery and create/update glossary entries
for row in results:
    term = row['term']
    definition = row['definition']
    # Simple entry ID; Data Catalog entry IDs may only contain letters,
    # digits, and underscores, so sanitize further if your terms need it
    entry_id = term.replace(' ', '_')

    # Define the glossary entry; the entry group comes from the parent
    # path below, and custom entries need a user-specified type
    glossary_entry = {
        'display_name': term,
        'description': definition,
        'user_specified_type': 'glossary_term',
        'user_specified_system': 'business_glossary'
    }

    # Create or update the entry
    entry_name = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}/entries/{entry_id}"

    try:
        # Check if the entry already exists
        existing_entry = datacatalog_client.get_entry(name=entry_name)
        print(f"Entry '{term}' already exists. Updating...")
        existing_entry.description = definition

        # Update the existing entry
        update_mask = field_mask_pb2.FieldMask(paths=['description'])
        response = datacatalog_client.update_entry(entry=existing_entry, update_mask=update_mask)
        print(f"Updated entry: {response.name}")

    except NotFound:
        # The entry does not exist yet, so create it
        print(f"Creating new entry '{term}'...")
        response = datacatalog_client.create_entry(parent=parent, entry_id=entry_id, entry=glossary_entry)
        print(f"Created entry: {response.name}")

    • Apache Airflow with a custom operator:
      • The exact code will depend on how you compose it in your project; here, a custom operator BigQueryToDataCatalogOperator queries a BigQuery table and manages glossary entries in Google Cloud Data Catalog:
          • Install Apache Airflow (on Cloud Composer it is already preinstalled):
            pip install apache-airflow
          • Install the Google Cloud libraries (on Composer, add them as PyPI packages to the environment):
            pip install google-cloud-bigquery google-cloud-datacatalog
          • Create a file named custom_operators.py in your Airflow plugins directory (e.g., airflow/plugins/)
          • Sample custom operator below (a usage example follows after the code):

from airflow.models import BaseOperator
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
from google.cloud import datacatalog_v1
from google.protobuf import field_mask_pb2

class BigQueryToDataCatalogOperator(BaseOperator):
    def __init__(self, project_id, dataset_id, table_id, entry_group_id, location, *args, **kwargs):
        # apply_defaults is deprecated in Airflow 2.x and no longer needed
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.entry_group_id = entry_group_id
        self.location = location

    def execute(self, context):
        bq_client = bigquery.Client()
        datacatalog_client = datacatalog_v1.DataCatalogClient()

        sql_query = f"""
        SELECT term, definition
        FROM `{self.project_id}.{self.dataset_id}.{self.table_id}`
        """

        query_job = bq_client.query(sql_query)
        results = query_job.result()

        parent = f"projects/{self.project_id}/locations/{self.location}/entryGroups/{self.entry_group_id}"

        for row in results:
            term = row['term']
            definition = row['definition']
            # Simple entry ID; sanitize further if terms contain
            # characters other than letters, digits, and spaces
            entry_id = term.replace(' ', '_')

            # Custom entries need a user-specified type; the entry group
            # itself comes from the parent path, not from an Entry field
            glossary_entry = {
                'display_name': term,
                'description': definition,
                'user_specified_type': 'glossary_term',
                'user_specified_system': 'business_glossary'
            }

            entry_name = f"projects/{self.project_id}/locations/{self.location}/entryGroups/{self.entry_group_id}/entries/{entry_id}"

            try:
                existing_entry = datacatalog_client.get_entry(name=entry_name)
                self.log.info(f"Entry '{term}' already exists. Updating...")
                existing_entry.description = definition

                update_mask = field_mask_pb2.FieldMask(paths=['description'])
                response = datacatalog_client.update_entry(entry=existing_entry, update_mask=update_mask)
                self.log.info(f"Updated entry: {response.name}")

            except NotFound:
                self.log.info(f"Creating new entry '{term}'...")
                response = datacatalog_client.create_entry(parent=parent, entry_id=entry_id, entry=glossary_entry)
                self.log.info(f"Created entry: {response.name}")

I hope the above information is helpful.