I'm using Apache Beam 2.29.0 in a pipeline that has a cron job to read data from BigQuery tables. The BigQuery APIs come from the google.cloud.bigquery dependency, which is pulled in through Apache Beam's own dependency (org.apache.beam:beam-runners-google-cloud-dataflow-java). This dependency includes the BigQuery, TableResult, FieldValue, etc. classes. When I tried to upgrade Apache Beam to 2.37.0, my code broke: it can no longer resolve these classes (BigQuery, TableResult, FieldValue, etc.). The alternative I have is the Beam API, which includes the BigQueryIO, TableRow, etc. classes. I'm not sure how to migrate my old code to the new APIs, because the new APIs run as part of the pipeline and return the query results as a PCollection. Any suggestions on this would be really helpful.
I have the options below to consider, but I'm not sure which one is feasible. Please advise.
Here's the code sample from my application.
import java.util.List;

import com.google.cloud.bigquery.TableResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Application classes (Pixel, Site, IntermediateEvent, BigQueryUtil, Transformer,
// EventsHandlerDataflowPipelineOptions) are assumed to live in the same package.
public class App {
    private static final Logger LOG = LoggerFactory.getLogger(App.class);

    public static void main(String[] args) {
        EventsHandlerDataflowPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(EventsHandlerDataflowPipelineOptions.class);

        Pipeline pipeline = Pipeline.create(options);

        // Periodic trigger that drives the refresh of both side inputs.
        PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
        PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);
        PCollectionView<List<Site>> sites = getSitesSideInput(cronPCollection, options);

        pipeline
                .apply("Read Purchase Events",
                        PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
                .apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
                .apply("Publish Purchase Events",
                        ParDo.of(new ToEventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));

        pipeline.run();
    }

    // Emits one element every getMetadataRefreshMinutes() minutes and fires a pane for each.
    public static PCollection<Long> getCronJobPCollection(
            Pipeline pipeline, EventsHandlerDataflowPipelineOptions options) {
        return pipeline
                .apply("Cron Trigger",
                        GenerateSequence.from(0)
                                .withRate(1, Duration.standardMinutes(options.getMetadataRefreshMinutes())))
                .apply("Cron Window",
                        Window.<Long>into(new GlobalWindows())
                                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                                .discardingFiredPanes());
    }

    public static PCollectionView<List<Pixel>> getPixelsSideInput(
            PCollection<Long> cronPCollection, EventsHandlerDataflowPipelineOptions options) {
        return cronPCollection
                .apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
                .apply("Transform Pixel Configs", View.asSingleton());
    }

    public static PCollectionView<List<Site>> getSitesSideInput(
            PCollection<Long> cronPCollection, EventsHandlerDataflowPipelineOptions options) {
        return cronPCollection
                .apply("Read Site Details", ParDo.of(new ReadSitesFromBQFn(options.getProject())))
                .apply("Transform Site Details", View.asSingleton());
    }

    // Runs a BigQuery query on every cron tick; this is where TableResult is used.
    static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
        private final String project;

        public ReadPixelsFromBQFn(String project) {
            this.project = project;
        }

        @ProcessElement
        public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
            TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
            out.output(BigQueryUtil.convertBQRowsToPixels(result));
        }
    }

    static class ReadSitesFromBQFn extends DoFn<Long, List<Site>> {
        private final String project;

        public ReadSitesFromBQFn(String project) {
            this.project = project;
        }

        @ProcessElement
        public void processElement(@Element Long in, OutputReceiver<List<Site>> out) {
            TableResult result = BigQueryUtil.readSitesFromBigQuery(this.project);
            out.output(BigQueryUtil.convertBQRowsToSites(result));
        }
    }

    static class UnpackSessionMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
        @ProcessElement
        public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
            out.output(new IntermediateEvent(in));
        }
    }

    static class UnpackPurchaseMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
        @ProcessElement
        public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
            // TODO log should be removed after testing
            out.output(new IntermediateEvent(in, true));
        }
    }

    static class ToEventAndSendFn extends DoFn<IntermediateEvent, Void> {
        private final PCollectionView<List<Pixel>> pixels;
        private final PCollectionView<List<Site>> sites;

        // Add constructor with side inputs so they can be accessed in the static class.
        public ToEventAndSendFn(PCollectionView<List<Pixel>> pixels, PCollectionView<List<Site>> sites) {
            this.pixels = pixels;
            this.sites = sites;
        }

        @ProcessElement
        public void processElement(
                @Element IntermediateEvent in, OutputReceiver<Void> out, ProcessContext c) {
            out.output(Transformer.createEventAndSendToApi(
                    in, c.sideInput(this.pixels), c.sideInput(this.sites)));
        }
    }
}
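One piece of the migration that can be looked at separately from the pipeline wiring is the row conversion. BigQueryIO produces TableRow objects, and TableRow behaves as a Map<String, Object>, so the rough sketch below uses a plain Map as a stand-in to stay free of Beam dependencies. The Pixel fields (id, siteId) and the column names (pixel_id, site_id) are assumptions made for illustration; the real Pixel class and table schema are not shown in this post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: a plain Map<String, Object> stands in for Beam's TableRow.
class RowConversionSketch {

    // Hypothetical Pixel shape; the application's real class is not shown.
    static final class Pixel {
        final String id;
        final String siteId;

        Pixel(String id, String siteId) {
            this.id = id;
            this.siteId = siteId;
        }
    }

    // Converts one row into a Pixel; column names are assumed, not from the source.
    static Pixel toPixel(Map<String, Object> row) {
        return new Pixel(requireString(row, "pixel_id"), requireString(row, "site_id"));
    }

    // Converts all rows, preserving their order.
    static List<Pixel> toPixels(Iterable<? extends Map<String, Object>> rows) {
        List<Pixel> pixels = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            pixels.add(toPixel(row));
        }
        return pixels;
    }

    // Fails fast when a column is absent or null instead of emitting a half-built Pixel.
    private static String requireString(Map<String, Object> row, String column) {
        Object value = row.get(column);
        if (value == null) {
            throw new IllegalArgumentException("Missing column: " + column);
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("pixel_id", "p-1");
        row.put("site_id", "s-9");
        List<Pixel> pixels = toPixels(Collections.singletonList(row));
        System.out.println(pixels.size() + " pixel(s), first id = " + pixels.get(0).id);
    }
}
```

With a conversion of this shape, the equivalent of convertBQRowsToPixels would take rows keyed by column name rather than a TableResult. How the rows get read on each cron tick is a separate question that this sketch does not address.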
Hey Shakirsyed,
Google Groups is reserved for general product discussion; for technical questions, please use Stack Overflow.
To get better support, you should post to the relevant forum. Please read the Community Support article for more details.