Apache Beam 2.37.0 dropped the Google Cloud BigQuery dependency

I'm using Apache Beam 2.29.0 in a pipeline that periodically (cron-style) reads data from BigQuery tables. The BigQuery APIs come from the google.cloud.bigquery library, which is pulled in through Apache Beam's dependencies (org.apache.beam:beam-runners-google-cloud-dataflow-java). That library provides the BigQuery, TableResult, FieldValue and similar classes. When I upgrade Apache Beam to 2.37.0, my code no longer compiles because these classes can no longer be resolved.

Beam offers alternatives such as BigQueryIO and TableRow, but I'm not sure how to migrate my old code to them, because the new APIs run as part of the pipeline and return query results as a PCollection. Any suggestions would be really helpful.
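Migrating to BigQueryIO mostly changes the row type your conversion code sees: rows read with `BigQueryIO.readTableRows()` arrive as `TableRow` (`com.google.api.services.bigquery.model.TableRow`), which implements `Map<String, Object>`, instead of the client library's `FieldValueList`. A minimal sketch of a `convertBQRowsToPixels` replacement written against a plain map is below. The `Pixel` fields and the column names (`id`, `site_id`, `name`) are hypothetical, since the real `Pixel` class isn't shown, and integer columns are parsed defensively because they may arrive as strings in a `TableRow`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the application's Pixel class (the real one isn't shown).
final class Pixel {
    final long id;
    final long siteId;
    final String name;

    Pixel(long id, long siteId, String name) {
        this.id = id;
        this.siteId = siteId;
        this.name = name;
    }
}

// Row-to-object conversion written against Map<String, Object>, which TableRow implements.
final class PixelRows {
    private PixelRows() {}

    // Accepts a Number or a numeric String, since INTEGER columns may arrive as strings.
    static long asLong(Object value) {
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        return Long.parseLong(String.valueOf(value));
    }

    // Column names "id", "site_id" and "name" are hypothetical.
    static Pixel fromRow(Map<String, Object> row) {
        return new Pixel(
                asLong(row.get("id")),
                asLong(row.get("site_id")),
                (String) row.get("name"));
    }

    // Plays the role of BigQueryUtil.convertBQRowsToPixels for TableRow-style input.
    static List<Pixel> fromRows(Iterable<? extends Map<String, Object>> rows) {
        List<Pixel> pixels = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            pixels.add(fromRow(row));
        }
        return pixels;
    }
}
```

In a pipeline, `fromRow` would run inside a `ParDo` or `MapElements` over the `PCollection<TableRow>` returned by `BigQueryIO.readTableRows()`. Note that BigQueryIO's read transforms are applied to the pipeline root (`PBegin`), not to an upstream `PCollection`, so they can't simply be dropped in place of the per-tick query in `ReadPixelsFromBQFn`.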

I'm considering the options below but am not sure which one is most feasible. Please advise.

  1. Upgrade to 2.37.0 and add the Google Cloud BigQuery client library as a separate dependency (no changes to our existing code).
  2. Rewrite the code to use Apache Beam's BigQuery library (BigQueryIO) and upgrade to 2.37.0 (this is where I need help).
  3. Upgrade to 2.34.0 (which will be deprecated in November 2022). This requires no code changes and no separate Google Cloud library; we would revisit the upgrade to 2.37.0 next quarter.

If I take option 1, is it going to break in the future? Since Apache Beam removed the library internally, would maintaining the extra dependency become an overhead if Apache Beam stops supporting the Google Cloud BigQuery APIs?
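The google-cloud-bigquery client is published independently of Beam, so declaring it explicitly does not rely on Beam continuing to bundle it. One hedged way to keep option 1 cheap to undo later is to confine the client-library types to a single class behind a small interface, so the DoFns never import `com.google.cloud.bigquery` directly. The sketch below is plain Java; `ConfigLoader` and `ReloadOnTick` are hypothetical names, and in the real pipeline a `ConfigLoader<Pixel>` would wrap `BigQueryUtil.readPixelsFromBigQuery` plus `convertBQRowsToPixels`. The loader extends `Serializable` because Beam serializes DoFn fields when the job is submitted.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical seam: only implementations of this interface touch the BigQuery client.
// Serializable because it is stored in a DoFn field, which Beam serializes.
interface ConfigLoader<T> extends Serializable {
    List<T> load();
}

// Mirrors the shape of ReadPixelsFromBQFn: every cron tick triggers a full reload.
final class ReloadOnTick<T> implements Serializable {
    private final ConfigLoader<T> loader;

    ReloadOnTick(ConfigLoader<T> loader) {
        this.loader = loader;
    }

    // The tick value is ignored, as in ReadPixelsFromBQFn; it only signals that a refresh is due.
    // Returns an unmodifiable snapshot so downstream code can't mutate the side-input list.
    List<T> onTick(long tick) {
        return Collections.unmodifiableList(new ArrayList<>(loader.load()));
    }
}
```

If the BigQuery client ever has to go, only the `ConfigLoader` implementations change; `ReadPixelsFromBQFn` and `ReadSitesFromBQFn` keep the same shape.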

Here's the code sample from my application.

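import com.google.cloud.bigquery.TableResult;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Project classes (EventsHandlerDataflowPipelineOptions, Pixel, Site, IntermediateEvent,
// BigQueryUtil, Transformer) are not shown.

public class App {
private static final Logger LOG = LoggerFactory.getLogger(App.class);
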
public static void main(String[] args) {
    EventsHandlerDataflowPipelineOptions options = PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(EventsHandlerDataflowPipelineOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    PCollection<Long> cronPCollection = getCronJobPCollection(pipeline, options);
    PCollectionView<List<Pixel>> pixels = getPixelsSideInput(cronPCollection, options);
    PCollectionView<List<Site>> sites = getSitesSideInput(cronPCollection, options);


    pipeline.apply("Read Purchase Events",
            PubsubIO.readMessages().fromSubscription(options.getPurchasesSubscription()))
            .apply("Unpack Purchase Events", ParDo.of(new UnpackPurchaseMessageFn()))
            .apply("Publish Purchase Events",
                    ParDo.of(new ToEventAndSendFn(pixels, sites)).withSideInputs(pixels, sites));

    pipeline.run();
}

public static PCollection<Long> getCronJobPCollection(Pipeline pipeline,
                                                      EventsHandlerDataflowPipelineOptions options) {
    return pipeline
            .apply("Cron Trigger",
                    GenerateSequence.from(0).withRate(1,
                            Duration.standardMinutes(options.getMetadataRefreshMinutes())))
            .apply("Cron Window", Window.<Long>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes());
}

public static PCollectionView<List<Pixel>> getPixelsSideInput(PCollection<Long> cronPCollection,
                                                              EventsHandlerDataflowPipelineOptions options) {
    return cronPCollection
            .apply("Read Pixel Configs", ParDo.of(new ReadPixelsFromBQFn(options.getProject())))
            .apply("Transform Pixel Configs", View.asSingleton());
}

public static PCollectionView<List<Site>> getSitesSideInput(PCollection<Long> cronPCollection,
                                                            EventsHandlerDataflowPipelineOptions options) {
    return cronPCollection
            .apply("Read Site Details", ParDo.of(new ReadSitesFromBQFn(options.getProject())))
            .apply("Transform Site Details", View.asSingleton());
}

static class ReadPixelsFromBQFn extends DoFn<Long, List<Pixel>> {
    private final String project;

    public ReadPixelsFromBQFn(String project) {
        this.project = project;
    }

    @ProcessElement
    public void processElement(@Element Long in, OutputReceiver<List<Pixel>> out) {
        TableResult result = BigQueryUtil.readPixelsFromBigQuery(this.project);
        out.output(BigQueryUtil.convertBQRowsToPixels(result));
    }
}

static class ReadSitesFromBQFn extends DoFn<Long, List<Site>> {
    private final String project;

    public ReadSitesFromBQFn(String project) {
        this.project = project;
    }

    @ProcessElement
    public void processElement(@Element Long in, OutputReceiver<List<Site>> out) {
        TableResult result = BigQueryUtil.readSitesFromBigQuery(this.project);
        out.output(BigQueryUtil.convertBQRowsToSites(result));
    }
}

static class UnpackSessionMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
    @ProcessElement
    public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
        out.output(new IntermediateEvent(in));
    }
}

static class UnpackPurchaseMessageFn extends DoFn<PubsubMessage, IntermediateEvent> {
    @ProcessElement
    public void processElement(@Element PubsubMessage in, OutputReceiver<IntermediateEvent> out) {
        //TODO log should be removed after testing
        out.output(new IntermediateEvent(in, true));
    }
}

static class ToEventAndSendFn extends DoFn<IntermediateEvent, Void> {
    private final PCollectionView<List<Pixel>> pixels;
    private final PCollectionView<List<Site>> sites;

    //add constructor with side input so it can be accessed in static class
    public ToEventAndSendFn(PCollectionView<List<Pixel>> pixels, PCollectionView<List<Site>> sites) {
        this.pixels = pixels;
        this.sites = sites;
    }

    @ProcessElement
    public void processElement(@Element IntermediateEvent in, OutputReceiver<Void> out, ProcessContext c) {
        out.output(Transformer.createEventAndSendToApi(in,
                c.sideInput(this.pixels), c.sideInput(this.sites)));
    }
}
}

Hey Shakirsyed,

Google Groups are reserved for general product discussion; for technical questions, please post on Stack Overflow.

To get better support, post in the relevant forum. Please read the Community Support article to understand which forum fits your question.