Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline:

When I'm trying to execute the Firestore query from dataflow job (apache beam) it's giving com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline: 

It's working fine with local emulator and standalone java application

Please help me.

addData and getData methods calling from dataflow job

public class PipelineReferenceClientFirestore implements ReferenceClient {

private static final Logger LOGGER = LoggerFactory.getLogger(PipelineReferenceClientFirestore.class);

private String projectId;

private String domain;

private String subCollection;

private Firestore firestore;

private CollectionReference collectionReference;


/**
* @PARAM id adding the in Firestore to specific documentId
* @PARAM data it's having Json data
*/
@Override
public void addData(String id, Map<String, Object> data) {
ApiFuture<WriteResult> future = null;
try {
PipelineReferenceDocument pipelineReferenceDocument = new PipelineReferenceDocument();
pipelineReferenceDocument.setDocumentId(id);
pipelineReferenceDocument.setDomain(domain);
pipelineReferenceDocument.setCollectionName(subCollection);
pipelineReferenceDocument.setDocument(data);
future = collectionReference.document(id).set(pipelineReferenceDocument);
Timestamp updatedTime = future.get().getUpdateTime();
if(LOGGER.isInfoEnabled()) {
LOGGER.info("updated timestamp : {}",updatedTime);
LOGGER.info("status : {}",future.isDone());
}
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
throw new ReferenceClientException("Query execution error", e);
}

}

/**
* @PARAM id fetching data from Firestore to specific documentId
* @return Map<String, Object> having Payload
*/
@Override
public Map<String, Object> getData(String id) {
PipelineReferenceDocument pipelineReferenceDocument = new PipelineReferenceDocument();
try {
ApiFuture<DocumentSnapshot> future = collectionReference.document(id).get();
DocumentSnapshot document = future.get();
if (document.exists()) {
pipelineReferenceDocument = document.toObject(PipelineReferenceDocument.class);
return pipelineReferenceDocument.getDocument();
} else {
throw new ReferenceClientException("documentId not found : "+ id);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ReferenceClientException("Firestore Connectivity Got interrupted", e);
} catch (ExecutionException e) {
throw new ReferenceClientException("Query execution error", e);
}
}

public static class Builder {
private String projectId;
private String domain;
private String subCollection;

public PipelineReferenceClientFirestore.Builder setDomain(String domain) {
this.domain = domain;
return this;
}

public PipelineReferenceClientFirestore.Builder setProjectId(String projectId) {
this.projectId = projectId;
return this;
}

public PipelineReferenceClientFirestore.Builder setLoadCollection(String subCollection) {
this.subCollection = subCollection;
return this;
}

public synchronized PipelineReferenceClientFirestore build() {
PipelineReferenceClientFirestore result = new PipelineReferenceClientFirestore();
result.projectId = this.projectId;
result.domain = this.domain;
result.subCollection = this.subCollection;
initializeFireStoreClient(result);
initializeCollections(result);
return result;
}

/**
* Method used to open the client with the already set connection variables.
*/
private void initializeFireStoreClient(PipelineReferenceClientFirestore client) {
FirestoreOptions firestoreOptions;
try {
if (client.firestore == null) {
FirestoreOptions.Builder optionsBuilder = FirestoreOptions.newBuilder().setProjectId(client.projectId);
if (System.getenv().containsKey("FIRESTORE_EMULATOR_HOST")) {
optionsBuilder.setHost(System.getenv().get("FIRESTORE_EMULATOR_HOST"));
optionsBuilder.setCredentials(NoCredentials.getInstance());
client.firestore = optionsBuilder.build().getService();
} else {
firestoreOptions = FirestoreOptions.getDefaultInstance().toBuilder()
.setProjectId(client.projectId)
.setCredentials(GoogleCredentials.getApplicationDefault())
.build();
client.firestore = firestoreOptions.getService();
}
}
} catch (IOException e) {
throw new ReferenceClientException("Firestore Connectivity not established", e);
}
}

/**
* @PARAM client is object for PipelineReferenceClientFirestore class
*/
private void initializeCollections(PipelineReferenceClientFirestore client) {
client.collectionReference = client.firestore.collection(PipelineReferenceClientConstants.PIPELINE_COLLECTION).document(PipelineReferenceClientConstants.PIPELINE_DOCUMENT).collection(PipelineReferenceClientConstants.DOMAIN_COLLECTION).document(client.domain).collection(client.subCollection);
}

}

0 1 2,755
1 REPLY 1

Hello,

Reading through your Code, it does NOT seem to me that you are using the Firestore in Native Mode Connector for Apache Beam[0]. Is this not a consideration for your use-case? Please let me know if my understanding is wrong.

As highlighted in this article[1], the Firestore in Native Mode connector for Apache Beam makes data processing easier for Firestore users. I think this may be a more appropriate and efficient way, if you are using the Firestore Database in Native Mode. There is even an implementation sample code provided in this link[2].

[0]https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/gcp/firestore/FirestoreIO.htm...
[1]https://cloud.google.com/blog/products/databases/apache-beam-firestore-connector-released
[2]https://cloud.google.com/blog/topics/developers-practitioners/using-firestore-and-apache-beam-data-p...