I am trying to automate the creation and management of a business glossary (terms, synonyms, ...). The process should run automatically from Cloud Composer, which collects the information from a BigQuery table, interprets it, creates the terms, and assigns them to the entries. I have been looking for a Python API or Airflow operators that can do this, but I have not found a way. Is there an alternative process that comes close?
Hi @raprieto,
Welcome to Google Cloud Community!
For your automation project, you can try the following options:

google-cloud-datacatalog (Python client):

You can read the terms from BigQuery and create or update Data Catalog entries directly with the Python client libraries. Install the dependencies and point the clients at your service account key:

pip install google-cloud-bigquery google-cloud-datacatalog
export GOOGLE_APPLICATION_CREDENTIALS="path/to/your/service-account-key.json"
from google.api_core import exceptions
from google.cloud import bigquery
from google.cloud import datacatalog_v1
from google.protobuf import field_mask_pb2

# Initialize BigQuery and Data Catalog clients
bq_client = bigquery.Client()
datacatalog_client = datacatalog_v1.DataCatalogClient()

# Define parameters
project_id = 'your-project-id'
dataset_id = 'your-dataset-id'
table_id = 'your-table-id'
entry_group_id = 'your-entry-group-id'
location = 'us-central1'

# BigQuery SQL to fetch the terms
sql_query = f"""
    SELECT term, definition
    FROM `{project_id}.{dataset_id}.{table_id}`
"""

# Execute the BigQuery query
query_job = bq_client.query(sql_query)
results = query_job.result()

# Define the parent entry group path (the entry group must already exist)
parent = f"projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}"

# Process each row from BigQuery and create/update glossary entries
for row in results:
    term = row['term']
    definition = row['definition']
    entry_id = term.replace(' ', '_')  # Derive a simple entry ID from the term

    # Define the glossary entry; custom entries need a user-specified type/system
    glossary_entry = {
        'display_name': term,
        'description': definition,
        'user_specified_type': 'glossary_term',
        'user_specified_system': 'business_glossary',
    }

    entry_name = f"{parent}/entries/{entry_id}"
    try:
        # Check whether the entry already exists
        existing_entry = datacatalog_client.get_entry(name=entry_name)
        print(f"Entry '{term}' already exists. Updating...")
        existing_entry.description = definition
        # Update only the description field of the existing entry
        update_mask = field_mask_pb2.FieldMask(paths=['description'])
        response = datacatalog_client.update_entry(entry=existing_entry, update_mask=update_mask)
        print(f"Updated entry: {response.name}")
    except exceptions.NotFound:
        # The entry does not exist yet, so create it
        print(f"Creating new entry '{term}'...")
        response = datacatalog_client.create_entry(parent=parent, entry_id=entry_id, entry=glossary_entry)
        print(f"Created entry: {response.name}")
Apache Airflow with Custom Operators:

A custom BigQueryToDataCatalogOperator is needed to query a BigQuery table and manage glossary entries in Google Cloud Data Catalog. Install the dependencies:

pip install apache-airflow
pip install google-cloud-bigquery google-cloud-datacatalog

Save the operator below as custom_operators.py in your Airflow plugins directory (e.g., airflow/plugins/):
from airflow.models import BaseOperator
from google.api_core import exceptions
from google.cloud import bigquery
from google.cloud import datacatalog_v1
from google.protobuf import field_mask_pb2


class BigQueryToDataCatalogOperator(BaseOperator):
    def __init__(self, project_id, dataset_id, table_id, entry_group_id, location, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.entry_group_id = entry_group_id
        self.location = location

    def execute(self, context):
        bq_client = bigquery.Client()
        datacatalog_client = datacatalog_v1.DataCatalogClient()

        # Fetch the terms and definitions from BigQuery
        sql_query = f"""
            SELECT term, definition
            FROM `{self.project_id}.{self.dataset_id}.{self.table_id}`
        """
        query_job = bq_client.query(sql_query)
        results = query_job.result()

        parent = f"projects/{self.project_id}/locations/{self.location}/entryGroups/{self.entry_group_id}"

        for row in results:
            term = row['term']
            definition = row['definition']
            entry_id = term.replace(' ', '_')

            # Custom entries need a user-specified type/system
            glossary_entry = {
                'display_name': term,
                'description': definition,
                'user_specified_type': 'glossary_term',
                'user_specified_system': 'business_glossary',
            }

            entry_name = f"{parent}/entries/{entry_id}"
            try:
                existing_entry = datacatalog_client.get_entry(name=entry_name)
                self.log.info(f"Entry '{term}' already exists. Updating...")
                existing_entry.description = definition
                update_mask = field_mask_pb2.FieldMask(paths=['description'])
                response = datacatalog_client.update_entry(entry=existing_entry, update_mask=update_mask)
                self.log.info(f"Updated entry: {response.name}")
            except exceptions.NotFound:
                self.log.info(f"Creating new entry '{term}'...")
                response = datacatalog_client.create_entry(parent=parent, entry_id=entry_id, entry=glossary_entry)
                self.log.info(f"Created entry: {response.name}")
I hope the above information is helpful.