Not able to write dataframe to BigQuery using Dataflow

I have an Excel file on GCS that I'm trying to read in Dataflow and then write out to BigQuery.

I was able to read the Excel file successfully using (apache_beam.dataframe.io), and I was also able to write the result back to a GCS bucket as a CSV or TXT file.

The issue I'm having now is writing the result to a BigQuery table. My understanding is that the BigQuery writer (beam.io.WriteToBigQuery) expects a PCollection of dictionaries (Link).
 
What's the best way to convert the resulting dataframe to a PCollection of dictionaries?
 
I've tried a few things without any luck. For example, I tried to convert the DataFrame to a PCollection using (apache_beam.dataframe.convert), but it didn't work because I think to_pcollection returns tuples.
 
Here's my code :
 

 

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.dataframe.io import read_excel
from apache_beam.io import WriteToText
from apache_beam.dataframe.convert import to_pcollection, to_dataframe
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions


def run_training_pipeline(pipeline, input_file=None, output_path=None):
    """Read an Excel file as a Beam DataFrame and write its rows to BigQuery.

    Args:
        pipeline: Beam pipeline, used as a context manager for the steps below.
        input_file: Location of the Excel file handed to ``read_excel``.
        output_path: Not referenced by this function.
    """
    with pipeline as p:
        excel_df = p | "Read Excel" >> read_excel(input_file)

        # Convert the DataFrame into a PCollection of rows.
        # NOTE(review): these rows look like named tuples rather than the
        # dictionaries WriteToBigQuery expects -- confirm before relying on it.
        rows = to_pcollection(excel_df)

        bigquery_sink = beam.io.WriteToBigQuery(
            table="test3",
            dataset="test_dataflows",
            project="dev-tmp-project",
            schema="Name:STRING,Gender:STRING",
        )
        _ = rows | "write to bigquery" >> bigquery_sink
def run(argv=None):
    """Parse the command-line flags and start the pipeline.

    Args:
        argv: Argument list handed to ``parse_known_args``; ``None`` makes
            argparse read ``sys.argv[1:]``.
    """
    flag_help = (
        ('input_file', 'Input specified as a GCS path containing excel file'),
        ('output_path', 'Location to write the output.'),
    )
    cli = argparse.ArgumentParser()
    for flag, description in flag_help:
        cli.add_argument(
            '--' + flag, dest=flag, type=str, required=True, help=description)

    # Flags not declared above are passed on as pipeline options.
    known_args, pipeline_args = cli.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    run_training_pipeline(
        beam.Pipeline(options=options),
        input_file=known_args.input_file,
        output_path=known_args.output_path,
    )
    
if __name__ == '__main__':
    # Raise the root logger to INFO before starting the job.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()

​

 

 
 
Here's the error (I think it's kind of misleading) :

 

