Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton

 

I'm facing the below issue whilst running the GCP Dataflow cron job. I've a cron job running every 20 minutes which fetches the data from the BigQuery table. When I've a single entry in the table, it fives no error. But when I've multiple entries in the table then it'll throw the below error. It doesn't throw any error when I run the tests in local. Any suggestions would be helpful and much appreciated.

java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value

at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply ( org/apache.beam.sdk.transforms/View.java:464 ) at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:528 ) at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:497 ) at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add ( org/apache.beam.runners.dataflow.worker/WindmillStateInternals.java:2067 ) at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue ( org/apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core/SystemReduceFn.java:119 )...........

PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
    PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);


    pipeline.apply("Read Events",
            PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
            .apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
            .apply("Publish Purchase Events",
                    ParDo.of(new EventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));

public static PCollection<Long> getCronJobPCollection(Pipeline pipeline,
                                                      PipelineOptions options) {
    return pipeline
            .apply("Cron Trigger",
                    GenerateSequence.from(0).withRate(1,
                            Duration.standardMinutes(options.getMetadataRefreshMinutes())))
            .apply("Cron Window", Window.<Long>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes());
}

public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection,
                                                              PipelineOptions options) {
    return cronPCollection
            .apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
            .apply("Transform Pixel Configs", View.asSingleton());
}

 static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
    private final String project;

    public ReadPixelsFromBQFn(String project) {
        this.project = project;
    }

    @ProcessElement
    public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
   
        TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
    // This method BigQueryUtil.convertBQRowsToPixels(result) returns List<Pixel>
        out.output(BigQueryUtil.convertBQRowsToPixels(result));
    }
}
0 3 2,073
3 REPLIES 3

Hello,

Maybe you can try to use Combine.globally().asSingleton() to combine the PCollection into a single value as the message mentioned?

Java Combine.globally Examples  might be a good example.

Thanks for your reply. I'm a bit confused here where exactly to use the below Combine.globally().asSingleton as globally method expects argument / arguments. Could you help me to modify my below code and direct me where exactly the change would apply please?

 


public class App {
private static final Logger LOG = LoggerFactory.getLogger(App.class);

public static void main(String[] args) {
EventsHandlerDataflowPipelineOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(EventsHandlerDataflowPipelineOptions.class);

Pipeline pipeline = Pipeline.create(options);
PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);
PCollectionView<List<Site>> sites = getSitesSideInput(cronPCollection, options);


pipeline.apply("Read Purchase Events",
PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
.apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
.apply("Publish Purchase Events",
ParDo.of(new ToEventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));

pipeline.run();
}

public static PCollection<Long> getCronJobPCollection(Pipeline pipeline,
EventsHandlerDataflowPipelineOptions options) {
return pipeline
.apply("Cron Trigger",
GenerateSequence.from(0).withRate(1,
Duration.standardMinutes(options.getMetadataRefreshMinutes())))
.apply("Cron Window", Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes());
}

public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection,
EventsHandlerDataflowPipelineOptions options) {
return cronPCollection
.apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
.apply("Transform Pixel Configs", View.asSingleton());
}

public static PCollectionView<List<Site>> getSitesSideInput(PCollection<Long> cronPCollection,
EventsHandlerDataflowPipelineOptions options) {
return cronPCollection
.apply("Read Site Details", ParDo.of(new ReadSitesFromBQFn(options.getProject())))
.apply("Transform Site Details", View.asSingleton());
}

static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
private final String project;

public ReadPixelsFromBQFn(String project) {
this.project = project;
}

@ProcessElement
public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
out.output(BigQueryUtil.convertBQRowsToPixels(result));
}
}

static class ReadSitesFromBQFn extends DoFn<Long, List<Site>> {
private final String project;

public ReadSitesFromBQFn(String project) {
this.project = project;
}

@ProcessElement
public void processElement(@Element Long in, OutputReceiver<List<Site>> out) {
TableResult result = BigQueryUtil.readSitesFromBigQuery(this.project);
out.output(BigQueryUtil.convertBQRowsToSites(result));
}
}

