Writing a PCollection<Row> to Cloud SQL

I am developing a Dataflow Pipeline as follows:

1. Read files from GCS having common prefix into a Deferred Dataframe

2. Apply Transformations to the Deferred Dataframe

3. Convert Deferred Dataframe to a PCollection

4. Write this PCollection to Cloud SQL for PostgreSQL (I am stuck at this step)

Can someone please advise me on how step 4 can be accomplished, and whether the write operation will happen in parallel across multiple worker machines?

ACCEPTED SOLUTION

The error message ValueError: Attempted to encode null for non-nullable field "suppressed" indicates that your pipeline is trying to process a record where the suppressed field is null (or None in Python), but the schema defined for your data expects this field to be non-nullable.
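To make the failure condition concrete, here is a minimal plain-Python sketch (no Beam required) that flags fields holding None even though their annotation is not typing.Optional. The Record class and the is_nullable and null_violations helpers are hypothetical names used for illustration only; Beam's own schema check is implemented differently.

```python
import typing


class Record(typing.NamedTuple):
    # Hypothetical two-field stand-in for the real schema class.
    series_reference: str
    suppressed: str  # not Optional, so None is not allowed here


def is_nullable(annotation) -> bool:
    """Return True if the annotation is Optional[...] (a Union that includes None)."""
    return (
        typing.get_origin(annotation) is typing.Union
        and type(None) in typing.get_args(annotation)
    )


def null_violations(element) -> typing.List[str]:
    """List fields that are None although their annotation is non-nullable."""
    hints = typing.get_type_hints(type(element))
    return [
        name
        for name, annotation in hints.items()
        if getattr(element, name) is None and not is_nullable(annotation)
    ]
```

Running null_violations over a sample of records before the write step is one way to see which fields would need to become Optional or be cleaned.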

Possible solutions:

  1. Modify the schema to allow nulls: If null values are acceptable for the "suppressed" field, update the schema of your Business class (or the Beam schema) to specify that this field is nullable.

  2. Data cleaning before writing: Add a step to clean the data before writing it over JDBC. This could involve:

    • Setting a default value: Replace null values in the "suppressed" field with a relevant default value (e.g., an empty string or a placeholder).
    • Filtering out records with null values: If records with null "suppressed" values are invalid, filter them out before reaching the WriteToJdbc transform.
  3. Debug and inspect data: Use logging or a ParDo transform to identify and understand why records have null "suppressed" values.
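The filtering and inspection options above could be sketched roughly as follows, in plain Python so the logic can be checked without a pipeline. The Record class, has_suppressed, and log_if_null are hypothetical names, and the sketch assumes elements expose a suppressed attribute.

```python
import logging
import typing


class Record(typing.NamedTuple):
    # Hypothetical two-field stand-in for the real schema class.
    series_reference: str
    suppressed: typing.Optional[str]


def has_suppressed(element) -> bool:
    """Predicate: keep only records whose `suppressed` field is set."""
    return element.suppressed is not None


def log_if_null(element):
    """Pass the record through unchanged, logging it when `suppressed` is None."""
    if element.suppressed is None:
        logging.warning("Null 'suppressed' in record: %r", element)
    return element
```

In a pipeline, these would typically be applied as beam.Filter(has_suppressed) and beam.Map(log_if_null) ahead of the write step.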

Example implementations:

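  1. Data cleaning with a Map transform:

import apache_beam as beam

def clean_data(element):
  # Assumes element is a NamedTuple (such as Business), which provides _replace.
  if element.suppressed is None:
    element = element._replace(suppressed='default_value')  # Replace with an appropriate value
  return element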

# In your pipeline
output_pcollection = (
    filtered_pcollection
    | 'Clean Data' >> beam.Map(clean_data)
    | 'Map to Business' >> beam.Map(lambda element: element).with_output_types(Business)
)
  2. Modifying the Business class schema:

import typing

class Business(typing.NamedTuple):
  series_reference: str
  period: float
  data_value: float
  suppressed: typing.Optional[str]  # Now allows null values
  status: str
  units: str
  magnitude: int
  subject: str
  group: str
  series_title_1: str
  series_title_2: str
  series_title_3: str
  series_title_4: str
  series_title_5: str

Choose the best approach based on your specific data and use case. If null values are expected and acceptable for the "suppressed" field, modifying the schema is likely the simplest solution.
