
PubSub periodic timeouts / latency spikes in Flink

Hi folks,

I'm using the Flink PubSub connector library (https://github.com/apache/flink-connector-gcp-pubsub), and roughly every hour I see calls to pull messages from PubSub time out. I started with a pull timeout of 1s and bumped it to ~5s. That gets me past the timeouts (so my Flink app doesn't restart), but my end-to-end event lag still goes up to 2-3s every hour or so. This is better than the timeouts, but it would be nice to eliminate, so I thought I'd reach out for ideas. Our jobs run on Dataproc VMs and use a PubSub subscription / topic in the same project / region.

The PubSub lib is setting up the subscription and credentials as follows:

public PubSubSource<OUT> build() throws IOException {
    // Fall back to the default credentials provider when none were supplied.
    if (credentials == null) {
        credentials = defaultCredentialsProviderBuilder().build().getCredentials();
    }

    // Default subscriber factory: blocking pulls with retries and a per-call timeout.
    if (pubSubSubscriberFactory == null) {
        pubSubSubscriberFactory =
                new DefaultPubSubSubscriberFactory(
                        ProjectSubscriptionName.format(projectName, subscriptionName),
                        3, // retries
                        Duration.ofSeconds(15), // timeout
                        100); // max messages per pull
    }

    return new PubSubSource<>(
            deserializationSchema,
            pubSubSubscriberFactory,
            credentials,
            new AcknowledgeOnCheckpointFactory(),
            new GuavaFlinkConnectorRateLimiter(),
            messagePerSecondRateLimit);
}

 

If it helps, the Netty connection setup code is here:

class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
...
    @Override
    public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
        ManagedChannel channel =
                NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
                        .negotiationType(NegotiationType.TLS)
                        .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
                        .build();

        PullRequest pullRequest =
                PullRequest.newBuilder()
                        .setMaxMessages(maxMessagesPerPull)
                        .setSubscription(projectSubscriptionName)
                        .build();
        SubscriberGrpc.SubscriberBlockingStub stub =
                SubscriberGrpc.newBlockingStub(channel)
                        .withCallCredentials(MoreCallCredentials.from(credentials));
        return new BlockingGrpcPubSubSubscriber(
                projectSubscriptionName, channel, stub, pullRequest, retries, timeout);
    }
}

I was wondering if there is some credential refresh or some connection settings that are missing that could explain this behavior. 

By default it seems to use some built-in PubSub code to refresh credentials, and if I understand correctly it uses ADC (Application Default Credentials) to connect and refresh. The job also writes data out to Bigtable, and we're not seeing any such issues there (though I don't know whether my credentials hunch is correct).
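
One way to test the credentials hunch is to check whether the spikes really recur on a fixed period. Access tokens obtained through ADC typically expire after about an hour, so spikes spaced at whole multiples of an hour would be consistent with a token refresh, though they would not prove it. The following is a minimal sketch of that check using only the JDK; the class name `SpikePeriodCheck`, the method `recursEvery`, and the sample timestamps are hypothetical, and the spike times are assumed to be copied from job logs in ascending order.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

/** Checks whether observed latency spikes recur at a fixed period. */
final class SpikePeriodCheck {

    /**
     * Returns true when every gap between consecutive spikes is within
     * `tolerance` of a whole multiple of `period`. Multiples are allowed so
     * that a missed or unnoticed spike does not break the pattern.
     * Assumes `spikes` is sorted in ascending order.
     */
    static boolean recursEvery(List<Instant> spikes, Duration period, Duration tolerance) {
        if (spikes.size() < 2) {
            return false; // not enough data to infer a period
        }
        long periodMs = period.toMillis();
        long toleranceMs = tolerance.toMillis();
        for (int i = 1; i < spikes.size(); i++) {
            long gapMs = Duration.between(spikes.get(i - 1), spikes.get(i)).toMillis();
            long multiples = Math.round((double) gapMs / periodMs);
            if (multiples < 1 || Math.abs(gapMs - multiples * periodMs) > toleranceMs) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical spike times; replace with timestamps from your own logs.
        List<Instant> spikes = List.of(
                Instant.parse("2024-01-01T10:02:10Z"),
                Instant.parse("2024-01-01T11:01:55Z"),
                Instant.parse("2024-01-01T12:02:20Z"));
        boolean hourly = recursEvery(spikes, Duration.ofHours(1), Duration.ofMinutes(2));
        System.out.println("spikes recur hourly: " + hourly);
    }
}
```

If the spikes do line up hourly, comparing their timestamps with the token expiration time logged by the job would be a natural next step; if they do not, the credentials explanation becomes less likely.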

Thanks,

Piyush


Hi @piyush_zlai,

Welcome to the Google Cloud Community!

It looks like you're seeing hourly latency spikes and timeouts when your Flink job pulls messages from Google Cloud Pub/Sub. Raising the pull timeout avoids the restarts, but event lag still spikes to a few seconds about once an hour.

Here are a few things that might help with your use case:

  • Monitor network performance from the Dataproc VM: From your Dataproc VM, run ‘ping pubsub.googleapis.com’ and ‘mtr pubsub.googleapis.com’ at regular intervals. Watch for packet loss or latency spikes that line up with your Pub/Sub delays; this can help you pinpoint network-related issues.
  • Monitor your Flink metrics: Watch your Flink source’s event-time lag (for example ‘currentEmitEventTimeLag’, if your source reports it) and watermark metrics to see whether the lag lines up with your Pub/Sub delays. At the same time, check TaskManager heap usage and CPU load (‘Status.JVM.Memory.Heap.Used’ and ‘Status.JVM.CPU.Load’) to rule out Flink-side resource issues.
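
As a complement to ‘ping’ and ‘mtr’, the network check above can also be run from the JVM itself, so the samples use the same host and runtime as the job. The following is a minimal sketch, not a definitive tool: it times a TCP connect (including DNS resolution) to port 443 rather than an ICMP round trip, and the class name `ConnectLatencyProbe`, the 20 ms baseline, and the 5x spike factor are all assumptions to tune for your environment.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Instant;

/** Periodically times TCP connects to an endpoint and reports slow or failed attempts. */
final class ConnectLatencyProbe {

    /**
     * Returns the time to resolve the host and complete a TCP connect, in
     * milliseconds, or -1 if the attempt failed or timed out.
     */
    static long connectMillis(String host, int port, int timeoutMs) {
        long start = System.nanoTime();
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            return -1;
        }
    }

    /** A sample counts as a spike if it failed or exceeded `factor` times the baseline. */
    static boolean isSpike(long sampleMs, long baselineMs, double factor) {
        return sampleMs < 0 || sampleMs > baselineMs * factor;
    }

    // Usage: java ConnectLatencyProbe [samples] [intervalSeconds]
    public static void main(String[] args) throws InterruptedException {
        int samples = args.length > 0 ? Integer.parseInt(args[0]) : 5;
        long intervalSeconds = args.length > 1 ? Long.parseLong(args[1]) : 1;
        long baselineMs = 20; // assumed typical connect time; measure your own
        for (int i = 0; i < samples; i++) {
            long ms = connectMillis("pubsub.googleapis.com", 443, 5_000);
            if (isSpike(ms, baselineMs, 5.0)) {
                System.out.println(Instant.now() + " slow or failed connect: " + ms + " ms");
            }
            if (i < samples - 1) {
                Thread.sleep(intervalSeconds * 1_000);
            }
        }
    }
}
```

Running it for a few hours (for example with 1440 samples at a 10 second interval) and comparing the printed timestamps with the lag spikes would show whether connection setup slows down at the same moments.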

In addition, you might find it helpful to check this case, which suggests an approach to the issue you encountered.

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.