Hi folks,
I'm using the Flink PubSub connector library (https://github.com/apache/flink-connector-gcp-pubsub) and I seem to be observing that every ~hour or so I am seeing calls to pull messages from PubSub timing out. I started with a timeout of 1s and bumped it to ~5s and I am able to get past the timeouts (so my Flink app doesn't restart) but I do see my end to end event lag go up to 2-3s every hour or so. This is better than the timeouts but it would be nice to eliminate so I thought I'd reach out for ideas. Our jobs are running on DataProc VMs and using a PubSub subscription / topic in the same project / region.
The PubSub lib is setting up the subscription and credentials as follows:
public PubSubSource<OUT> build() throws IOException {
if (credentials == null) {
credentials = defaultCredentialsProviderBuilder().build().getCredentials();
}
if (pubSubSubscriberFactory == null) {
pubSubSubscriberFactory =
new DefaultPubSubSubscriberFactory(
ProjectSubscriptionName.format(projectName, subscriptionName),
3,
Duration.ofSeconds(15),
100);
}
return new PubSubSource<>(
deserializationSchema,
pubSubSubscriberFactory,
credentials,
new AcknowledgeOnCheckpointFactory(),
new GuavaFlinkConnectorRateLimiter(),
messagePerSecondRateLimit);
}
}
If it helps, the Netty connection setup code is here:
class DefaultPubSubSubscriberFactory implements PubSubSubscriberFactory {
...
@Override
public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException {
ManagedChannel channel =
NettyChannelBuilder.forTarget(SubscriberStubSettings.getDefaultEndpoint())
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
.build();
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(maxMessagesPerPull)
.setSubscription(projectSubscriptionName)
.build();
SubscriberGrpc.SubscriberBlockingStub stub =
SubscriberGrpc.newBlockingStub(channel)
.withCallCredentials(MoreCallCredentials.from(credentials));
return new BlockingGrpcPubSubSubscriber(
projectSubscriptionName, channel, stub, pullRequest, retries, timeout);
}
}
I was wondering if there is some credential refresh or some connection settings that are missing that could explain this behavior.
By default it does seem to be using some built-in PubSub code to refresh creds and iiuc this is using ADC to connect and refresh. We are writing data out to BigTable in the job and we're not seeing any such issues there. (Though I don't know if this credentials hunch of mine is incorrect)
Thanks,
Piyush