static class UnpackSessionMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
@ProcessElement
public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
out.output(new IntermediateEvent(in));
}
}

static class UnpackPurchaseMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
@ProcessElement
public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
//TODO log should be removed after testing
out.output(new IntermediateEvent(in, true));
}
}

static class ToEventAndSendFn extends DoFn<IntermediateEvent, Void> {
private final PCollectionView<List<Pixel>> pixels;
private final PCollectionView<List<Site>> sites;

//add constructor with side input so it can be accessed in static class
public ToEventAndSendFn(PCollectionView<List<Pixel>> pixels, PCollectionView<List<Site>> sites) {
this.pixels = pixels;
this.sites = sites;
}

@ProcessElement
public void processElement(@Element IntermediateEvent in, OutputReceiver<Void> out, ProcessContext c) {
out.output(Transformer.createEventAndSendToApi(in,
c.sideInput(this.pixels), c.sideInput(this.sites)));
}
}
}

  

Thanks for your reply. I'm a bit confused here where exactly to use the Combine.globally().asSingleton as globally method expects argument / arguments. Could you help me to modify my below code and direct me where exactly the change would apply please?

 


public class App {
private static final Logger LOG = LoggerFactory.getLogger(App.class);

public static void main(String[] args) {
EventsHandlerDataflowPipelineOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(EventsHandlerDataflowPipelineOptions.class);

Pipeline pipeline = Pipeline.create(options);
PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);
PCollectionView<List<Site>> sites = getSitesSideInput(cronPCollection, options);


pipeline.apply("Read Purchase Events",
PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
.apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
.apply("Publish Purchase Events",
ParDo.of(new ToEventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));

pipeline.run();
}

public static PCollection<Long> getCronJobPCollection(Pipeline pipeline,
EventsHandlerDataflowPipelineOptions options) {
return pipeline
.apply("Cron Trigger",
GenerateSequence.from(0).withRate(1,
Duration.standardMinutes(options.getMetadataRefreshMinutes())))
.apply("Cron Window", Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes());
}

public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection,
EventsHandlerDataflowPipelineOptions options) {
return cronPCollection
.apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
.apply("Transform Pixel Configs", View.asSingleton());
}

public static PCollectionView<List<Site>> getSitesSideInput(PCollection<Long> cronPCollection,
EventsHandlerDataflowPipelineOptions options) {
return cronPCollection
.apply("Read Site Details", ParDo.of(new ReadSitesFromBQFn(options.getProject())))
.apply("Transform Site Details", View.asSingleton());
}

static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
private final String project;

public ReadPixelsFromBQFn(String project) {
this.project = project;
}

@ProcessElement
public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
out.output(BigQueryUtil.convertBQRowsToPixels(result));
}
}

static class ReadSitesFromBQFn extends DoFn<Long, List<Site>> {
private final String project;

public ReadSitesFromBQFn(String project) {
this.project = project;
}

@ProcessElement
public void processElement(@Element Long in, OutputReceiver<List<Site>> out) {
TableResult result = BigQueryUtil.readSitesFromBigQuery(this.project);
out.output(BigQueryUtil.convertBQRowsToSites(result));
}
}

static class UnpackSessionMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
@ProcessElement
public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
out.output(new IntermediateEvent(in));
}
}

static class UnpackPurchaseMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
@ProcessElement
public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
//TODO log should be removed after testing
out.output(new IntermediateEvent(in, true));
}
}

static class ToEventAndSendFn extends DoFn<IntermediateEvent, Void> {
private final PCollectionView<List<Pixel>> pixels;
private final PCollectionView<List<Site>> sites;

//add constructor with side input so it can be accessed in static class
public ToEventAndSendFn(PCollectionView<List<Pixel>> pixels, PCollectionView<List<Site>> sites) {
this.pixels = pixels;
this.sites = sites;
}

@ProcessElement
public void processElement(@Element IntermediateEvent in, OutputReceiver<Void> out, ProcessContext c) {
out.output(Transformer.createEventAndSendToApi(in,
c.sideInput(this.pixels), c.sideInput(this.sites)));
}
}
}