Hi,
I am testing the BigQuery Storage Write API by attempting to write sample data into BigQuery, and I am getting the permission error shown below:
google.api_core.exceptions.PermissionDenied: 403 Permission 'TABLES_UPDATE_DATA' denied on resource 'projects/xxx/datasets/xxx.yyy/tables/xxx.yyy.test_customer_data' (or it may not exist).
I have granted the service account the necessary roles according to this documentation. If I test the same service account using the legacy streaming API, I am able to write into BigQuery successfully. I would appreciate any help in pinpointing what might be wrong, since we intend to use the BigQuery Storage Write API. Thanks.
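For reference, this is roughly how I verified the same service account with the legacy streaming API (the table reference and the row fields here are just illustrative placeholders):

from google.cloud import bigquery

# Legacy streaming insert using the same service-account key; this succeeds.
bq_client = bigquery.Client()
errors = bq_client.insert_rows_json(
    "xxx.yyy.test_customer_data",  # fully qualified project.dataset.table
    [{"row_num": 1, "customer_name": "Alice"}],
)
print(errors)  # prints [] when all rows are inserted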
Below is my full code, in case it helps.
"""
This code sample demonstrates how to write records in pending mode
using the low-level generated client for Python.
"""
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
from google.cloud import storage
import os
import customer_record_pb2  # generated from customer_record.proto
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'xxx-b033d9535e9d.json'  # The JSON key file is in the same directory as this script
storage_client = storage.Client()
def create_row_data(row_num: int, name: str):
    row = customer_record_pb2.CustomerRecord()
    row.row_num = row_num
    row.customer_name = name
    return row.SerializeToString()
def append_rows_pending(project_id: str, dataset_id: str, table_id: str):
    """Create a write stream, write some sample data, and commit the stream."""
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    write_stream = types.WriteStream()
    # Set the stream to pending mode.
    write_stream.type_ = types.WriteStream.Type.PENDING
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )
    stream_name = write_stream.name
    request_template = types.AppendRowsRequest()
    request_template.write_stream = stream_name
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data
    # AppendRowsStream sends an arbitrary number of requests to a stream.
    append_rows_stream = writer.AppendRowsStream(write_client, request_template)
    # First batch.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(1, "Alice"))
    proto_rows.serialized_rows.append(create_row_data(2, "Bob"))
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data
    response_future_1 = append_rows_stream.send(request)
    # Send another batch.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(3, "Charles"))
    request = types.AppendRowsRequest()
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data
    request.offset = 2
    response_future_2 = append_rows_stream.send(request)
    print(response_future_1.result())
    print(response_future_2.result())
    # Shut down background threads and close the streaming connection.
    append_rows_stream.close()
    write_client.finalize_write_stream(name=write_stream.name)
    # Commit the stream created earlier.
    batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
    batch_commit_write_streams_request.parent = parent
    batch_commit_write_streams_request.write_streams = [write_stream.name]
    write_client.batch_commit_write_streams(batch_commit_write_streams_request)
    print(f"Writes to stream: '{write_stream.name}' have been committed.")

append_rows_pending(project_id='xxxx', dataset_id='xxxx.yyyy',
                    table_id='xxxx.yyyy.test_customer_data')
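In case it is relevant, customer_record_pb2 is compiled with protoc --python_out=. customer_record.proto from a message definition along these lines (my reconstruction from the fields used in create_row_data, so the exact types and field numbers are assumptions):

syntax = "proto2";

message CustomerRecord {
  optional string customer_name = 1;
  required int64 row_num = 2;
}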