
java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton

 

I'm hitting the error below when running a GCP Dataflow cron job. The job runs every 20 minutes and fetches data from a BigQuery table. When the table has a single entry, it gives no error, but when the table has multiple entries, it throws the error below. It doesn't throw any error when I run the tests locally. Any suggestions would be much appreciated.

java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value

at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply ( org/apache.beam.sdk.transforms/View.java:464 )
at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:528 )
at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:497 )
at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add ( org/apache.beam.runners.dataflow.worker/WindmillStateInternals.java:2067 )
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue ( org/apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core/SystemReduceFn.java:119 )
...........

PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);

pipeline.apply("Read Events",
        PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
        .apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
        .apply("Publish Purchase Events",
                ParDo.of(new EventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));

public static PCollection<Long> getCronJobPCollection(Pipeline pipeline,
                                                      PipelineOptions options) {
    return pipeline
            .apply("Cron Trigger",
                    GenerateSequence.from(0).withRate(1,
                            Duration.standardMinutes(options.getMetadataRefreshMinutes())))
            .apply("Cron Window", Window.<Long>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes());
}

public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection,
                                                              PipelineOptions options) {
    return cronPCollection
            .apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
            .apply("Transform Pixel Configs", View.asSingleton());
}

static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
    private final String project;

    public ReadPixelsFromBQFn(String project) {
        this.project = project;
    }

    @ProcessElement
    public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
        TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
        // BigQueryUtil.convertBQRowsToPixels(result) returns List<Pixel>
        out.output(BigQueryUtil.convertBQRowsToPixels(result));
    }
}