I would like to use Dataflow SQL with a Pub/Sub source and destination, windowing by a timestamp field on the message rather than by publish/processing time.
The Dataflow SQL streaming extensions docs do indicate that the event_timestamp field must be used when Pub/Sub is the source. However, when I provide a past event_timestamp on messages, it seems to be ignored and Dataflow SQL just windows the messages in real time.
Is this use case supported and am I just doing something wrong? Or are the docs implicitly saying that Pub/Sub-sourced Dataflow SQL jobs only window on message publish/processing time, never on a message-provided timestamp?
Publish Payload
{
  "ride_id": "66302456-104c-46c0-b41b-c9fa4a5ce7eb",
  "point_idx": 427,
  "latitude": 40.758140000000004,
  "longitude": -73.99975,
  "event_timestamp": "2022-12-07T08:12:08.42402-05:00",
  "meter_reading": 12.797783,
  "meter_increment": 0.02997139,
  "ride_status": "pickup",
  "passenger_count": 2
}
Dataflow SQL
SELECT
  window_start AS period_start,
  SUM(tr.passenger_count) AS pickup_count
FROM TUMBLE((SELECT * FROM pubsub.topic.`project-id`.`topic`),
            DESCRIPTOR(event_timestamp),
            "INTERVAL 1 MINUTE") AS tr
GROUP BY tr.window_start
or
SELECT
  TUMBLE_START('INTERVAL 1 MINUTE') AS period_start,
  SUM(passenger_count) AS pickup_count
FROM pubsub.topic.`project-id`.`topic`
WHERE ride_status = "pickup"
GROUP BY TUMBLE(event_timestamp, 'INTERVAL 1 MINUTE')
Output payload, with a real-time window start of 12/11 instead of the 12/7 event_timestamp provided on the message:
{"period_start":"2022-12-11T16:05:00.000Z","pickup_count":2}
Source Pub/Sub Schema (assigned when creating the topic)
syntax = "proto3";

message SomeMessage {
  string ride_id = 1;
  uint32 point_idx = 2;
  double latitude = 3;
  double longitude = 4;
  string event_timestamp = 5;
  double meter_reading = 6;
  double meter_increment = 7;
  string ride_status = 8;
  uint32 passenger_count = 9;
}
Source Topic Data Catalog Schema
gcloud data-catalog entries update <source topic entry id> --location=global --entry-group=@pubsub --schema="ride_id=STRING,point_idx=INT64,latitude=FLOAT,longitude=FLOAT,event_timestamp=TIMESTAMP,meter_reading=FLOAT,meter_increment=FLOAT,ride_status=STRING,passenger_count=INT64"
Destination Topic Data Catalog Schema
gcloud data-catalog entries update <destination topic entry id> --location=global --entry-group=@pubsub --schema="period_start=TIMESTAMP,pickup_count=INT64"
Thank you for any insight.
Hi,
I tried to reproduce this behavior but I'm having issues with the schema. Can you provide the schemas you used for this (Pub/Sub and Data Catalog)?
@ricconoel, I've updated the post with the schemas used.
Hi,
I did reproduce your issue. This is expected behavior: for a Pub/Sub source, event_timestamp is not modifiable, since Dataflow SQL populates it with the message's publish time. What I would suggest is to create a new field that accepts a timestamp and do your calculations on that new field.
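For example, here is a minimal sketch of that suggestion. It assumes a new TIMESTAMP field named ride_timestamp (the name is hypothetical) is added to the message payload and to the topic's Data Catalog schema, and it assumes Dataflow SQL accepts a column other than event_timestamp in the window descriptor:

SELECT
  window_start AS period_start,
  SUM(tr.passenger_count) AS pickup_count
FROM TUMBLE((SELECT * FROM pubsub.topic.`project-id`.`topic`),
            DESCRIPTOR(ride_timestamp),
            "INTERVAL 1 MINUTE") AS tr
GROUP BY tr.window_start

Note that event_timestamp itself would still reflect the publish time; the windowing would simply be driven by the new payload field instead.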