In a recent comment, @Dimitri_Masin pondered: could we use this approach to stream the full results of a Looker query back to BigQuery? In this post, I’ll outline how to do just that by leveraging the Action Hub’s Action API.
Once again, we’ll be using Google Cloud Functions to build the action. As a refresher:
Like AWS’s Lambda, Cloud Functions let you deploy code that gets executed in response to some event. With an action, you can push JSON containing data from Looker, as well as user-defined form parameters, to a Cloud Function endpoint. The Cloud Function then parses the JSON, extracts the relevant values, and uses the BigQuery SDK to stream the results to BigQuery.
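To make the flow concrete, here is a sketch of the shape of the JSON Looker POSTs to the execute endpoint. The keys mirror what the execute function below reads; the values are made up for illustration:

```python
import json

# Illustrative payload shape; keys match what the execute function reads,
# values are invented for this example.
example_payload = {
    "data": {"api_key": "my-secret-key"},        # params defined in the action list
    "form_params": {"table": "looker_export"},   # values the user picked in the form
    "attachment": {
        # the query results arrive as a JSON string
        "data": '[{"users.id": 1, "users.state": "CA"}]'
    },
    "scheduled_plan": {
        "query": {"fields": ["users.id", "users.state"]}
    },
}

rows = json.loads(example_payload["attachment"]["data"])
print(rows[0]["users.state"])  # -> CA
```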
In this example, we’ll be creating three Cloud Functions, one for each endpoint in the Action API. Two of these Cloud Functions will just return (relatively) static JSON. Let’s get started!
Head over to the BigQuery Console and create a new dataset for your Looker Exports.
Now, create an empty table with a schema definition. The schema should include the columns you plan to push to BigQuery from Looker, as well as a timestamp column called record_created_at.
A quick note on naming conventions: This script assumes that the column names in your BigQuery table match the field names (not labels!) in the data that you’re exporting from Looker. Do not include the view name in your columns.
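In other words, a Looker field like orders.total_amount should map to a BigQuery column named total_amount. A minimal sketch of this convention (to_column_name is a hypothetical helper, but it mirrors the name stripping the script performs later):

```python
def to_column_name(looker_field):
    # Looker scopes field names with the view name, e.g. 'orders.total_amount'.
    # The BigQuery column should be just the part after the dot.
    return looker_field.split('.')[1]

print(to_column_name('orders.total_amount'))  # -> total_amount
```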
Here’s the table that we’ll be streaming data to in our example:
Follow the first three steps here to select your GCP project and enable API access for Cloud Functions and BigQuery.
We’ll start off with our first function. This will serve as the Action List Endpoint. This function will return static JSON that lists all the actions your hub exposes. Each listed action contains metadata instructing Looker how to execute your action.
```python
def action_list(request):
    r = request.get_json()
    print(r)  # so it shows up in logs
    response = """
    {
        "integrations": [{
            "name": "bigquery",
            "label": "Send results to a BigQuery Table.",
            "supported_action_types": ["query"],
            "url": "",
            "form_url": "",
            "icon_data_uri": "https://uploads-us-west-2.insided.com/looker-en/attachment/base64_bb319a954eef4a99b72e60f9b295f123.png",
            "required_fields": [],
            "params": [{
                "name": "api_key",
                "label": "API Key",
                "required": true
            }],
            "supported_formats": ["json"],
            "supported_formattings": ["unformatted"],
            "supported_visualization_formattings": ["noapply"]
        }]
    }
    """
    return response
```
Set ‘Function to execute’ to action_list.
We’ll now create a new Cloud Function that will serve as the Action Form Endpoint.
If the action’s definition specifies a form, Looker will ask this endpoint for a form template that should be displayed to the end user to configure the action. Since Looker will ask this endpoint for the form every time it’s displayed, it’s possible to dynamically change the form based on whatever information you like. For example, in a chat application the form might dynamically list all the available chat channels that data can be sent to.
Create a new Cloud Function called bigquery_action_form. Open requirements.txt and include a line for google-cloud-bigquery==1.5.0. This will allow you to use the BigQuery SDK in your function. In main.py, paste the following:

```python
import google.cloud.bigquery as bigquery
import json

def action_form(request):
    client = bigquery.Client()
    # Collect the list of tables available in the dataset
    dataset_ref = client.dataset('streaming_inserts')
    tables_obj = list(client.list_tables(dataset_ref))
    table_options = [{'name': table.table_id, 'label': table.table_id} for table in tables_obj]
    form = """
    [{
        "name": "table",
        "label": "Table",
        "description": "Name of destination table.",
        "type": "select",
        "default": "no",
        "required": true,
        "options": """ + json.dumps(table_options) + """
    }]
    """
    return form
```

Be sure to substitute the name of your dataset for 'streaming_inserts' in the client.dataset() call.
Set ‘Function to execute’ to action_form.
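To see how the string concatenation above produces valid form JSON, here is the same templating with a hard-coded table list standing in for the BigQuery lookup (the table names are made up):

```python
import json

# Hypothetical table ids, standing in for client.list_tables() results
table_ids = ['looker_export', 'events']
table_options = [{'name': t, 'label': t} for t in table_ids]

form = """
[{
    "name": "table",
    "label": "Table",
    "type": "select",
    "required": true,
    "options": """ + json.dumps(table_options) + """
}]
"""

# The concatenated string parses as valid JSON
parsed = json.loads(form)
print(parsed[0]['options'])  # each option carries 'name' and 'label' keys
```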
We’ll now create our last and most important function. This function will serve as the Action Execute Endpoint.
This is the endpoint to which Looker sends the payload, form information, and other metadata in order to execute a given action. This is the main implementation of your action.
Create a new Cloud Function called bigquery_action_execute. Open requirements.txt and include a line for google-cloud-bigquery==1.5.0. This will allow you to use the BigQuery SDK in your function. In main.py, paste the following:

```python
import google.cloud.bigquery as bigquery
import time
import datetime
import os
import json

def verify_key(api_key):
    return api_key == os.environ.get('api_key')

def parse_request(request):
    request_json = request.get_json()
    print(request_json)  # so it shows up in the logs
    dataset_id = '<name_of_your_dataset>'  # substitute the name of your dataset
    api_key = request_json['data']['api_key']
    query_data = json.loads(request_json['attachment']['data'])
    table_id = request_json['form_params']['table']
    looker_scoped_fields = request_json['scheduled_plan']['query']['fields']
    looker_fields = [field.split('.')[1] for field in looker_scoped_fields]
    return dataset_id, api_key, query_data, table_id, looker_fields

def verify_fields(bq_fields, looker_fields):
    # Verify that the columns from Looker are a subset of the BigQuery columns
    return set(looker_fields) <= set(bq_fields)

def non_existent_bq_fields(bq_fields, looker_fields):
    return list(set(looker_fields) - set(bq_fields))

def remove_namespace(data):
    # Remove the view namespace from Looker field names
    return [dict([(l[0].split('.')[1], l[1]) for l in x.items()]) for x in data]

def bigquery_metadata(dataset_id, table_id):
    client = bigquery.Client()
    table_ref = client.dataset(dataset_id).table(table_id)
    table = client.get_table(table_ref)
    bq_fields = [field.name for field in table.schema]
    return client, table, bq_fields

def stream_rows(client, table, rows):
    errors = client.insert_rows(table, rows)
    if errors:
        print(errors)  # so they show up in the logs
    assert errors == []

def write_to_bigquery(request):
    dataset_id, api_key, query_data, table_id, looker_fields = parse_request(request)
    # Verify the API key
    if not verify_key(api_key):
        return '{"looker": {"success": false, "validation_errors": [{"field": "api_key", "message": "Unauthorized"}]}}'
    client, table, bq_fields = bigquery_metadata(dataset_id, table_id)
    # Verify that the fields from the Looker query exist in BigQuery
    if not verify_fields(bq_fields, looker_fields):
        fields_missing = non_existent_bq_fields(bq_fields, looker_fields)
        return '{"looker": {"success": false, "validation_errors": [{"field": "table", "message": "field(s) ' + ', '.join(fields_missing) + ' do not exist in table ' + table_id + '"}]}}'
    # Prepare the data to stream to BigQuery
    data_to_stream = remove_namespace(query_data)
    for row in data_to_stream:
        row['record_created_at'] = datetime.datetime.fromtimestamp(int(time.time())).strftime('%Y-%m-%d %H:%M:%S')
    # Stream the data to BigQuery
    stream_rows(client, table, data_to_stream)
    return '{"looker": {"success": true}}'
```
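Before deploying, you can sanity-check the pure parts of this logic locally with a stub request (a sketch only; FakeRequest stands in for the Flask request object Cloud Functions passes in, and no BigQuery call is made):

```python
import json

class FakeRequest:
    # Stands in for the Flask request object that Cloud Functions passes in
    def __init__(self, payload):
        self._payload = payload
    def get_json(self):
        return self._payload

payload = {
    "data": {"api_key": "secret"},
    "form_params": {"table": "looker_export"},
    "attachment": {"data": json.dumps([{"users.id": 1, "users.state": "CA"}])},
    "scheduled_plan": {"query": {"fields": ["users.id", "users.state"]}},
}

request_json = FakeRequest(payload).get_json()
looker_fields = [f.split('.')[1] for f in request_json['scheduled_plan']['query']['fields']]
rows = [dict((k.split('.')[1], v) for k, v in row.items())
        for row in json.loads(request_json['attachment']['data'])]

# Same subset check as verify_fields, against a pretend BigQuery schema
bq_fields = ['id', 'state', 'record_created_at']
print(set(looker_fields) <= set(bq_fields))  # -> True
print(rows)  # -> [{'id': 1, 'state': 'CA'}]
```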
Set ‘Function to execute’ to write_to_bigquery. Under the function’s environment variables, add a variable named api_key with a value of your choosing. We will be using this api_key to authenticate the request from Looker. Save the API key for later.
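The value you set becomes available to the function through its environment, and verify_key compares the key Looker sends against it. A quick local illustration:

```python
import os

# In production this is set in the Cloud Function's environment variables;
# here we set it in-process just to illustrate the check.
os.environ['api_key'] = 'my-secret'

def verify_key(api_key):
    return api_key == os.environ.get('api_key')

print(verify_key('my-secret'))  # -> True
print(verify_key('wrong-key'))  # -> False
```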
Finally, head back to the action_list function. Copy the trigger URLs of the bigquery_action_execute and bigquery_action_form functions, edit the action_list function, and paste in the URLs:

```python
response = """
{
    "integrations": [{
        "name": "bigquery",
        "label": "Send results to a BigQuery Table.",
        "supported_action_types": ["query"],
        "url": "<action_execute function URL>",
        "form_url": "<action_form function URL>",
        ...
```
Now that we’ve created our Cloud Functions, we are going to configure the action in Looker.
The BigQuery Action can be accessed via the native Schedules interface. Create a query in the Explore section of Looker, and when you’re ready to send the results to BigQuery, click the gear icon and hit Send or Schedule. You’ll now notice Google BigQuery as one of your destination options.
Select the table you wish to export the data to, and hit send! You can now monitor the progress in the Scheduler History section of the admin panel.
One caveat: by default, Cloud Functions cannot be invoked by unauthenticated callers. I saw a “Your client does not have permission to get URL” error at the “Add the Action to Your Looker Instance” step. To avoid this error, grant the Cloud Functions Invoker role to allUsers under Cloud Functions &gt; Permissions so that Looker can access the endpoints.