Hi folks! I would like to know if anyone has any idea what could be happening here. I have a streaming job which works fine. However, memory utilization on the worker keeps growing until it reaches the worker's maximum capacity, at which point the worker is shut down and a new one is created. The pipeline does not fail; at these moments, message-processing latency can reach around 10 minutes, but then everything returns to normal. Any idea what could be causing this memory behaviour? It looks like memory used by processed messages is never released.
I think diagnosing this would require detailed knowledge of the pipeline being executed. For example, if you are using a ParDo and have written a DoFn, then your DoFn logic could easily be increasing memory usage. I would suggest that you check whether you can determine which transforms are running on which workers. That way you could constrain the examination to just a subset of steps. Then start looking at each of those steps and asking what it is doing.
Hi Kolban. I am running a fairly large pipeline (cost restrictions at my company required me to consolidate the ingestion of many Pub/Sub subscriptions into a single streaming Dataflow job).
Lots of Map steps with Python functions are being executed. Here is the code for processing one of the subscriptions.
messages_points = (
    pipeline
    | "Read from Pub/Sub points"
    >> beam.io.ReadFromPubSub(
        subscription=input_subscription_points
    ).with_output_types(bytes)
    | "UTF-8 bytes to string points"
    >> beam.Map(lambda msg: msg.decode("utf-8"))
    | "Parse JSON messages points" >> beam.Map(parse_json_message)
    | "Transform fields points" >> beam.Map(transform_fields_points)
)
messages_points | "Update BT points" >> beam.Map(update_bt_points)
messages_points | "Update deliveries MongoDB points" >> beam.Map(
    update_deliveries_points
)
messages_points | "Send to firestore" >> beam.Map(send_to_firestore)
messages_points_to_mysql = (
    messages_points
    | "Get points data points" >> beam.Map(get_package_data)
    | "Get route data points" >> beam.Map(get_routes_data)
    | "Get status update points" >> beam.Map(get_status_from_catalog)
)
messages_points_to_mysql | "Update MySql points" >> beam.Map(
    update_mysql_points, role_id=role_id, secret_id=secret_id
)
messages_points_to_mysql | "Update delivery.packages" >> beam.Map(
    update_delivery_packages_points
)
And, for example, here is the function get_package_data, executed on one of the maps:
def get_package_data(message: Dict):
    if message == {}:
        return {}
    if message["status"] in ["COMPLETED", "FAILED", "INCOMPLETE"]:
        if message["updatedFields"] is not None:
            if "completedAt" not in message["updatedFields"].keys():
                return {}
        else:
            return {}
    client = bigtable.Client(project="ivoy-data", admin=True)
    instance = client.instance("streaming-ivoy")
    table = instance.table("routes.packages")
    prefix = str(message["_id"])
    end_key = prefix[:-1] + chr(ord(prefix[-1]) + 1)
    row_set = RowSet()
    row_set.add_row_range_from_keys(prefix.encode("utf-8"), end_key.encode("utf-8"))
    rows = table.read_rows(row_set=row_set)
    package_info = []
    for row in rows:
        status = row.cells["packages_info"]["status".encode("utf-8")][0].value.decode(
            "utf-8"
        )
        packageId = row.cells["packages_info"]["packageId".encode("utf-8")][
            0
        ].value.decode("utf-8")
        package_info.append((packageId, status))
    if package_info == []:
        return {}
    message["package_info"] = package_info
    return message
I am collecting data from Bigtable to enrich the received messages. I wonder if the variables used to collect this data are being kept on the workers and never released from memory...
Anyway, how can I determine which transforms run on each worker?
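One thing that might be worth ruling out: get_package_data builds a new bigtable.Client for every message. A common Beam pattern is to create such clients once per DoFn instance in setup() and reuse them across elements. Below is a minimal sketch of that pattern, not a confirmed fix for this job. The names EnrichWithSharedClient, client_factory and enrich_fn are hypothetical; enrich_fn stands in for the read_rows logic above, and the fallback base class only exists so the sketch runs without Beam installed.

```python
from typing import Any, Callable, Dict

try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:  # lets the sketch run where Beam is not installed
    _DoFnBase = object


class EnrichWithSharedClient(_DoFnBase):
    """Creates one client per DoFn instance instead of one per element."""

    def __init__(
        self,
        client_factory: Callable[[], Any],
        enrich_fn: Callable[[Any, Dict], Dict],
    ):
        super().__init__()
        # Hypothetical zero-argument callable, for example:
        #   lambda: bigtable.Client(project="ivoy-data", admin=True)
        self._client_factory = client_factory
        # Hypothetical function (client, message) -> enriched message.
        self._enrich_fn = enrich_fn
        self._client = None

    def setup(self):
        # Beam calls setup() once per DoFn instance, before processing starts.
        self._client = self._client_factory()

    def process(self, message):
        # Every element reuses the client created in setup().
        yield self._enrich_fn(self._client, message)

    def teardown(self):
        # Best-effort cleanup; assumes the client may expose close().
        close = getattr(self._client, "close", None)
        if callable(close):
            close()
        self._client = None
```

In the pipeline above, this would replace the Map step with something like beam.ParDo(EnrichWithSharedClient(client_factory, enrich_fn)). If memory still grows after a change like this, the per-element client is probably not the cause.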
We're having the same issue with batch pipelines. An answer from Google would be appreciated.
@jefmsantos I suggest using time windows as garbage collection can be affected by element bundles.
We're seeing the same issue (steadily increasing memory utilisation, up to 100%, at which point the worker restarts), with a very similar Dataflow job. A functionally equivalent example (using apache-beam==2.48.0):
import json
from typing import Dict

import apache_beam
from apache_beam import Pipeline
from apache_beam.io.gcp.bigquery_tools import RetryStrategy


def adjust_message(message: Dict) -> Dict:
    if message["type"] == "foo":
        # message["bar"] is a list of strings
        message["bar"] = ",".join(message["bar"])
    if "baz" in message and message["baz"] is None:
        message["baz"] = []
    return message


(
    Pipeline()
    | "Read from PubSub" >> apache_beam.io.ReadFromPubSub(subscription="my_subscription")
    | "Deserialise JSON messages" >> apache_beam.Map(json.loads, parse_int=str)
    | "Adjust messages" >> apache_beam.Map(adjust_message)
    | "Write Rows to BigQuery" >> apache_beam.io.WriteToBigQuery("my_output_table")
)
We also run a version of this dataflow job without the "Adjust messages" step, which does not seem to suffer from this memory leak. All our jobs use a single worker.
@SuperCorks Could you perhaps elaborate how using a Window might help here?
@kolban To me it seems that the memory leak happens in adjust_message, but I don't really have an idea how to fix this. What it's doing does not seem too crazy. Any help would be appreciated :)
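One way to narrow this down might be to check locally whether adjust_message itself retains memory, outside of Dataflow. The sketch below uses the standard-library tracemalloc module to measure net growth in traced memory after many calls. The helper retained_bytes and the sample message are assumptions made for illustration. A flat result would only suggest that the function body is not the source; it says nothing about the runner or the SDK harness.

```python
import gc
import tracemalloc
from typing import Callable, Dict


def adjust_message(message: Dict) -> Dict:
    # Same logic as the example posted above.
    if message["type"] == "foo":
        message["bar"] = ",".join(message["bar"])
    if "baz" in message and message["baz"] is None:
        message["baz"] = []
    return message


def retained_bytes(
    fn: Callable[[Dict], Dict],
    make_message: Callable[[], Dict],
    iterations: int = 50_000,
) -> int:
    """Net growth in traced memory after calling fn repeatedly.

    Results are discarded, so a function that keeps no references
    should show growth close to zero.
    """
    tracemalloc.start()
    try:
        # Warm-up calls so one-time allocations are not counted.
        for _ in range(1_000):
            fn(make_message())
        gc.collect()
        before, _ = tracemalloc.get_traced_memory()
        for _ in range(iterations):
            fn(make_message())
        gc.collect()
        after, _ = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return after - before


def sample_message() -> Dict:
    # Hypothetical message shape, based on the fields adjust_message touches.
    return {"type": "foo", "bar": ["a", "b"], "baz": None}


if __name__ == "__main__":
    print("bytes retained:", retained_bytes(adjust_message, sample_message))
```

If the number stays near zero locally while the worker still fills up, the growth is more likely coming from somewhere other than the function body.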