I'm facing the below issue whilst running the GCP Dataflow cron job. I've a cron job running every 20 minutes which fetches the data from the BigQuery table. When I've a single entry in the table, it fives no error. But when I've multiple entries in the table then it'll throw the below error. It doesn't throw any error when I run the tests in local. Any suggestions would be helpful and much appreciated.
java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply ( org/apache.beam.sdk.transforms/View.java:464 ) at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:528 ) at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput ( org/apache.beam.sdk.transforms/Combine.java:497 ) at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add ( org/apache.beam.runners.dataflow.worker/WindmillStateInternals.java:2067 ) at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue ( org/apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core/SystemReduceFn.java:119 )...........
PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options); PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options); pipeline.apply("Read Events", PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription())) .apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn())) .apply("Publish Purchase Events", ParDo.of(new EventAndSendFn(pixels, sites)).withSideInputs(pixels, sites)); public static PCollection<Long> getCronJobPCollection(Pipeline pipeline, PipelineOptions options) { return pipeline .apply("Cron Trigger", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(options.getMetadataRefreshMinutes()))) .apply("Cron Window", Window.<Long>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())) .discardingFiredPanes()); } public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection, PipelineOptions options) { return cronPCollection .apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject()))) .apply("Transform Pixel Configs", View.asSingleton()); } static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> { private final String project; public ReadPixelsFromBQFn(String project) { this.project = project; } @ProcessElement public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) { TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project); // This method BigQueryUtil.convertBQRowsToPixels(result) returns List<Pixel> out.output(BigQueryUtil.convertBQRowsToPixels(result)); } }
Hello,
Maybe you can try to use Combine.globally().asSingleton() to combine the PCollection into a single value as the message mentioned?
Java Combine.globally Examples might be a good example.
Thanks for your reply. I'm a bit confused here where exactly to use the below Combine.globally().asSingleton as globally method expects argument / arguments. Could you help me to modify my below code and direct me where exactly the change would apply please?
public class App {
private static final Logger LOG = LoggerFactory.getLogger(App.class);
public static void main(String[] args) {
EventsHandlerDataflowPipelineOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(EventsHandlerDataflowPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);
PCollectionView<List<Site>> sites = getSitesSideInput(cronPCollection, options);
pipeline.apply("Read Purchase Events",
PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
.apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
.apply("Publish Purchase Events",
ParDo.of(new ToEventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));
pipeline.run();
}
public static PCollection<Long> getCronJobPCollection(Pipeline pipeline,
EventsHandlerDataflowPipelineOptions options) {
return pipeline
.apply("Cron Trigger",
GenerateSequence.from(0).withRate(1,
Duration.standardMinutes(options.getMetadataRefreshMinutes())))
.apply("Cron Window", Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes());
}
public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection,
EventsHandlerDataflowPipelineOptions options) {
return cronPCollection
.apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
.apply("Transform Pixel Configs", View.asSingleton());
}
public static PCollectionView<List<Site>> getSitesSideInput(PCollection<Long> cronPCollection,
EventsHandlerDataflowPipelineOptions options) {
return cronPCollection
.apply("Read Site Details", ParDo.of(new ReadSitesFromBQFn(options.getProject())))
.apply("Transform Site Details", View.asSingleton());
}
static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
private final String project;
public ReadPixelsFromBQFn(String project) {
this.project = project;
}
@ProcessElement
public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
out.output(BigQueryUtil.convertBQRowsToPixels(result));
}
}
static class ReadSitesFromBQFn extends DoFn<Long, List<Site>> {
private final String project;
public ReadSitesFromBQFn(String project) {
this.project = project;
}
@ProcessElement
public void processElement(@Element Long in, OutputReceiver<List<Site>> out) {
TableResult result = BigQueryUtil.readSitesFromBigQuery(this.project);
out.output(BigQueryUtil.convertBQRowsToSites(result));
}
}
static class UnpackSessionMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
@ProcessElement
public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
out.output(new IntermediateEvent(in));
}
}
static class UnpackPurchaseMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
@ProcessElement
public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
//TODO log should be removed after testing
out.output(new IntermediateEvent(in, true));
}
}
static class ToEventAndSendFn extends DoFn<IntermediateEvent, Void> {
private final PCollectionView<List<Pixel>> pixels;
private final PCollectionView<List<Site>> sites;
//add constructor with side input so it can be accessed in static class
public ToEventAndSendFn(PCollectionView<List<Pixel>> pixels, PCollectionView<List<Site>> sites) {
this.pixels = pixels;
this.sites = sites;
}
@ProcessElement
public void processElement(@Element IntermediateEvent in, OutputReceiver<Void> out, ProcessContext c) {
out.output(Transformer.createEventAndSendToApi(in,
c.sideInput(this.pixels), c.sideInput(this.sites)));
}
}
}
Thanks for your reply. I'm a bit confused here where exactly to use the Combine.globally().asSingleton as globally method expects argument / arguments. Could you help me to modify my below code and direct me where exactly the change would apply please?
public class App {
private static final Logger LOG = LoggerFactory.getLogger(App.class);
public static void main(String[] args) {
EventsHandlerDataflowPipelineOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(EventsHandlerDataflowPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);
PCollectionView<List<Site>> sites = getSitesSideInput(cronPCollection, options);
pipeline.apply("Read Purchase Events",
PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
.apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
.apply("Publish Purchase Events",
ParDo.of(new ToEventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));
pipeline.run();
}
public static PCollection<Long> getCronJobPCollection(Pipeline pipeline,
EventsHandlerDataflowPipelineOptions options) {
return pipeline
.apply("Cron Trigger",
GenerateSequence.from(0).withRate(1,
Duration.standardMinutes(options.getMetadataRefreshMinutes())))
.apply("Cron Window", Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes());
}
public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection,
EventsHandlerDataflowPipelineOptions options) {
return cronPCollection
.apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
.apply("Transform Pixel Configs", View.asSingleton());
}
public static PCollectionView<List<Site>> getSitesSideInput(PCollection<Long> cronPCollection,
EventsHandlerDataflowPipelineOptions options) {
return cronPCollection
.apply("Read Site Details", ParDo.of(new ReadSitesFromBQFn(options.getProject())))
.apply("Transform Site Details", View.asSingleton());
}
static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
private final String project;
public ReadPixelsFromBQFn(String project) {
this.project = project;
}
@ProcessElement
public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
out.output(BigQueryUtil.convertBQRowsToPixels(result));
}
}
static class ReadSitesFromBQFn extends DoFn<Long, List<Site>> {
private final String project;
public ReadSitesFromBQFn(String project) {
this.project = project;
}
@ProcessElement
public void processElement(@Element Long in, OutputReceiver<List<Site>> out) {
TableResult result = BigQueryUtil.readSitesFromBigQuery(this.project);
out.output(BigQueryUtil.convertBQRowsToSites(result));
}
}
static class UnpackSessionMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
@ProcessElement
public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
out.output(new IntermediateEvent(in));
}
}
static class UnpackPurchaseMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
@ProcessElement
public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
//TODO log should be removed after testing
out.output(new IntermediateEvent(in, true));
}
}
static class ToEventAndSendFn extends DoFn<IntermediateEvent, Void> {
private final PCollectionView<List<Pixel>> pixels;
private final PCollectionView<List<Site>> sites;
//add constructor with side input so it can be accessed in static class
public ToEventAndSendFn(PCollectionView<List<Pixel>> pixels, PCollectionView<List<Site>> sites) {
this.pixels = pixels;
this.sites = sites;
}
@ProcessElement
public void processElement(@Element IntermediateEvent in, OutputReceiver<Void> out, ProcessContext c) {
out.output(Transformer.createEventAndSendToApi(in,
c.sideInput(this.pixels), c.sideInput(this.sites)));
}
}
}