I'm facing the issue below whilst running a GCP Dataflow cron job. The cron job runs every 20 minutes and fetches data from a BigQuery table. When the table has a single entry, there is no error, but when it has multiple entries, the job throws the error below. The tests don't throw any error when I run them locally. Any suggestions would be much appreciated.

    java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
        at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply ( org/apache.beam.sdk.transforms/View.java:464 )
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:528 )
        at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:497 )
        at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add ( org/apache.beam.runners.dataflow.worker/WindmillStateInternals.java:2067 )
        at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue ( org/apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core/SystemReduceFn.java:119 )
        ...
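
The stack trace shows `View$SingletonCombineFn.apply` being called from `Combine$BinaryCombineFn.addInput`, which suggests that `View.asSingleton()` builds the view with a binary combine that fails as soon as it has to merge two values in the same pane. Note also that, in the code shown, `ReadPixelsFromBQFn` outputs exactly one `List<Pixel>` per cron tick, so the element count the view sees depends on how many ticks land in one pane rather than directly on the number of table rows. The sketch below is a hypothetical plain-Java simulation of that behaviour, not Beam code: `SingletonPaneDemo`, `combinePane` and the string payloads are made up for illustration. A one-element pane passes through untouched, while a two-element pane reaches the combine function and throws.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java simulation (no Beam) of a singleton view built
// from a binary combine, as the stack trace suggests.
final class SingletonPaneDemo {

    // Stand-in for the singleton combine: merging any two values is an error.
    static <T> T singleton(T left, T right) {
        throw new IllegalArgumentException(
            "PCollection with more than one element accessed as a singleton view.");
    }

    // Folds every element of one pane with the given binary function.
    // A one-element pane never calls the function; two or more elements do.
    static <T> Optional<T> combinePane(List<T> pane, BinaryOperator<T> fn) {
        return pane.stream().reduce(fn);
    }

    public static void main(String[] args) {
        // One BigQuery result in the pane: no combine needed, no error.
        System.out.println(combinePane(List.of("pixels@tick0"), SingletonPaneDemo::singleton).get());

        // Two results in the same pane: the singleton combine throws.
        try {
            combinePane(List.of("pixels@tick0", "pixels@tick1"), SingletonPaneDemo::singleton);
        } catch (IllegalArgumentException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```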

Here is the relevant part of the pipeline:

    PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
    PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);

    // `sites` is a second side input defined elsewhere (not shown here).
    pipeline
        .apply("Read Events",
            PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
        .apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
        .apply("Publish Purchase Events",
            ParDo.of(new EventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));

    // Emits one Long every getMetadataRefreshMinutes() minutes in the global window,
    // repeatedly firing a pane once processing time passes the first element in it.
    public static PCollection<Long> getCronJobPCollection(Pipeline pipeline, PipelineOptions options) {
        return pipeline
            .apply("Cron Trigger", GenerateSequence.from(0)
                .withRate(1, Duration.standardMinutes(options.getMetadataRefreshMinutes())))
            .apply("Cron Window", Window.<Long>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes());
    }

    // Reads the pixel configs from BigQuery on every tick and exposes them as a singleton side input.
    public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection, PipelineOptions options) {
        return cronPCollection
            .apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
            .apply("Transform Pixel Configs", View.asSingleton());
    }

    static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
        private final String project;

        public ReadPixelsFromBQFn(String project) {
            this.project = project;
        }

        @ProcessElement
        public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
            TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
            // BigQueryUtil.convertBQRowsToPixels(result) returns a List<Pixel>
            out.output(BigQueryUtil.convertBQRowsToPixels(result));
        }
    }
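
One possible direction, sketched here under assumptions: instead of a singleton combine that rejects a second value, reduce each pane with a "latest wins" function, which is also what the exception message hints at with `Combine.globally()`. Beam combine functions are expected to be associative and commutative, so the sketch keeps the refresh with the highest cron tick (the `Long` emitted by `GenerateSequence`) rather than whichever value happens to be merged last. This is plain Java, not Beam code; `LatestRefreshDemo`, `Refresh` and the string pixel lists are hypothetical stand-ins for the `Long` tick and `List<Pixel>` in the question. Wiring it into the pipeline would mean carrying the tick alongside the BigQuery result and using a custom combine in place of `View.asSingleton()`.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical plain-Java sketch (no Beam) of an order-independent
// "latest wins" combine over the elements of one pane.
final class LatestRefreshDemo {

    // Stand-in for one cron firing: the GenerateSequence tick that triggered
    // the BigQuery read, plus the (hypothetical) pixel configs it returned.
    static final class Refresh {
        final long tick;
        final List<String> pixels;

        Refresh(long tick, List<String> pixels) {
            this.tick = tick;
            this.pixels = pixels;
        }
    }

    // Keeps the refresh with the higher tick. Taking a max is associative and,
    // assuming ticks are unique, commutative, so the result does not depend on
    // the order in which elements are combined.
    static Refresh newer(Refresh a, Refresh b) {
        return a.tick >= b.tick ? a : b;
    }

    // Folds one pane down to its newest refresh (empty if the pane is empty).
    static Optional<Refresh> latest(List<Refresh> pane) {
        return pane.stream().reduce(LatestRefreshDemo::newer);
    }

    public static void main(String[] args) {
        List<Refresh> pane = List.of(
            new Refresh(2, List.of("pixel-b")),
            new Refresh(3, List.of("pixel-c")),
            new Refresh(1, List.of("pixel-a")));
        // Prints [pixel-c] regardless of the order the refreshes arrive in.
        System.out.println(latest(pane).get().pixels);
    }
}
```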