
Do Dataflow SQL streaming extensions support back-dated windowing?

I would like to use Dataflow SQL with Pub/Sub as both source and destination, windowing on a timestamp field from the message rather than the publish/processing time.

The Dataflow SQL streaming extensions docs do indicate that the event_timestamp column must be used for windowing when Pub/Sub is the source. However, when a past event_timestamp is provided on messages, it seems to be ignored and Dataflow SQL just windows the messages in real time.

Is this use case supported and am I just doing something wrong? Or are the docs implicitly saying that Pub/Sub-sourced Dataflow SQL jobs only window on message publish/processing time, not on a message-provided timestamp?

Example

Publish Payload


{
    "ride_id": "66302456-104c-46c0-b41b-c9fa4a5ce7eb",
    "point_idx": 427,
    "latitude": 40.758140000000004,
    "longitude": -73.99975,
    "event_timestamp": "2022-12-07T08:12:08.42402-05:00",
    "meter_reading": 12.797783,
    "meter_increment": 0.02997139,
    "ride_status": "pickup",
    "passenger_count": 2
}
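
For reference, a payload like this can be published from the CLI. A rough sketch, where the topic name taxi-rides is a placeholder and JSON message encoding against the proto schema below is assumed:

# Publish the example payload above; field names must match the proto
# schema attached to the topic (JSON message encoding assumed).
gcloud pubsub topics publish taxi-rides \
  --message='{"ride_id": "66302456-104c-46c0-b41b-c9fa4a5ce7eb", "point_idx": 427, "latitude": 40.758140000000004, "longitude": -73.99975, "event_timestamp": "2022-12-07T08:12:08.42402-05:00", "meter_reading": 12.797783, "meter_increment": 0.02997139, "ride_status": "pickup", "passenger_count": 2}'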


Dataflow SQL


SELECT
  tr.window_start AS period_start,
  SUM(tr.passenger_count) AS pickup_count
FROM TUMBLE((SELECT * FROM pubsub.topic.`project-id`.`topic`),
            DESCRIPTOR(event_timestamp),
            "INTERVAL 1 MINUTE") AS tr
GROUP BY tr.window_start


or


SELECT
  TUMBLE_START('INTERVAL 1 MINUTE') AS period_start,
  SUM(passenger_count) AS pickup_count
FROM pubsub.topic.`project-id`.`topic`
WHERE ride_status = "pickup"
GROUP BY TUMBLE(event_timestamp, 'INTERVAL 1 MINUTE')
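
In case it helps reproduce, an equivalent job can be launched from the CLI instead of the Dataflow SQL UI. A rough sketch, where the destination topic pickup-counts is a placeholder; the exact Pub/Sub output flags may vary by gcloud version, so check gcloud dataflow sql query --help:

# Launch the second query as a Dataflow SQL job that writes to Pub/Sub.
# Backticks in the table name are escaped because the query is wrapped
# in double quotes for the shell.
gcloud dataflow sql query \
  --job-name=pickup-count-job \
  --region=us-central1 \
  --pubsub-project=project-id \
  --pubsub-topic=pickup-counts \
  "SELECT
     TUMBLE_START('INTERVAL 1 MINUTE') AS period_start,
     SUM(passenger_count) AS pickup_count
   FROM pubsub.topic.\`project-id\`.\`topic\`
   WHERE ride_status = 'pickup'
   GROUP BY TUMBLE(event_timestamp, 'INTERVAL 1 MINUTE')"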


Output payload, with a real-time window start of 12/11 instead of the 12/7 event_timestamp provided on the message


{"period_start":"2022-12-11T16:05:00.000Z","pickup_count":2}


Schemas

Source Pub/Sub Schema (assigned when creating the topic)


syntax = "proto3";

message SomeMessage {
    string ride_id = 1;
    uint32 point_idx = 2;
    double latitude = 3;
    double longitude = 4;
    string event_timestamp = 5;
    double meter_reading = 6;
    double meter_increment = 7;
    string ride_status = 8;
    uint32 passenger_count = 9;
}
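
Since the schema was assigned at topic creation, the setup looked roughly like the sketch below; the schema and topic names are placeholders, and the proto above is assumed to be saved as schema.proto:

# Register the proto as a Pub/Sub schema, then attach it to the topic
# with JSON message encoding so JSON payloads are validated against it.
gcloud pubsub schemas create taxi-ride-schema \
  --type=protocol-buffer \
  --definition-file=schema.proto

gcloud pubsub topics create taxi-rides \
  --schema=taxi-ride-schema \
  --message-encoding=json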


Source Topic Data Catalog Schema


gcloud data-catalog entries update <source topic entry id> --location=global --entry-group=@pubsub --schema="ride_id=STRING,point_idx=INT64,latitude=FLOAT,longitude=FLOAT,event_timestamp=TIMESTAMP,meter_reading=FLOAT,meter_increment=FLOAT,ride_status=STRING,passenger_count=INT64"


Destination Topic Data Catalog Schema


gcloud data-catalog entries update <destination topic entry id> --location=global --entry-group=@pubsub --schema="period_start=TIMESTAMP,pickup_count=INT64"
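
To verify the schemas took effect, the Data Catalog entries can be read back with a lookup. A sketch for the source topic, using the same SQL-style resource name the queries use:

# Read back the Data Catalog entry (and its schema) for the source topic.
gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.`topic`'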

Thank you for any insight

3 REPLIES

Hi,

I tried to reproduce this behavior but I'm having issues with the schema. Can you provide the schemas you used for this (Pub/Sub / Data Catalog)?

@ricconoel, I've updated the post with the schemas used.

Hi,

I did reproduce your issue. This is expected behavior, since event_timestamp is not modifiable. What I could suggest is to create a new field that accepts a timestamp and do your calculations on that new field.
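
For example, something along these lines, where ride_timestamp is a hypothetical new TIMESTAMP field that would have to be added to both the proto and Data Catalog schemas. Note the windows still open and close in processing time, so a back-dated minute may be emitted across several output rows:

-- Window on the required (processing-time) event_timestamp, but
-- group and report on the message-supplied ride_timestamp instead.
SELECT
  TIMESTAMP_TRUNC(tr.ride_timestamp, MINUTE) AS period_start,
  SUM(tr.passenger_count) AS pickup_count
FROM TUMBLE((SELECT * FROM pubsub.topic.`project-id`.`topic`),
            DESCRIPTOR(event_timestamp),
            "INTERVAL 1 MINUTE") AS tr
GROUP BY TIMESTAMP_TRUNC(tr.ride_timestamp, MINUTE)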