When I'm trying to execute the Firestore query from dataflow job (apache beam) it's giving com.google.api.gax.rpc.UnknownException: io.grpc.StatusRuntimeException: UNKNOWN: Channel Pipeline:
It's working fine with local emulator and standalone java application
Please help me.
addData and getData methods calling from dataflow job
public class PipelineReferenceClientFirestore implements ReferenceClient {
private static final Logger LOGGER = LoggerFactory.getLogger(PipelineReferenceClientFirestore.class);
private String projectId;
private String domain;
private String subCollection;
private Firestore firestore;
private CollectionReference collectionReference;
/**
* @PARAM id adding the in Firestore to specific documentId
* @PARAM data it's having Json data
*/
@Override
public void addData(String id, Map<String, Object> data) {
ApiFuture<WriteResult> future = null;
try {
PipelineReferenceDocument pipelineReferenceDocument = new PipelineReferenceDocument();
pipelineReferenceDocument.setDocumentId(id);
pipelineReferenceDocument.setDomain(domain);
pipelineReferenceDocument.setCollectionName(subCollection);
pipelineReferenceDocument.setDocument(data);
future = collectionReference.document(id).set(pipelineReferenceDocument);
Timestamp updatedTime = future.get().getUpdateTime();
if(LOGGER.isInfoEnabled()) {
LOGGER.info("updated timestamp : {}",updatedTime);
LOGGER.info("status : {}",future.isDone());
}
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
throw new ReferenceClientException("Query execution error", e);
}
}
/**
* @PARAM id fetching data from Firestore to specific documentId
* @return Map<String, Object> having Payload
*/
@Override
public Map<String, Object> getData(String id) {
PipelineReferenceDocument pipelineReferenceDocument = new PipelineReferenceDocument();
try {
ApiFuture<DocumentSnapshot> future = collectionReference.document(id).get();
DocumentSnapshot document = future.get();
if (document.exists()) {
pipelineReferenceDocument = document.toObject(PipelineReferenceDocument.class);
return pipelineReferenceDocument.getDocument();
} else {
throw new ReferenceClientException("documentId not found : "+ id);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ReferenceClientException("Firestore Connectivity Got interrupted", e);
} catch (ExecutionException e) {
throw new ReferenceClientException("Query execution error", e);
}
}
public static class Builder {
private String projectId;
private String domain;
private String subCollection;
public PipelineReferenceClientFirestore.Builder setDomain(String domain) {
this.domain = domain;
return this;
}
public PipelineReferenceClientFirestore.Builder setProjectId(String projectId) {
this.projectId = projectId;
return this;
}
public PipelineReferenceClientFirestore.Builder setLoadCollection(String subCollection) {
this.subCollection = subCollection;
return this;
}
public synchronized PipelineReferenceClientFirestore build() {
PipelineReferenceClientFirestore result = new PipelineReferenceClientFirestore();
result.projectId = this.projectId;
result.domain = this.domain;
result.subCollection = this.subCollection;
initializeFireStoreClient(result);
initializeCollections(result);
return result;
}
/**
* Method used to open the client with the already set connection variables.
*/
private void initializeFireStoreClient(PipelineReferenceClientFirestore client) {
FirestoreOptions firestoreOptions;
try {
if (client.firestore == null) {
FirestoreOptions.Builder optionsBuilder = FirestoreOptions.newBuilder().setProjectId(client.projectId);
if (System.getenv().containsKey("FIRESTORE_EMULATOR_HOST")) {
optionsBuilder.setHost(System.getenv().get("FIRESTORE_EMULATOR_HOST"));
optionsBuilder.setCredentials(NoCredentials.getInstance());
client.firestore = optionsBuilder.build().getService();
} else {
firestoreOptions = FirestoreOptions.getDefaultInstance().toBuilder()
.setProjectId(client.projectId)
.setCredentials(GoogleCredentials.getApplicationDefault())
.build();
client.firestore = firestoreOptions.getService();
}
}
} catch (IOException e) {
throw new ReferenceClientException("Firestore Connectivity not established", e);
}
}
/**
* @PARAM client is object for PipelineReferenceClientFirestore class
*/
private void initializeCollections(PipelineReferenceClientFirestore client) {
client.collectionReference = client.firestore.collection(PipelineReferenceClientConstants.PIPELINE_COLLECTION).document(PipelineReferenceClientConstants.PIPELINE_DOCUMENT).collection(PipelineReferenceClientConstants.DOMAIN_COLLECTION).document(client.domain).collection(client.subCollection);
}
}