This content, written by Dan Young, was initially posted on the Looker Blog on Jun 20, 2017. The content is subject to limited support.
With the release of NiFi 1.2/1.3, a number of new processors were introduced, among them the Wait/Notify and GCSObject processors. Using the Wait processor along with the Notify processor, you can hold up the processing of a particular flow until a "release signal" is stored in the DistributedMapCacheServer. These processors are very useful in a number of ETL flows, such as calling a web service to get data. Most web services return data in a paginated fashion: you grab a "page" of data, process it, grab the next page, and so on. Although you can sometimes approximate this with the MergeContent processor alone, it's really hard to configure it so that the merge is delayed long enough before the flow continues.
The first flow we planned to apply these processors to was the one for our Zendesk Chat service. Although the current flow has been working fine, it depends on a third-party database sync process from Zendesk to our Redshift, which only runs once an hour. Processing the chats directly from the Zendesk API gives us quicker chat metrics, cuts out the middleman, and removes an integration point. Now we're able to load directly into Google BigQuery on a much quicker 10-minute cadence.
At a high level, the flow is as follows:
1. Trigger the flow, either ad hoc via an HTTP POST or on a schedule.
2. Page through the Zendesk search API to collect all chat session ids for the date range.
3. Merge the session ids, then split them into chunks of 50.
4. For each chunk, call the bulk chat API, transform the JSON, compress it, and write it to Google Cloud Storage.
5. Use Wait/Notify to hold the original FlowFile until every chunk has been written, then load the data into Google BigQuery.
I won't cover the details of the first step, triggering the flow, but essentially we use a combination of the HandleHttpRequest and GenerateFlowFile processors. HandleHttpRequest lets us process a given range of dates in an ad-hoc manner via a simple HTTP POST, whereas GenerateFlowFile lets us schedule the flow via a cron expression or run it every X seconds/minutes/hours.
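For the ad-hoc path, the trigger is just an HTTP POST against whatever port HandleHttpRequest is listening on. The host, port, path, and payload shape below are illustrative assumptions rather than the exact configuration of our flow:

```python
import requests

# Hypothetical listener and payload: HandleHttpRequest is assumed to listen on
# port 8181, with the date range passed in the POST body.
resp = requests.post(
    "http://nifi-host:8181/zendesk/chats",
    json={"start_date": "2017-06-01", "end_date": "2017-06-07"},
    timeout=30,
)
print(resp.status_code)
```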
Once we have the date range, the next part is to get all the "chats" from the API. We don't know ahead of time how many chats/sessions we'll have, so we need to handle pagination with some sort of looping construct. The Zendesk search API returns a next_url if there are more chats for the given search; in that case, we loop around and get the next batch of sessions along with the next next_url. This continues until next_url is null. Below is the flow we're using to get all the chats for a given date range.
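The looping construct boils down to: call the search endpoint, collect the results, and follow next_url until it comes back null. Here is a minimal sketch of that logic in Python; the endpoint URL, query syntax, and sleep-based throttle are assumptions, and in the actual flow this loop is built out of NiFi processors rather than a script:

```python
import time
import requests

BASE_URL = "https://www.zopim.com/api/v2/chats/search"  # assumed search endpoint
HEADERS = {"Authorization": "Bearer <token>"}           # placeholder credentials

def search_chats(start_date, end_date):
    """Page through the search API until next_url is null, collecting results."""
    results = []
    url = f"{BASE_URL}?q=timestamp:[{start_date} TO {end_date}]"  # assumed query syntax
    while url:
        resp = requests.get(url, headers=HEADERS, timeout=30)
        resp.raise_for_status()
        body = resp.json()
        results.extend(body.get("results", []))
        url = body.get("next_url")   # None when there are no more pages
        time.sleep(1)                # crude stand-in for the rate-limit delay
    return results
```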
Since we need to be careful not to exceed the API's rate limit, a delay throttles each call to the search API. With each iteration, the HTTP response contains a results JSON array. The results array includes the url of the chat, a starting timestamp, a preview message, a type attribute, and the id of the chat session. It's rather easy to extract all the ids from the results array using jq.
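For illustration, here is that extraction expressed in Python; the actual flow shells out to jq, and the filter mentioned in the comment is an assumption about what that jq call might look like:

```python
import json

def extract_chat_ids(search_response: str) -> list:
    """Pull the session ids out of the results array.

    Roughly equivalent to a jq filter such as '[.results[].id]'
    (the exact filter used in the flow is an assumption here).
    """
    body = json.loads(search_response)
    return [item["id"] for item in body.get("results", [])]
```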
Next, all the "pages of chat sessions" are sent downstream to a MergeContent processor which will merge them into a single FlowFile. Once all the session ids are merged, we then immediately split the entire group via the SplitText processor into chunks of 50 (the Zendesk API limit). The SplitText processor provides some useful attributes that we'll use in the Wait processor. The key one in this flow is the fragment.count. Each FlowFile resulting from the split will have a fragment.index attribute which indicates the ordering of that file in the split, and a fragment.count which is the number of splits from the parent. For example, if we have 120 chat sessions to process, and we split those into 50 sessions per chunk, we will have three chunks. Each one of these FlowFiles will have a fragment.count that is equal to three, along with a fragment.index that corresponds to the an index value in the split. The original FlowFile is sent off to the Wait processor, which we'll discuss later. At this point, we've set the stage to use the bulk chat API to efficiently extract the chat messages.
All the split chunks are sent downstream for processing. Each "chunk of 50" moves through the flow, calling the Zendesk API to get all the chat messages for that chunk of ids. A bit of JSON transformation is done in an ExecuteStreamCommand processor by calling jq again. The output stream is then compressed and written to Google Cloud Storage via the new PutGCSObject processor.
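Conceptually, each chunk becomes a single bulk API call whose ids parameter is the comma-separated list of session ids, and the resulting JSON is gzipped before upload. The sketch below captures that step; the bulk endpoint and parameter name are assumptions, and in the flow itself the reshaping is done by jq inside ExecuteStreamCommand and the upload by PutGCSObject:

```python
import gzip
import json
import requests

BULK_URL = "https://www.zopim.com/api/v2/chats"  # assumed bulk chat endpoint
HEADERS = {"Authorization": "Bearer <token>"}    # placeholder credentials

def fetch_and_compress(chunk_ids):
    """Fetch the chat messages for one chunk of ids and gzip the payload."""
    resp = requests.get(
        BULK_URL,
        headers=HEADERS,
        params={"ids": ",".join(str(i) for i in chunk_ids)},
        timeout=60,
    )
    resp.raise_for_status()
    # The real flow reshapes this JSON with jq; here we keep it as-is.
    payload = json.dumps(resp.json()).encode("utf-8")
    return gzip.compress(payload)
```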
The success relationship of PutGCSObject is routed to the Notify processor, which in turn updates a key/value pair in the DistributedMapCacheServer. The key being updated in the cache is the Release Signal Identifier, which in this case is ${filename}, and the counter being updated is named by the Signal Counter Name. Since the same flow is used for both the sessions and the chat processing, we use an attribute and expression language to set the Signal Counter Name at runtime. The amount added on each update is the Signal Counter Delta, so for each file written successfully, the named counter is incremented by 1.
Now we need to back up a bit in the flow to the SplitText processor labelled "chunk chats", where we see a fork in the processing. As I mentioned earlier, the original FlowFile is routed to the Wait processor. The FlowFile sits and "waits" for the appropriate "release signal" to be written to the DistributedMapCacheServer before proceeding. Once the release signal is detected, indicating that all the chunks/chats have been processed and written to Google Cloud Storage, the FlowFile is routed to the success queue. From there the flow continues, with downstream processors loading the data into a daily-partitioned Google BigQuery table.
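The counter mechanics behind Wait/Notify are easy to reason about with a toy model: each successful upload bumps a named counter under the FlowFile's release-signal key, and the waiting FlowFile is released once that counter reaches its target (the fragment.count in this flow). This is only a sketch of the semantics with made-up filename and counter names, not how NiFi implements the DistributedMapCacheServer:

```python
from collections import defaultdict

# Toy stand-in for the DistributedMapCacheServer:
# release signal key -> counter name -> count
cache = defaultdict(lambda: defaultdict(int))

def notify(release_signal_id, counter_name="uploads", delta=1):
    """Roughly what Notify does after each successful PutGCSObject."""
    cache[release_signal_id][counter_name] += delta

def wait(release_signal_id, target_count, counter_name="uploads"):
    """Roughly what Wait checks before releasing the original FlowFile."""
    return cache[release_signal_id][counter_name] >= target_count

# Three chunks written for the FlowFile keyed by its filename:
for _ in range(3):
    notify("chats_2017-06-20.json")

assert wait("chats_2017-06-20.json", target_count=3)  # released to 'success'
```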
In the event that one or more of the files are not successfully written to Google Cloud Storage, the original FlowFile in the wait queue will eventually expire (after 10 minutes by default) and be routed to the expired queue. The expired queue triggers a message to an Amazon SNS topic that notifies me with the FlowFile identifier. I'm also notified in the same way if any of the copies to Google Cloud Storage fail. Using the identifier, I can quickly and easily look up the FlowFile to see what the problem was.
The addition of the Wait/Notify processors, along with the GCSObject processors, further extends the usefulness of NiFi. The flexibility to adjust and change workflows on the fly in NiFi is amazing. We're constantly finding additional ways to leverage NiFi to efficiently consolidate more data sources into Google BigQuery.