
Auto recreate HDE pipelines on Mapping changes in GCS

ashwinshetty
Staff
Business scenario

Healthcare Data Engine (HDE) is a popular GCP-based solution built on the Cloud Healthcare API (CHC API) that helps healthcare stakeholders transition to FHIR and promotes interoperability between multiple data origination sources. HDE lets users run Mapping pipelines, which convert healthcare data into FHIR, and Reconciliation pipelines, which help form a longitudinal patient record view.

Both the Mapping and Reconciliation pipelines use Mapping configs or reconciliation rules stored in GCS buckets. Every time these configs change, HDE users are required to re-create the pipelines for the new changes to take effect.

This article shows how to use Cloud Run Functions to automatically re-create the HDE pipelines whenever the configs stored in GCS buckets change.

What we need

We will set up a Cloud Run Function that is triggered whenever a file is uploaded to the GCS bucket where the configs are stored. The function deletes the existing HDE pipeline and creates a new one from the latest configs by making Cloud Healthcare API calls. This article uses changes to Reconciliation rules as the example and recreates the Reconciliation pipeline, but the same approach works for Mapping changes (Whistle and Data Mapper) as well: delete the existing Mapping pipeline and create a new one using the latest mapping configs. The Cloud Run Function is written in Python.

Assumptions

This article assumes that the user has CHC APIs enabled, HDE pipelines running and GCS buckets to store configs already created. Please refer to HDE docs for steps on enabling/creating any of these if not already created.

Steps

Step 1 - Enable Cloud Run Functions (if not already enabled) and click on ‘CREATE FUNCTION’

image1.png

Step 2 - Using the snapshot below as a reference, configure the Cloud Run Function to be triggered when a file is uploaded to the GCS bucket. The ‘Bucket’ field should point to the GCS bucket where the configs used to create the pipeline are stored.

image2.png
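For orientation, a Cloud Storage trigger hands the function the uploaded object's metadata as the event payload. The sketch below uses a plain dictionary with made-up bucket and object names to show the two fields that matter here; `describe_upload` is a hypothetical helper, and a real event carries more fields than shown.

```python
# Hypothetical payload shaped like the data of a Cloud Storage upload event.
# The bucket and object names are invented for illustration.
sample_event_data = {
    "bucket": "my-hde-configs",
    "name": "recon-rules/merge_rules.wstl",
    "contentType": "text/plain",
}


def describe_upload(data):
    """Return (bucket, object_path) from a storage event payload."""
    return data["bucket"], data["name"]


bucket, object_path = describe_upload(sample_event_data)
print(f"gs://{bucket}/{object_path}")
```

The `name` field holds the full object path inside the bucket, including any folder prefix, which is what the function in Step 3 inspects.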

Step 3 - Use the folder/file structure and code snippets below to create a Python-based Cloud Run Function

Cloud Run Function:

|_ requirements.txt

|_ config.py

|_ main.py

requirements.txt

functions-framework==3.*
requests
google-auth-oauthlib
google-auth

config.py

PROJECT_ID = ""
LOCATION = ""
DATASET_NAME = ""
PIPELINE_NAME = ""
URI = ""
IMPORT_URI_PREFIX = ""
MATCHING_URI_PREFIX = ""
FHIR_STORE_NAME = ""
TARGET_SUBFOLDER = ''
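The values above ship empty and must be filled in before deploying. One detail worth checking: an empty `TARGET_SUBFOLDER` makes the `startswith` check in `main.py` match every object in the bucket. A minimal sketch of a pre-deploy check follows; `find_empty_settings` is a hypothetical helper and the example values are invented.

```python
def find_empty_settings(settings):
    """Return the names of settings that are still empty strings."""
    return sorted(name for name, value in settings.items() if not value)


# Hypothetical values for illustration only.
example = {
    "PROJECT_ID": "my-project",
    "LOCATION": "us-central1",
    "PIPELINE_NAME": "recon-pipeline",
    "TARGET_SUBFOLDER": "",
}

print(find_empty_settings(example))  # ['TARGET_SUBFOLDER']

# Every string starts with the empty string, so an empty folder filters nothing.
assert "any/other/file.txt".startswith("")
```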

main.py

from config import PROJECT_ID, LOCATION, DATASET_NAME, PIPELINE_NAME, URI, IMPORT_URI_PREFIX, MATCHING_URI_PREFIX, FHIR_STORE_NAME, TARGET_SUBFOLDER
import functions_framework
import json
import requests
import google.auth
import google.auth.transport.requests
from google.auth.exceptions import DefaultCredentialsError
from requests.exceptions import RequestException

# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def recreate_hde_pipelines(cloud_event):
   data = cloud_event.data
  
   file_path = data['name']

   # Check if the file was uploaded to the exact sub folder where mapping/recon rules are stored
   if file_path.startswith(TARGET_SUBFOLDER):
       DELETE_API_ENDPOINT = f"https://healthcare.googleapis.com/v1alpha2/projects/{PROJECT_ID}/locations/{LOCATION}/datasets/{DATASET_NAME}/pipelineJobs/{PIPELINE_NAME}"
       CREATE_API_ENDPOINT = f"https://healthcare.googleapis.com/v1alpha2/projects/{PROJECT_ID}/locations/{LOCATION}/datasets/{DATASET_NAME}/pipelineJobs?pipelineJobId={PIPELINE_NAME}"

       recon_config = {
           "name": PIPELINE_NAME,
           "disableLineage": "true",
           "reconciliationPipelineJob": {
           "mergeConfig": {
               "whistleConfigSource": {
               "uri": URI,
               "importUriPrefix": IMPORT_URI_PREFIX
               }
           },
           "matchingUriPrefix": MATCHING_URI_PREFIX,
           "fhirStoreDestination": "projects/"+PROJECT_ID+"/locations/"+LOCATION+"/datasets/"+DATASET_NAME+"/fhirStores/"+FHIR_STORE_NAME
           }
       }

       # Authentication (using Google's default credentials)
       try:
           print("Getting token...")
           creds, _ = google.auth.default()
           auth_req = google.auth.transport.requests.Request()
           creds.refresh(auth_req)
           bearer_token = creds.token
       # GoogleAuthError is the base class of DefaultCredentialsError and RefreshError,
       # so a failed token refresh is handled here as well
       except google.auth.exceptions.GoogleAuthError as e:
           return (f"Error obtaining credentials: {e}", 500)

       # Set headers for the API call
       headers = {
           "Authorization": f"Bearer {bearer_token}",
           "Content-Type": "application/json"  # Adjust if needed
       }
      
       # Delete the existing pipeline
       response = None  # stays None if the request fails before any response arrives
       try:
           print("Making DELETE API call...")
           response = requests.delete(DELETE_API_ENDPOINT, headers=headers)
           print("DELETE API Response: ", response.content)
           response.raise_for_status()  # Raise exception for bad responses (4xx or 5xx)
       except RequestException as e:
           error_msg = f"DELETE API call failed: {e}"
           if response is not None:
               error_msg += f" (Status code: {response.status_code})"
           print(error_msg)
           return (error_msg, 500)  # Internal Server Error

       print(f"{PIPELINE_NAME} pipeline deleted")

       # Create a new pipeline
       response = None  # reset so a failed POST never reports the DELETE response's status
       try:
           print("Making CREATE API call...")
           response = requests.post(CREATE_API_ENDPOINT, headers=headers, data=json.dumps(recon_config))
           print("CREATE API Response: ", response.content)
           response.raise_for_status()  # Raise exception for bad responses (4xx or 5xx)
       except RequestException as e:
           error_msg = f"CREATE API call failed: {e}"
           if response is not None:
               error_msg += f" (Status code: {response.status_code})"
           print(error_msg)
           return (error_msg, 500)  # Internal Server Error
       return ("Pipeline re-created", 200)  # OK
   else:
       return('No Pipeline Recreation')

Code Explanation:

  • All variables and configurable fields are stored in the `config.py` file
  • Required libraries are listed in the `requirements.txt` file
  • In `main.py` we check whether the file was uploaded to the sub folder we care about. This is especially useful when the same GCS bucket also stores other content.
  • We first make a DELETE API call to delete the existing pipeline and then a POST API call to create a new pipeline pointing to the config declared in `recon_config`
  • To set this up for the Mapping pipeline, add the corresponding mapping variables to `config.py` and declare the mapping config in a variable similar to `recon_config`. This should be a separate Cloud Run Function.
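The folder check and the two endpoint URLs can also be pulled into small pure functions, which makes them easy to test without calling the API. This is a sketch under assumptions: the helper names are hypothetical, and the folder check deliberately compares with a trailing slash, which is stricter than the plain `startswith` used in `main.py`.

```python
BASE_URL = "https://healthcare.googleapis.com/v1alpha2"


def pipeline_endpoints(project_id, location, dataset_name, pipeline_name):
    """Return (delete_url, create_url) in the same shape main.py builds them."""
    jobs = (
        f"{BASE_URL}/projects/{project_id}/locations/{location}"
        f"/datasets/{dataset_name}/pipelineJobs"
    )
    return f"{jobs}/{pipeline_name}", f"{jobs}?pipelineJobId={pipeline_name}"


def in_target_subfolder(object_path, target_subfolder):
    """True when the object sits under the folder.

    Comparing against "folder/" keeps a sibling such as "recon-old/"
    from matching a target of "recon".
    """
    folder = target_subfolder.strip("/")
    if not folder:
        return True  # empty setting: the whole bucket matches, as in main.py
    return object_path.startswith(folder + "/")


delete_url, create_url = pipeline_endpoints("my-project", "us-central1", "my-dataset", "recon")
print(delete_url)
print(create_url)
print(in_target_subfolder("recon-old/rules.wstl", "recon"))  # False
```

A separate function for the Mapping pipeline could reuse both helpers unchanged, since only the request body and the configured names differ.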

Step 4 - Once the above code has been added, deploy the Cloud Run Function. It should now detect any file changes (update/insert) under the target folder of the GCS bucket and trigger the pipeline re-creation process.
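One situation to consider after deployment: if the pipeline does not exist when the function fires (for example, a previous run deleted it but failed to create the new one), the DELETE call in `main.py` fails and the function returns before creating anything. The sketch below tolerates that case, assuming the API answers 404 for a missing pipeline job, as Google Cloud REST APIs typically do. `delete_if_present` is a hypothetical helper, and the 60-second timeout is an arbitrary choice.

```python
def delete_if_present(http_delete, url, headers):
    """Delete the pipeline; return False if it did not exist.

    http_delete is any callable shaped like requests.delete that returns
    an object with status_code and raise_for_status().
    """
    response = http_delete(url, headers=headers, timeout=60)
    if response.status_code == 404:
        return False  # nothing to delete; safe to go on and create
    response.raise_for_status()  # any other 4xx/5xx still raises
    return True
```

Inside the function it could be called as `delete_if_present(requests.delete, DELETE_API_ENDPOINT, headers)`, keeping the existing `RequestException` handling around it.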

Conclusion

HDE pipelines use Mapping and Reconciliation processes to convert healthcare data into FHIR and reconcile it into a Longitudinal Patient Record. Mapping or Reconciliation requirements can change over time and may require end users to re-create the corresponding pipelines. The simple steps above can automate the pipeline re-creation process or be plugged into a larger CI/CD workflow.

Reference links

Cloud Healthcare API documentation

HDE docs

Cloud Storage Trigger