ERROR:apache_beam.runners.dataflow.dataflow_runner:2024-03-19T09:07:17.958Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1480, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 563, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 745, in finish_bundle
    self.bq_wrapper.wait_for_bq_job(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 650, in wait_for_bq_job
    raise RuntimeError(
RuntimeError: BigQuery job beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0 failed. Error Result: <ErrorProto
 location: 'gs://xxxxx/temp/bq_load/3d307898278a4123a0c0c0e7e87e90dd/dev-tmp-project.test_dataflows.hr-test3/1af0522c-5777-4409-810b-5a95b4064f10'
 message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://xxxxx/temp/bq_load/3d307898278a4123a0c0c0e7e87e90dd/dev-tmp-project.test_dataflows.hr-test3/1af0522c-5777-4409-810b-5a95b4064f10'
 reason: 'invalid'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 297, in _execute
    response = task()
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 372, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1057, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 983, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 986, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 987, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 1482, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1533, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1480, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 563, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 745, in finish_bundle
    self.bq_wrapper.wait_for_bq_job(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 650, in wait_for_bq_job
    raise RuntimeError(
RuntimeError: BigQuery job beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0 failed. Error Result: <ErrorProto
 location: 'gs://xxxxx/temp/bq_load/3d307898278a4123a0c0c0e7e87e90dd/dev-tmp-project.test_dataflows.hr-test3/1af0522c-5777-4409-810b-5a95b4064f10'
 message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://xxxxx/temp/bq_load/3d307898278a4123a0c0c0e7e87e90dd/dev-tmp-project.test_dataflows.hr-test3/1af0522c-5777-4409-810b-5a95b4064f10'
 reason: 'invalid'> [while running 'write to bigquery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)-ptransform-93']

ERROR:apache_beam.runners.dataflow.dataflow_runner:2024-03-19T09:07:19.446Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1480, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 563, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 745, in finish_bundle
    self.bq_wrapper.wait_for_bq_job(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 646, in wait_for_bq_job
    job = self.get_job(
  File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/retry.py", line 275, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 992, in get_job
    return self.client.jobs.Get(request)
  File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 302, in Get
    return self._RunMethod(config, request, global_params=global_params)
  File "/usr/local/lib/python3.10/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python3.10/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python3.10/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
    raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpNotFoundError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/dev-tmp-project/jobs/beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 19 Mar 2024 09:07:19 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '404', 'content-length': '548', '-content-encoding': 'gzip'}>, content <{
  "error": {
    "code": 404,
    "message": "Not found: Job dev-tmp-project:beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0",
    "errors": [
      {
        "message": "Not found: Job dev-tmp-project:beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0",
        "domain": "global",
        "reason": "notFound"
      }
    ],
    "status": "NOT_FOUND"
  }
}
>

 

 
 
 
My excel file looks like this :
Name | Gender
John | Female
John | Male
John | Male
John | Female
John | Female
 
 
 
 
 
 
 
 
Solved Solved
0 1 147
1 ACCEPTED SOLUTION

update:

I was able to figure it out. I needed a ParDo transformation.

Leaving the solution here for anyone who needs it:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.dataframe.io import read_excel
from apache_beam.io import WriteToText
from apache_beam.dataframe.convert import to_pcollection, to_dataframe
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions

class Convert_namedtupled_to_dict(beam.DoFn):
    """DoFn that turns each named-tuple row into a dictionary."""

    def process(self, element):
        """Emit the dictionary form of one row.

        Args:
            element: A row exposing ``_asdict()`` (e.g. a named tuple).

        Yields:
            The result of ``element._asdict()``.
        """
        row_as_dict = element._asdict()
        yield row_as_dict
        
def run_training_pipeline(pipeline, input_file=None, output_path=None):
    """Read an Excel file, convert each row to a dict, and write to BigQuery.

    Args:
        pipeline: Beam pipeline, used as a context manager for the steps below.
        input_file: Location of the Excel file handed to ``read_excel``.
        output_path: Not referenced by this function.
    """
    with pipeline as p:
        excel_df = p | "Read Excel" >> read_excel(input_file)

        # Convert the DataFrame into a PCollection of rows.
        rows = to_pcollection(excel_df)

        # Each row is converted through ``_asdict()`` before the BigQuery write.
        row_dicts = rows | "dict" >> beam.ParDo(Convert_namedtupled_to_dict())

        bigquery_sink = beam.io.WriteToBigQuery(
            table="test3",
            dataset="test_dataflows",
            project="dev-tmp-project",
            schema="Name:STRING,Gender:STRING",
        )
        _ = row_dicts | "write to bigquery" >> bigquery_sink
def run(argv=None):
    """Parse the command-line flags and start the pipeline.

    Args:
        argv: Argument list handed to ``parse_known_args``; ``None`` makes
            argparse read ``sys.argv[1:]``.
    """
    flag_help = (
        ('input_file', 'Input specified as a GCS path containing excel file'),
        ('output_path', 'Location to write the output.'),
    )
    cli = argparse.ArgumentParser()
    for flag, description in flag_help:
        cli.add_argument(
            '--' + flag, dest=flag, type=str, required=True, help=description)

    # Flags not declared above are passed on as pipeline options.
    known_args, pipeline_args = cli.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    run_training_pipeline(
        beam.Pipeline(options=options),
        input_file=known_args.input_file,
        output_path=known_args.output_path,
    )
    
if __name__ == '__main__':
    # Raise the root logger to INFO before starting the job.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()

 

 

View solution in original post

1 REPLY 1

update:

I was able to figure it out. I needed a ParDo transformation.

Leaving the solution here for anyone who needs it:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.dataframe.io import read_excel
from apache_beam.io import WriteToText
from apache_beam.dataframe.convert import to_pcollection, to_dataframe
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions

class Convert_namedtupled_to_dict(beam.DoFn):
    """DoFn that turns each named-tuple row into a dictionary."""

    def process(self, element):
        """Emit the dictionary form of one row.

        Args:
            element: A row exposing ``_asdict()`` (e.g. a named tuple).

        Yields:
            The result of ``element._asdict()``.
        """
        row_as_dict = element._asdict()
        yield row_as_dict
        
def run_training_pipeline(pipeline, input_file=None, output_path=None):
    """Read an Excel file, convert each row to a dict, and write to BigQuery.

    Args:
        pipeline: Beam pipeline, used as a context manager for the steps below.
        input_file: Location of the Excel file handed to ``read_excel``.
        output_path: Not referenced by this function.
    """
    with pipeline as p:
        excel_df = p | "Read Excel" >> read_excel(input_file)

        # Convert the DataFrame into a PCollection of rows.
        rows = to_pcollection(excel_df)

        # Each row is converted through ``_asdict()`` before the BigQuery write.
        row_dicts = rows | "dict" >> beam.ParDo(Convert_namedtupled_to_dict())

        bigquery_sink = beam.io.WriteToBigQuery(
            table="test3",
            dataset="test_dataflows",
            project="dev-tmp-project",
            schema="Name:STRING,Gender:STRING",
        )
        _ = row_dicts | "write to bigquery" >> bigquery_sink
def run(argv=None):
    """Parse the command-line flags and start the pipeline.

    Args:
        argv: Argument list handed to ``parse_known_args``; ``None`` makes
            argparse read ``sys.argv[1:]``.
    """
    flag_help = (
        ('input_file', 'Input specified as a GCS path containing excel file'),
        ('output_path', 'Location to write the output.'),
    )
    cli = argparse.ArgumentParser()
    for flag, description in flag_help:
        cli.add_argument(
            '--' + flag, dest=flag, type=str, required=True, help=description)

    # Flags not declared above are passed on as pipeline options.
    known_args, pipeline_args = cli.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    run_training_pipeline(
        beam.Pipeline(options=options),
        input_file=known_args.input_file,
        output_path=known_args.output_path,
    )
    
if __name__ == '__main__':
    # Raise the root logger to INFO before starting the job.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()