Getting StreamWriterClosedException in BQ Storage Write API

I have a Java service that writes records to BQ using the Storage Write API. It had been running fine for almost a week, and then I suddenly saw the StreamWriterClosedException below in the application log.

 

	at bq.DataWriter.append(DataWriter.java:61)
	at bq.DataWriter$AppendCompleteCallback.onFailure(DataWriter.java:109)
	at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
	at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
	at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
	at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:94)
	at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:76)
	at com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:51)
	at com.google.cloud.bigquery.storage.v1.StreamWriter.cleanupInflightRequests(StreamWriter.java:585)
	at com.google.cloud.bigquery.storage.v1.StreamWriter.appendLoop(StreamWriter.java:496)
	at com.google.cloud.bigquery.storage.v1.StreamWriter.access$1000(StreamWriter.java:51)
	at com.google.cloud.bigquery.storage.v1.StreamWriter$1.run(StreamWriter.java:221)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.google.cloud.bigquery.storage.v1.Exceptions$StreamWriterClosedException: FAILED_PRECONDITION: Connection is closed due to com.google.api.gax.rpc.InternalException: io.grpc.StatusRuntimeException: INTERNAL: An error occurred while verifying authorization. Entity: projects/project_id/datasets/dataset_name/tables/table_name/ Entity: projects/project_id/datasets/dataset_name/tables/table_name/_default
	at com.google.cloud.bigquery.storage.v1.StreamWriter.appendInternal(StreamWriter.java:327)
	at com.google.cloud.bigquery.storage.v1.StreamWriter.append(StreamWriter.java:287)
	at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:166)
	at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:109)

 

These are my build.gradle dependencies for the Storage Write API:

implementation platform('com.google.cloud:libraries-bom:26.1.0')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
implementation 'com.google.cloud:google-cloud-bigquery'

I am using Spring Boot version 2.7.2.


RC1
Bronze 4

@subham611 

Is your Java code using a credential file? This issue looks related to authorization. Can you recheck whether any permissions have been changed?

Another possibility is that the session expired and you are retrying with the same session, which causes this error. It is similar to this: https://stackoverflow.com/questions/57821150/dialogflow-com-google-api-gax-rpc-unauthenticatedexcept...

Can you share the code snippet where you write to BQ?

You can also report your issue here: https://github.com/googleapis/java-bigquerystorage/issues

RC1
Bronze 4

@subham611 

[Attached image: RC1_0-1665762894320.png]

These might be the reasons for the error.
Whenever you encounter one of them, catch it, create a new writer stream, and add
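
A minimal sketch of the catch-and-recreate idea, in case it helps. To stay self-contained it does not use the BigQuery client: RowWriter, WriterClosedException and RecreatingAppender are hypothetical stand-ins. WriterClosedException plays the role of the StreamWriterClosedException from the stack trace, and the Supplier is where a new stream writer would be built. The retry limit is also an assumption, and the sketch only covers the case where append itself throws, not failures reported later through a callback.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the exception raised once a writer's connection is closed.
class WriterClosedException extends RuntimeException {
    WriterClosedException(String message) { super(message); }
}

// Hypothetical stand-in for a stream writer.
interface RowWriter extends AutoCloseable {
    void append(String row);
    @Override void close();
}

class RecreatingAppender {
    private final Supplier<RowWriter> factory; // builds a fresh writer on demand
    private final int maxRecreates;            // upper bound so a permanent failure still surfaces
    private RowWriter writer;
    private int recreateCount = 0;

    RecreatingAppender(Supplier<RowWriter> factory, int maxRecreates) {
        this.factory = factory;
        this.maxRecreates = maxRecreates;
        this.writer = factory.get();
    }

    // Appends a row; if the current writer is closed, replaces it and retries the same row.
    synchronized void append(String row) {
        while (true) {
            try {
                writer.append(row);
                return;
            } catch (WriterClosedException e) {
                if (recreateCount >= maxRecreates) {
                    throw e; // give up: the cause is probably not transient
                }
                recreateCount++;
                writer.close();        // release the dead writer
                writer = factory.get(); // and start over with a new one
            }
        }
    }

    synchronized int recreateCount() { return recreateCount; }
}
```

If the underlying cause is a changed permission rather than a dropped connection, recreating the writer will keep failing, which is why the sketch rethrows after a bounded number of attempts.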

I am using a map to store DataWriter objects for as long as the application is running,

public DataWriter getDataWriter(TableName tableName) throws Descriptors.DescriptorValidationException, IOException, InterruptedException {
    // Unsynchronized fast path; safe only if dataWriterMap is a concurrent map.
    DataWriter dataWriter = dataWriterMap.get(tableName.getTable());
    if (dataWriter == null) {
        synchronized (this) {
            // Re-read inside the lock: another thread may have created the writer already.
            dataWriter = dataWriterMap.get(tableName.getTable());
            if (dataWriter == null) {
                dataWriter = new DataWriter();
                // One time initialization for the worker.
                dataWriter.initialize(tableName);
                dataWriterMap.put(tableName.getTable(), dataWriter);
            }
        }
    }
    return dataWriter;
}

@PreDestroy
public void cleanUp() {
    // Final cleanup for the stream during worker teardown.
    log.info("Cleaning up data writer");
    dataWriterMap.forEach((tableName, dataWriter) -> dataWriter.cleanup());
}

and cleaning up on application shutdown.
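
For reference, the per-table lookup can also be written without explicit locking, and with a way to drop a writer once it has been closed so that the next lookup builds a new one. This is only a sketch under assumptions: WriterCache is a hypothetical class, the factory function stands in for creating and initializing a DataWriter, and the writer type only needs to be AutoCloseable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical per-table writer cache; W stands in for a DataWriter-like type.
class WriterCache<W extends AutoCloseable> {
    private final Map<String, W> writers = new ConcurrentHashMap<>();
    private final Function<String, W> factory;

    WriterCache(Function<String, W> factory) {
        this.factory = factory;
    }

    // computeIfAbsent is atomic on ConcurrentHashMap, so the factory runs
    // at most once per missing key even when several threads ask at once.
    W get(String table) {
        return writers.computeIfAbsent(table, factory);
    }

    // Removes and closes a writer (for example after it reported that it is closed),
    // so the next get() for the same table creates a fresh one.
    void invalidate(String table) {
        W old = writers.remove(table);
        if (old != null) {
            closeQuietly(old);
        }
    }

    // Intended for application shutdown, like the @PreDestroy hook.
    void closeAll() {
        writers.values().forEach(this::closeQuietly);
        writers.clear();
    }

    int size() {
        return writers.size();
    }

    private void closeQuietly(W writer) {
        try {
            writer.close();
        } catch (Exception e) {
            // A real service would log this; the sketch ignores it.
        }
    }
}
```

With this shape, the code that catches a closed-writer exception would call invalidate for that table and then get again before retrying the append.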