Hi folks,
I'm using the Flink PubSub connector library (https://github.com/apache/flink-connector-gcp-pubsub), and roughly every hour I see calls to pull messages from Pub/Sub time out. I started with a timeout of 1s and bumped it to ~5s, which gets me past the timeouts (so my Flink app no longer restarts), but my end-to-end event lag still climbs to 2-3s every hour or so. That's better than the timeouts, but I'd like to eliminate it entirely, so I thought I'd reach out for ideas. Our jobs run on Dataproc VMs and use a Pub/Sub subscription/topic in the same project and region.
The PubSub lib is setting up the subscription and credentials as follows:
public PubSubSource<OUT> build() throws IOException {
    if (credentials == null) {
        credentials = defaultCredentialsProviderBuilder().build().getCredentials();
    }
    if (pubSubSubscriberFactory == null) {
        pubSubSubscriberFactory =
                new DefaultPubSubSubscriberFactory(
                        ProjectSubscriptionName.format(projectName, subscriptionName),
                        3,
                        Duration.ofSeconds(15),
                        100);
    }
    return new PubSubSource<>(
            deserializationSchema,
            pubSubSubscriberFactory,
            credentials,
            new AcknowledgeOnCheckpointFactory(),
            new GuavaFlinkConnectorRateLimiter(),
            messagePerSecondRateLimit);
    }
}
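For context, this is roughly how I'm bumping the per-pull timeout on my side. It's only a sketch: it assumes the builder's withPubSubSubscriberFactory(maxMessagesPerPull, timeout, retries) overload, and the exact parameter order and package locations may differ by connector version.

// Sketch only: bumping the per-pull timeout from the defaults shown above.
// Assumes the builder exposes withPubSubSubscriberFactory(maxMessagesPerPull, timeout, retries).
import java.io.IOException;
import java.time.Duration;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
// plus the connector's PubSubSource import

PubSubSource<String> buildSource(String projectName, String subscriptionName) throws IOException {
    return PubSubSource.newBuilder()
            .withDeserializationSchema(new SimpleStringSchema())
            .withProjectName(projectName)
            .withSubscriptionName(subscriptionName)
            .withPubSubSubscriberFactory(
                    100,                    // maxMessagesPerPull (library default above)
                    Duration.ofSeconds(5),  // per-pull timeout, up from the 1s I started with
                    3)                      // retries (library default above)
            .build();
}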
If it helps, the Netty connection setup code is here:
class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
    ...

    @Override
    public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
        ManagedChannel channel =
                NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
                        .negotiationType(NegotiationType.TLS)
                        .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
                        .build();

        PullRequest pullRequest =
                PullRequest.newBuilder()
                        .setMaxMessages(maxMessagesPerPull)
                        .setSubscription(projectSubscriptionName)
                        .build();

        SubscriberGrpc.SubscriberBlockingStub stub =
                SubscriberGrpc.newBlockingStub(channel)
                        .withCallCredentials(MoreCallCredentials.from(credentials));

        return new BlockingGrpcPubSubSubscriber(
                projectSubscriptionName, channel, stub, pullRequest, retries, timeout);
    }
}
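The factory above doesn't set any keepalive on the channel, so one thing I've been sketching is a drop-in factory of my own that mirrors it but turns on gRPC keepalive pings. This is only a sketch: the class name is hypothetical, the keepAlive* calls are standard grpc-java NettyChannelBuilder options with placeholder values that would need tuning, and I'm assuming BlockingGrpcPubSubSubscriber is reachable from user code.

// Sketch of a user-side factory mirroring DefaultPubSubSubscriberFactory, with gRPC
// keepalives enabled so an idle channel gets pinged (or torn down and rebuilt) between
// pulls instead of silently going stale.
// Assumptions: class/field names are hypothetical, the keepalive values need tuning
// (overly aggressive pings can be rejected by the server), and the grpc-netty imports
// may need the io.grpc.netty.shaded.* variants depending on the classpath.
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.SubscriberGrpc;

import io.grpc.ManagedChannel;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
// plus the connector's PubSubSubscriberFactory, PubSubSubscriber and
// BlockingGrpcPubSubSubscriber imports

class KeepAlivePubSubSubscriberFactory implements PubSubSubscriberFactory {
    private final String projectSubscriptionName;
    private final int retries;
    private final Duration timeout;
    private final int maxMessagesPerPull;

    KeepAlivePubSubSubscriberFactory(
            String projectSubscriptionName, int retries, Duration timeout, int maxMessagesPerPull) {
        this.projectSubscriptionName = projectSubscriptionName;
        this.retries = retries;
        this.timeout = timeout;
        this.maxMessagesPerPull = maxMessagesPerPull;
    }

    @Override
    public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
        ManagedChannel channel =
                NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
                        .negotiationType(NegotiationType.TLS)
                        .sslContext(GrpcSslContexts.forClient().ciphers(null).build())
                        // Ping the connection while it sits idle between pulls; placeholder values.
                        .keepAliveTime(1, TimeUnit.MINUTES)
                        .keepAliveTimeout(10, TimeUnit.SECONDS)
                        .keepAliveWithoutCalls(true)
                        .build();

        PullRequest pullRequest =
                PullRequest.newBuilder()
                        .setMaxMessages(maxMessagesPerPull)
                        .setSubscription(projectSubscriptionName)
                        .build();

        SubscriberGrpc.SubscriberBlockingStub stub =
                SubscriberGrpc.newBlockingStub(channel)
                        .withCallCredentials(MoreCallCredentials.from(credentials));

        return new BlockingGrpcPubSubSubscriber(
                projectSubscriptionName, channel, stub, pullRequest, retries, timeout);
    }
}

If this is a sensible direction, I'd expect it could be plugged in via the builder overload that accepts a PubSubSubscriberFactory instance, but I haven't confirmed that yet.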
So I was wondering whether a missing credential refresh, or missing connection settings along the lines of the keepalive sketch above, could explain this behavior.
By default the connector seems to rely on built-in Pub/Sub client code to refresh credentials, and if I understand correctly that uses Application Default Credentials (ADC) to connect and refresh. The same job writes data out to Bigtable and we don't see any such issues there, so my credentials hunch may well be wrong.
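In case it's useful, here's the kind of quick check I had in mind to see whether the ADC token expiry lines up with the spikes. It's a standalone snippet using google-auth-library's GoogleCredentials, not connector code.

// Sketch: log when the current ADC access token expires, to correlate with the
// hourly lag spikes. Uses google-auth-library; not part of the connector.
import java.io.IOException;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.GoogleCredentials;

public class AdcTokenExpiryCheck {
    public static void main(String[] args) throws IOException {
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        credentials.refreshIfExpired();  // fetches a token if none is cached or it has expired
        AccessToken token = credentials.getAccessToken();
        System.out.println("ADC token expires at: " + token.getExpirationTime());
    }
}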
Thanks,
Piyush
Hi @piyush_zlai,
Welcome to the Google Cloud Community!
It looks like you're facing hourly latency spikes when your Flink job pulls messages from Google Cloud Pub/Sub. Increasing the pull timeout stopped the outright failures, but you still see end-to-end event lag of 2-3 seconds roughly every hour.
Here are some potential approaches that might help with your use case:
In addition, you might find it helpful to check this similar case, which suggests an approach to the issue you encountered.
Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.