Hi community,
I have been trying to merge an unbounded PCollection and a bounded PCollection.
The unbounded PCollection is ingested from a Pub/Sub topic, while the bounded PCollection is loaded using ReadFromMongoDB. Every time I run the script I get this error:
Transform node AppliedPTransform([7]: Read documents providers/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
My code looks like this:
import json

import apache_beam as beam
from apache_beam import Map, WindowInto, CoGroupByKey
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.mongodbio import ReadFromMongoDB
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.transforms import window

p = beam.Pipeline(InteractiveRunner(), options=options)

topic = "projects/xx/topics/mongoCDCzz"
dataOne = (p | "Read document" >> ReadFromPubSub(topic=topic)
             | "Load document" >> Map(json.loads)
             | "Transform columns" >> Map(lambda element: (element.key, element))
             | "Window" >> WindowInto(window.FixedWindows(5)))

MONGODB_URI = "mongodb+srv://user:pass@xxxx/xxx?"
dataTwo = (p | "Read documents providers" >> ReadFromMongoDB(uri=MONGODB_URI, db='Xxx',
                                                             coll='xxx', bucket_auto=True))

dataThree = (dataTwo | Map(lambda element: (element.key, element)))

cogbk = {"a": dataOne, "b": dataThree} | CoGroupByKey()

def process_data(unbounded_element, side_input_element):
    # get the provider id
    providerid = unbounded_element["provider_id"]
    # build the output record and add the city name from the side input
    processed_data = dict(unbounded_element)
    processed_data["city"] = side_input_element[providerid]
    # Return the processed data
    return processed_data

class ProcessDataFn(beam.DoFn):
    def process(self, element, bounded_data=beam.DoFn.SideInputParam):
        # Access the elements from the bounded side input
        for side_input_element in bounded_data:
            # Process the unbounded element and side input element
            processed_data = process_data(element, side_input_element)
            # Emit the processed data
            yield processed_data

processed_data = dataOne | beam.ParDo(ProcessDataFn(),
                                      bounded_data=beam.pvalue.AsList(dataThree))
ib.show_graph(p)
ib.show()
ib.show(providers_unbounded)
Based on your error message, it seems like you are trying to perform a GroupByKey operation on a PCollection that is not properly keyed.
In Apache Beam, the GroupByKey operation is only applicable to PCollections of key/value pairs. That is, the input to the GroupByKey transform must be a PCollection where each element is a key/value pair represented as a 2-tuple. The GroupByKey transform outputs a new PCollection where each element is a key and an iterable of all values associated with that key.
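For example (the keys and values here are purely illustrative):

pairs = p | beam.Create([("a", 1), ("a", 2), ("b", 3)])
grouped = pairs | beam.GroupByKey()   # produces ("a", [1, 2]) and ("b", [3])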
In your case, you are trying to apply CoGroupByKey on a dictionary with "a" mapped to dataOne and "b" mapped to dataThree. But it's not clear if dataThree is properly keyed. From the code, it seems that you are trying to key it with the line dataThree = (dataTwo | Map(lambda element: (element.key, element))), but it's unclear if element.key actually exists in your MongoDB data. If it doesn't, that could be the source of your problem.
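Documents read with ReadFromMongoDB are typically plain Python dicts, so keying usually needs item access rather than attribute access. A minimal sketch, assuming each document carries a provider_id field (substitute whatever field your documents actually contain):

dataThree = (dataTwo | "Key providers" >> Map(lambda doc: (doc["provider_id"], doc)))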
Also, you are trying to perform a join between unbounded and bounded PCollections using the CoGroupByKey operation. Joining unbounded and bounded PCollections is a complex issue. Apache Beam's model of dealing with bounded and unbounded collections is based on windows and watermarks, and the idea of event time vs. processing time. Depending on your use case, you might need to use a different approach, like a stateful ParDo operation or a different kind of windowing strategy.
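If you do stay with CoGroupByKey, both inputs generally need compatible windowing. A rough sketch (the 5-second window only mirrors your streaming side; note that elements read from a bounded source usually all carry the same default timestamp, so they will typically land in a single window unless you assign timestamps yourself):

dataThreeWindowed = dataThree | "Window providers" >> WindowInto(window.FixedWindows(5))
cogbk = ({"a": dataOne, "b": dataThreeWindowed} | CoGroupByKey())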
A potential solution could be to use side inputs. A side input is an additional input that your DoFn can access each time it processes an element. This input could be a static value that doesn't change, such as a configuration parameter, or a PCollection itself. When using a PCollection as a side input, you can access all the elements in it. For a bounded PCollection, you could use it as a side input while processing your unbounded PCollection.
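For example, a rough sketch of that approach (the AsDict view assumes the keyed provider collection has unique keys, and the provider_id and city field names are only assumptions about your data):

class EnrichFn(beam.DoFn):
    def process(self, element, providers):
        # providers is the bounded side input, materialized as a dict of key -> document
        key, record = element
        provider = providers.get(record.get("provider_id"))
        if provider is not None:
            record["city"] = provider.get("city")
        yield record

enriched = dataOne | "Enrich with providers" >> beam.ParDo(
    EnrichFn(), providers=beam.pvalue.AsDict(dataThree))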
If you are not sure about the keying of your data, I would suggest checking your MongoDB data and making sure that every document has the key field that you are using for keying. If it doesn't, you need to adjust your code accordingly.
Hi ms446,
I am still unable to fix the error. The data are keyed, but the GroupByKey still doesn't work.
The error message you're encountering might be related to an internal failure in the Python DirectRunner of Apache Beam when it attempts to rewrite the transforms. The issue is that the Python DirectRunner does not support mixing streaming (unbounded) and non-streaming (bounded) sources in the same pipeline. This is a known issue and is currently unresolved according to the Apache Software Foundation's JIRA tracker.
See the following for more details: https://issues.apache.org/jira/browse/BEAM-12586
While it's not ideal, a potential workaround is to use the Java DirectRunner if possible, as the same code works with it. Otherwise, you might need to wait for the issue to be resolved in a future version of Apache Beam.
As for merging two PCollections, the best supported join is when the two PCollections have a common key. You can use the CoGroup.join transform if most of your data allows Apache Beam to figure out a schema. However, if you need to carry forward values in a time series for keys that have no data, you can use state and timers to generate the "missing" values. Keep in mind that the keys need to be carefully chosen, since state and timers are per-key-and-window. State and timers also work in batch mode, so this solution can be used for both batch and streaming scenarios.
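A very rough illustration of the state-and-timers machinery in Python (the names, the watermark timer at the end of the window, and what gets re-emitted are placeholders, not a complete carry-forward join):

import apache_beam as beam
from apache_beam.coders import PickleCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, TimerSpec, on_timer

class CarryForwardFn(beam.DoFn):
    # Remembers the last (key, value) pair seen for each key and re-emits it
    # when the window's watermark timer fires.
    LAST_SEEN = ReadModifyWriteStateSpec('last_seen', PickleCoder())
    FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

    def process(self, element,
                last_seen=beam.DoFn.StateParam(LAST_SEEN),
                flush=beam.DoFn.TimerParam(FLUSH),
                window=beam.DoFn.WindowParam):
        last_seen.write(element)    # element is a (key, value) pair
        flush.set(window.end)       # fire once the window closes for this key
        yield element

    @on_timer(FLUSH)
    def flush_last(self, last_seen=beam.DoFn.StateParam(LAST_SEEN)):
        pair = last_seen.read()
        if pair is not None:
            yield pair              # re-emit the last value seen for this key

# e.g. carried = dataOne | "Carry forward" >> beam.ParDo(CarryForwardFn())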