I have an Excel file on GCS that I'm trying to read in Dataflow and then write out to BigQuery.
I was able to read the Excel file using apache_beam.dataframe.io, and I was also able to write the result back to a CSV or TXT file in a GCS bucket.
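For reference, the Excel-to-CSV path that already works looks roughly like this (a minimal sketch; the GCS paths are placeholders):

import apache_beam as beam
from apache_beam.dataframe.io import read_excel

with beam.Pipeline() as p:
    df = p | "Read Excel" >> read_excel("gs://my-bucket/input.xlsx")
    # the deferred DataFrame supports pandas-style writes straight back to GCS
    df.to_csv("gs://my-bucket/output/result.csv")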
The issue I'm having now is writing the result to a BigQuery table. My understanding is that the BigQuery writer (beam.io.WriteToBigQuery) should work like this:
from __future__ import absolute_import
import argparse
import logging

import apache_beam as beam
from apache_beam.dataframe.io import read_excel
from apache_beam.io import WriteToText
from apache_beam.dataframe.convert import to_pcollection, to_dataframe
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions


def run_training_pipeline(pipeline, input_file=None, output_path=None):
    with pipeline as p:
        excel_file = (p | "Read Excel" >> read_excel(input_file))
        dataframe_to_pcollection = to_pcollection(excel_file)
        write_bigquery = (
            dataframe_to_pcollection
            | "write to bigquery" >> beam.io.WriteToBigQuery(
                table="test3",
                dataset="test_dataflows",
                project="dev-tmp-project",
                schema="Name:STRING,Gender:STRING"))


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_file',
        dest='input_file',
        type=str,
        required=True,
        help='Input specified as a GCS path containing excel file')
    parser.add_argument(
        '--output_path',
        dest='output_path',
        type=str,
        required=True,
        help='Location to write the output.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    run_training_pipeline(
        beam.Pipeline(options=PipelineOptions(pipeline_args)),
        input_file=known_args.input_file,
        output_path=known_args.output_path
    )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
ERROR:apache_beam.runners.dataflow.dataflow_runner:2024-03-19T09:07:17.958Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1480, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 563, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 745, in finish_bundle
self.bq_wrapper.wait_for_bq_job(
File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 650, in wait_for_bq_job
raise RuntimeError(
RuntimeError: BigQuery job beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0 failed. Error Result: <ErrorProto
location: 'gs://xxxxx/temp/bq_load/3d307898278a4123a0c0c0e7e87e90dd/dev-tmp-project.test_dataflows.hr-test3/1af0522c-5777-4409-810b-5a95b4064f10'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://xxxxx/temp/bq_load/3d307898278a4123a0c0c0e7e87e90dd/dev-tmp-project.test_dataflows.hr-test3/1af0522c-5777-4409-810b-5a95b4064f10'
reason: 'invalid'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 297, in _execute
response = task()
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 372, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1057, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 983, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 986, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 987, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.finish
File "apache_beam/runners/common.py", line 1482, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 1533, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1480, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 563, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 745, in finish_bundle
self.bq_wrapper.wait_for_bq_job(
File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 650, in wait_for_bq_job
raise RuntimeError(
RuntimeError: BigQuery job beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0 failed. Error Result: <ErrorProto
location: 'gs://xxxxx/temp/bq_load/3d307898278a4123a0c0c0e7e87e90dd/dev-tmp-project.test_dataflows.hr-test3/1af0522c-5777-4409-810b-5a95b4064f10'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://xxxxx/temp/bq_load/3d307898278a4123a0c0c0e7e87e90dd/dev-tmp-project.test_dataflows.hr-test3/1af0522c-5777-4409-810b-5a95b4064f10'
reason: 'invalid'> [while running 'write to bigquery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/ParDo(TriggerLoadJobs)-ptransform-93']
ERROR:apache_beam.runners.dataflow.dataflow_runner:2024-03-19T09:07:19.446Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1480, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 563, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 745, in finish_bundle
self.bq_wrapper.wait_for_bq_job(
File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 646, in wait_for_bq_job
job = self.get_job(
File "/usr/local/lib/python3.10/site-packages/apache_beam/utils/retry.py", line 275, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 992, in get_job
return self.client.jobs.Get(request)
File "/usr/local/lib/python3.10/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 302, in Get
return self._RunMethod(config, request, global_params=global_params)
File "/usr/local/lib/python3.10/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.10/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.10/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpNotFoundError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/dev-tmp-project/jobs/beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 19 Mar 2024 09:07:19 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '404', 'content-length': '548', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 404,
"message": "Not found: Job dev-tmp-project:beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0",
"errors": [
{
"message": "Not found: Job dev-tmp-project:beam_bq_job_LOAD_matartestingbghrt20240319120200_LOAD_STEP_c42ec524a1844323a4731635d2a7c7a3_c25dc827825039c396f4c1011eafb70a_pane0_partition0",
"domain": "global",
"reason": "notFound"
}
],
"status": "NOT_FOUND"
}
}
>
For reference, the Excel file contains data like this:

Name | Gender
John | Female
John | Male
John | Male
John | Female
John | Female
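As far as I can tell, with the "Name:STRING,Gender:STRING" schema WriteToBigQuery wants each element as a plain Python dict keyed by the column names, for example:

{"Name": "John", "Gender": "Female"}

The rows coming out of to_pcollection are namedtuple-like, not dicts, which is what the fix below takes care of.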
Update:
I was able to figure it out. I needed a ParDo transformation that converts each namedtuple row to a dict before writing to BigQuery.
Leaving the solution here for anyone who needs it:
from __future__ import absolute_import
import argparse
import logging

import apache_beam as beam
from apache_beam.dataframe.io import read_excel
from apache_beam.io import WriteToText
from apache_beam.dataframe.convert import to_pcollection, to_dataframe
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions


class Convert_namedtupled_to_dict(beam.DoFn):
    """Converts the namedtuple-like rows from to_pcollection into the dicts
    that WriteToBigQuery expects."""
    def process(self, element):
        yield element._asdict()


def run_training_pipeline(pipeline, input_file=None, output_path=None):
    with pipeline as p:
        excel_file = (p | "Read Excel" >> read_excel(input_file))
        dataframe_to_pcollection = to_pcollection(excel_file)
        write_bigquery = (
            dataframe_to_pcollection
            | "dict" >> beam.ParDo(Convert_namedtupled_to_dict())
            | "write to bigquery" >> beam.io.WriteToBigQuery(
                table="test3",
                dataset="test_dataflows",
                project="dev-tmp-project",
                schema="Name:STRING,Gender:STRING"))


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_file',
        dest='input_file',
        type=str,
        required=True,
        help='Input specified as a GCS path containing excel file')
    parser.add_argument(
        '--output_path',
        dest='output_path',
        type=str,
        required=True,
        help='Location to write the output.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    run_training_pipeline(
        beam.Pipeline(options=PipelineOptions(pipeline_args)),
        input_file=known_args.input_file,
        output_path=known_args.output_path
    )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
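If you'd rather not define a separate DoFn class, my understanding is that the same namedtuple-to-dict conversion can also be written as a one-line beam.Map (a minimal sketch using the same table and schema as above, not something I have run on Dataflow):

write_bigquery = (
    dataframe_to_pcollection
    | "dict" >> beam.Map(lambda row: row._asdict())  # same conversion as the DoFn above
    | "write to bigquery" >> beam.io.WriteToBigQuery(
        table="test3",
        dataset="test_dataflows",
        project="dev-tmp-project",
        schema="Name:STRING,Gender:STRING"))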