Healthcare Data Engine (HDE) is a popular GCP-based solution built on the Cloud Healthcare API (CHC API) that helps healthcare stakeholders transition to FHIR and promotes interoperability between multiple data origination sources. HDE lets users run Mapping pipelines, which convert healthcare data into FHIR, and Reconciliation pipelines, which form a longitudinal patient record view.
Both the Mapping and Reconciliation pipelines use mapping configs or reconciliation rules stored in GCS buckets. Every time these configs change, HDE users must re-create the pipelines for the changes to take effect.
This article describes how to use Cloud Run Functions to automatically re-create HDE pipelines whenever the configs stored in GCS buckets change.
We will set up a Cloud Run Function that is triggered whenever a file is uploaded to the GCS bucket where the configs are stored. The function deletes the existing HDE pipeline and creates a new one with the latest changes by making Cloud Healthcare API calls. This article uses changes to reconciliation rules as the example, re-creating the Reconciliation pipeline, but the same approach can be followed for mapping changes (Whistle and Data Mapper) as well, i.e. delete the existing Mapping pipeline and create a new one using the latest mapping configs. The Cloud Run Function is written in Python.
This article assumes that the CHC API is enabled, HDE pipelines are running, and the GCS buckets that store the configs already exist. Please refer to the HDE docs for steps on enabling or creating any of these if not already done.
Step 1 - Enable Cloud Run Functions (if not already enabled) and click on ‘CREATE FUNCTION’
Step 2 - Use the snapshot below as a reference and set up the Cloud Run Function trigger so it fires when a file is uploaded to the GCS bucket. The ‘Bucket’ field should point to the GCS bucket where the configs are stored; this is the bucket whose contents are used when creating the pipeline.
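For context, when an object upload finalizes, the function receives a CloudEvent whose data payload looks roughly like the hypothetical sample below; the code in Step 3 only reads the name field (the object's path inside the bucket) to decide whether to re-create the pipeline. All values shown are illustrative.

# Hypothetical example of the event data a GCS "object finalized" trigger delivers
# (available as cloud_event.data inside the function). Values are illustrative only.
sample_event_data = {
    "bucket": "example-hde-configs-bucket",          # bucket configured in the trigger
    "name": "recon_rules/patient_merge_rules.wstl",  # object path; compared against TARGET_SUBFOLDER
    "contentType": "text/plain",
    "generation": "1700000000000000",
    "timeCreated": "2024-01-01T00:00:00.000Z",
}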
Step 3 - Use the folder/file structure and code snippets below to create a Python-based Cloud Run Function
Cloud Run Function:
|_ requirements.txt
|_ config.py
|_ main.py
requirements.txt
functions-framework==3.*
requests
google-auth-oauthlib
google-auth
config.py
PROJECT_ID = ""
LOCATION = ""
DATASET_NAME = ""
PIPELINE_NAME = ""
URI = ""
IMPORT_URI_PREFIX = ""
MATCHING_URI_PREFIX = ""
FHIR_STORE_NAME = ""
TARGET_SUBFOLDER = ""
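As a reference, here is a hedged example of how config.py might be filled in. All project, bucket, folder, and resource names below are hypothetical placeholders; replace them with your own values.

# config.py - hypothetical example values (replace with your own resources)
PROJECT_ID = "my-hde-project"                            # GCP project hosting the CHC dataset
LOCATION = "us-central1"                                 # CHC dataset location
DATASET_NAME = "hde-dataset"                             # CHC dataset name
PIPELINE_NAME = "recon-pipeline"                         # pipelineJob ID to delete and re-create
URI = "gs://my-hde-configs/recon_rules/merge.wstl"       # Whistle merge config object
IMPORT_URI_PREFIX = "gs://my-hde-configs/recon_rules"    # prefix for Whistle imports
MATCHING_URI_PREFIX = "gs://my-hde-configs/matching"     # prefix for matching rules
FHIR_STORE_NAME = "final-fhir-store"                     # destination FHIR store
TARGET_SUBFOLDER = "recon_rules/"                        # bucket folder watched for config changes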
main.py
from config import PROJECT_ID, LOCATION, DATASET_NAME, PIPELINE_NAME, URI, IMPORT_URI_PREFIX, MATCHING_URI_PREFIX, FHIR_STORE_NAME, TARGET_SUBFOLDER
import functions_framework
import json
import requests
import google.auth
import google.auth.transport.requests
from google.auth.exceptions import DefaultCredentialsError
from requests.exceptions import RequestException


# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def recreate_hde_pipelines(cloud_event):
    data = cloud_event.data
    file_path = data['name']

    # Check if the file was uploaded to the exact sub folder where mapping/recon rules are stored
    if file_path.startswith(TARGET_SUBFOLDER):
        DELETE_API_ENDPOINT = f"https://healthcare.googleapis.com/v1alpha2/projects/{PROJECT_ID}/locations/{LOCATION}/datasets/{DATASET_NAME}/pipelineJobs/{PIPELINE_NAME}"
        CREATE_API_ENDPOINT = f"https://healthcare.googleapis.com/v1alpha2/projects/{PROJECT_ID}/locations/{LOCATION}/datasets/{DATASET_NAME}/pipelineJobs?pipelineJobId={PIPELINE_NAME}"

        recon_config = {
            "name": PIPELINE_NAME,
            "disableLineage": "true",
            "reconciliationPipelineJob": {
                "mergeConfig": {
                    "whistleConfigSource": {
                        "uri": URI,
                        "importUriPrefix": IMPORT_URI_PREFIX
                    }
                },
                "matchingUriPrefix": MATCHING_URI_PREFIX,
                "fhirStoreDestination": f"projects/{PROJECT_ID}/locations/{LOCATION}/datasets/{DATASET_NAME}/fhirStores/{FHIR_STORE_NAME}"
            }
        }

        # Authentication (using Google's default credentials)
        try:
            creds, _ = google.auth.default()
            auth_req = google.auth.transport.requests.Request()
            creds.refresh(auth_req)
            bearer_token = creds.token
            print("Getting token...")
        except DefaultCredentialsError as e:
            return (f"Error obtaining credentials: {e}", 500)

        # Set headers for the API calls
        headers = {
            "Authorization": f"Bearer {bearer_token}",
            "Content-Type": "application/json"  # Adjust if needed
        }

        # Delete the existing pipeline
        response = None
        try:
            print("Making DELETE API call...")
            response = requests.delete(DELETE_API_ENDPOINT, headers=headers)
            print("DELETE API Response: ", response.content)
            response.raise_for_status()  # Raise exception for bad responses (4xx or 5xx)
        except RequestException as e:
            error_msg = f"DELETE API call failed: {e}"
            if response is not None:
                error_msg += f" (Status code: {response.status_code})"
            print(error_msg)
            return (error_msg, 500)  # Internal Server Error

        print(f"{PIPELINE_NAME} pipeline deleted")

        # Create a new pipeline
        response = None
        try:
            print("Making CREATE API call...")
            response = requests.post(CREATE_API_ENDPOINT, headers=headers, data=json.dumps(recon_config))
            print("CREATE API Response: ", response.content)
            response.raise_for_status()  # Raise exception for bad responses (4xx or 5xx)
        except RequestException as e:
            error_msg = f"CREATE API call failed: {e}"
            if response is not None:
                error_msg += f" (Status code: {response.status_code})"
            print(error_msg)
            return (error_msg, 500)  # Internal Server Error

        return ("Pipeline re-created", 200)  # OK
    else:
        return 'No Pipeline Recreation'
Code Explanation:
- requirements.txt lists the dependencies the function needs: the Functions Framework, requests for the REST calls, and the Google auth libraries for obtaining credentials.
- config.py holds the project, dataset, pipeline, FHIR store, and GCS details used to build the Cloud Healthcare API requests, plus TARGET_SUBFOLDER, the folder inside the bucket where the reconciliation rules live.
- main.py defines recreate_hde_pipelines, which is invoked with the CloudEvent delivered by the GCS trigger. It checks whether the uploaded file path falls under TARGET_SUBFOLDER; if it does, it obtains an access token via Application Default Credentials, deletes the existing pipeline job with a DELETE call to the v1alpha2 pipelineJobs endpoint, and re-creates it with a POST using the reconciliation configuration built from config.py. If the file lands outside the target folder, the function returns without re-creating the pipeline.
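Before deploying, a quick local smoke test can confirm the folder check behaves as expected. The sketch below is a minimal, hypothetical example: it assumes the cloudevents package is installed (pip install cloudevents) and uses a file path outside TARGET_SUBFOLDER, so the function returns early without making any API calls.

# local_test.py - minimal local smoke test (hypothetical). Run with: python local_test.py
# The object path used here is outside TARGET_SUBFOLDER, so no API calls are made.
from cloudevents.http import CloudEvent
from main import recreate_hde_pipelines

attributes = {
    "type": "google.cloud.storage.object.v1.finalized",
    "source": "//storage.googleapis.com/projects/_/buckets/example-bucket",  # hypothetical bucket
}
data = {"bucket": "example-bucket", "name": "unrelated_folder/readme.txt"}

# Expected output: 'No Pipeline Recreation'
print(recreate_hde_pipelines(CloudEvent(attributes, data)))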
Step 4 - Once the above code has been added, go ahead and deploy the Cloud Run Function. It should now detect file changes (update/insert) in the configured GCS bucket folder and trigger the pipeline re-creation process.
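After uploading a config file, one hedged way to confirm the pipeline was re-created is to read the pipeline job back with a GET request against the same v1alpha2 pipelineJobs resource the function uses. The sketch below mirrors the authentication and endpoint pattern from main.py and assumes config.py has been populated; it is a verification sketch, not part of the function itself.

# verify_pipeline.py - hedged sketch to read back the re-created pipeline job.
# Assumes config.py is populated and Application Default Credentials are available.
import requests
import google.auth
import google.auth.transport.requests
from config import PROJECT_ID, LOCATION, DATASET_NAME, PIPELINE_NAME

creds, _ = google.auth.default()
creds.refresh(google.auth.transport.requests.Request())

url = (f"https://healthcare.googleapis.com/v1alpha2/projects/{PROJECT_ID}/locations/{LOCATION}"
       f"/datasets/{DATASET_NAME}/pipelineJobs/{PIPELINE_NAME}")
resp = requests.get(url, headers={"Authorization": f"Bearer {creds.token}"})
resp.raise_for_status()
print(resp.json())  # pipeline job details, including the reconciliationPipelineJob config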
HDE pipelines use the Mapping and Reconciliation processes to convert healthcare data into FHIR and reconcile it into a longitudinal patient record. Mapping or reconciliation requirements can change over time and may require end users to re-create the corresponding pipelines. The simple steps above can be used to automate the pipeline re-creation process, or be plugged into larger CI/CD workflows.