I am developing a Dataflow Pipeline as follows:
1. Read files from GCS having common prefix into a Deferred Dataframe
2. Apply Transformations to the Deferred Dataframe
3. Convert Deferred Dataframe to a PCollection
4. Write this PCollection to Cloud SQL for PostgreSQL (I am stuck at this step)
Can someone please advise how step 4 can be accomplished, and whether the write operation will happen in parallel across multiple worker machines?
The error message ValueError: Attempted to encode null for non-nullable field "suppressed" indicates that your pipeline is trying to process a record where the suppressed field is null (or None in Python), but the schema defined for your data expects this field to be non-nullable.
Possible solutions:
Modify the schema to allow nulls: If null values are acceptable for the "suppressed" field, update the schema of your Business class (or the Beam schema) to specify that this field is nullable.
Data cleaning before writing: Add a step that cleans the data before it is written via JDBC, for example by replacing null values in "suppressed" with an appropriate default.
Debug and inspect data: Use logging or a ParDo transform to identify and understand why records have null "suppressed" values.
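For the inspection step, a minimal sketch in plain Python could look like the following. The Record type is a cut-down stand-in for the Business class, and null_fields is a hypothetical helper, not part of Beam. Inside a pipeline it could be called from a beam.Map or ParDo step that logs the result.

```python
import typing


class Record(typing.NamedTuple):
    # Cut-down, hypothetical stand-in for the Business schema.
    series_reference: str
    suppressed: typing.Optional[str]
    status: str


def null_fields(element) -> typing.List[str]:
    """Return the names of all fields on a NamedTuple whose value is None."""
    return [name for name, value in element._asdict().items() if value is None]


# Made-up sample records, only to exercise the helper.
records = [
    Record("A.1", "Y", "F"),
    Record("A.2", None, "F"),
]

for record in records:
    missing = null_fields(record)
    if missing:
        print(f"{record.series_reference}: null fields {missing}")
```

Running this over a sample of the input should show which records, and which fields, trigger the encoding error.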
Example implementations:
def clean_data(element):
    if element.suppressed is None:
        element = element._replace(suppressed='default_value')  # Replace with appropriate value
    return element

# In your pipeline
output_pcollection = (
    filtered_pcollection
    | 'Clean Data' >> beam.Map(clean_data)
    | 'Map to Business' >> beam.Map(lambda element: element).with_output_types(Business)
)
# Schema with a nullable "suppressed" field
import typing

class Business(typing.NamedTuple):
    series_reference: str
    period: float
    data_value: float
    suppressed: typing.Optional[str]  # Now allows null values
    status: str
    units: str
    magnitude: int
    subject: str
    group: str
    series_title_1: str
    series_title_2: str
    series_title_3: str
    series_title_4: str
    series_title_5: str
Choose the best approach based on your specific data and use case. If null values are expected and acceptable for the "suppressed" field, modifying the schema is likely the simplest solution.
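To help decide between the two approaches, it can be useful to check a sample of records against the declared schema before running the full pipeline. The sketch below is plain Python and only an illustration: the shortened Business class and the helpers is_optional and schema_violations are hypothetical names, not Beam APIs.

```python
import typing


class Business(typing.NamedTuple):
    # Shortened, hypothetical version of the schema for illustration.
    series_reference: str
    suppressed: str  # declared non-nullable, as in the failing pipeline
    status: typing.Optional[str]


def is_optional(annotation) -> bool:
    """True if the annotation is typing.Optional[...] (a Union that includes None)."""
    return (
        typing.get_origin(annotation) is typing.Union
        and type(None) in typing.get_args(annotation)
    )


def schema_violations(element) -> typing.List[str]:
    """Names of fields that hold None although the schema marks them non-nullable."""
    hints = typing.get_type_hints(type(element))
    return [
        name
        for name, value in element._asdict().items()
        if value is None and not is_optional(hints[name])
    ]


# NamedTuple does not enforce types at runtime, so this record can be built
# even though "suppressed" is None; the check reports the mismatch.
bad = Business("A.1", None, None)
print(schema_violations(bad))
```

If many records are reported, making the field Optional in the schema is probably the better fit. If only a few are, cleaning them with a default value may be enough.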