Hello folks,
I have a Vertex pipeline with custom components that runs daily. The pipeline has several components: some depend on others to finish, and some do not. One node in my pipeline has no dependencies (I mean it does not rely on another component's output). Let's call it "text extraction".
The text extraction component usually takes less than 3 minutes to finish. Since it does not depend on any other component, it is usually launched at the beginning of my pipeline. This is what happens most of the time.
However, sometimes text extraction is launched at the end of the pipeline (and I can't see why) and runs indefinitely. I have a watcher application that cancels the pipeline when it exceeds the pipeline's TTL (20 hours). So the situation is that I have a component that is supposed to start at the beginning of the pipeline and take 3 minutes, and sometimes it starts at the end and is canceled after 8-ish hours (the 20 h TTL minus the rest of the pipeline's runtime).
Nota bene: the node logs `Job is running.` in a loop.
Thanks for your help
EDIT: The text extraction component uses multiprocessing. Here is a snippet of the function in question:
```python
import multiprocessing as mp
from functools import partial

import numpy as np
import pandas as pd


def series_parallel_apply(data: pd.Series, func, **kwargs):
    if not isinstance(data, pd.Series):
        raise TypeError("Input data should be a pandas Series")
    cores = mp.cpu_count() - 1
    data_split = np.array_split(data, cores)
    pool = mp.Pool(cores)
    data_return = pd.concat(
        # apply_series is a helper defined elsewhere in the component
        pool.map(partial(apply_series, func=func, **kwargs), data_split),
        ignore_index=False,
    )
    pool.close()
    pool.join()
    return data_return
```
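For readers who want to reproduce the setup, here is a self-contained sketch of how such a function is typically driven. `apply_series` is not shown in the snippet above, so a plausible element-wise implementation is assumed here, and `extract_text` is a stand-in for the real text-extraction logic:

```python
import multiprocessing as mp
from functools import partial

import numpy as np
import pandas as pd


def apply_series(data: pd.Series, func, **kwargs):
    # Assumed helper (not in the original snippet): apply `func`
    # element-wise inside a single worker process.
    return data.apply(func, **kwargs)


def extract_text(value: str) -> str:
    # Stand-in for the real text-extraction logic.
    return value.strip().lower()


if __name__ == "__main__":
    series = pd.Series(["  Hello ", " WORLD  ", " Vertex "])
    cores = max(mp.cpu_count() - 1, 1)  # never ask for 0 workers
    chunks = np.array_split(series, cores)
    with mp.Pool(cores) as pool:
        result = pd.concat(
            pool.map(partial(apply_series, func=extract_text), chunks),
            ignore_index=False,
        )
    print(result.tolist())  # ['hello', 'world', 'vertex']
```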
Hello @LEltarr,
The issue occurs when an independent "text extraction" component that runs quickly sometimes launches late and hangs indefinitely due to multiprocessing race conditions in Vertex AI’s dynamic environment. The root cause involves unmanaged subprocesses (from mp.Pool) that lack timeouts or cleanup, leading to orphaned processes that block resources. To fix this, enforce hardened multiprocessing (static worker counts, timeouts, and forced termination) and add pipeline-level safeguards (explicit dependencies, resource limits, and component timeouts). Properly structured, the component will either succeed within minutes or fail fast, preventing pipeline-wide delays.
A few steps you may try:
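As one illustration of the hardening described above, a pool can be driven through `map_async` with a timeout and force-terminated on exit, so a stuck worker fails the component quickly instead of hanging for hours. This is a minimal sketch; the timeout value, worker count, and `square` workload are illustrative assumptions, not Vertex AI requirements:

```python
import multiprocessing as mp


def square(x: int) -> int:
    # Stand-in for per-chunk work such as text extraction.
    return x * x


def run_with_timeout(items, timeout_s: float = 300.0, workers: int = 4):
    """Run `square` over `items`, failing fast instead of hanging forever."""
    pool = mp.Pool(workers)  # static worker count instead of cpu_count()
    try:
        async_result = pool.map_async(square, items)
        # get(timeout=...) raises multiprocessing.TimeoutError if workers
        # hang, so the component fails fast instead of looping forever.
        return async_result.get(timeout=timeout_s)
    finally:
        pool.terminate()  # force-kill any stragglers so no orphans remain
        pool.join()
```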
Best regards,
Suwarna
Hi @LEltarr,
Welcome to Google Cloud Community!
In addition to @SuwarnaKale's insights, detailed logging, monitoring, and idempotency are crucial for making this component robust. Detailed logging aids diagnosis, monitoring enables proactive detection, and idempotency ensures safe restarts. Together, these practices improve the component's reliability and keep the pipeline stable.
Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.
Thank you for your answers. I investigated a little bit after reading your comment @SuwarnaKale
I found in the multiprocessing docs that `multiprocessing.cpu_count()` is not equivalent to the number of CPUs the current process can use. This makes a lot of sense and probably explains these random failures. However, the documentation does not point to anything I could use in my current stack.
The documentation suggests these two options:
- `os.process_cpu_count()`, which was added in Python 3.13, while my app runs on Python 3.11
- `len(os.sched_getaffinity(0))`, which is only available on some Unix platforms
After looking a bit more, I found that `len(psutil.Process().cpu_affinity())` may work here, so that is going to be my next try. Does anyone have a best practice for multiprocessing with the right number of CPUs the current process can use on Vertex AI?
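Not an official recommendation, but one common pattern is to probe the affinity-aware APIs first and fall back gracefully. The `psutil` branch assumes psutil is installed in the component image; the final fallback may over-count inside a container:

```python
import multiprocessing as mp
import os


def usable_cpu_count() -> int:
    """Best-effort count of CPUs this process may actually use."""
    # Linux: respects the process CPU affinity mask (e.g. cpuset limits).
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # Fallback if psutil is available in the image (Linux/Windows).
    try:
        import psutil
        return len(psutil.Process().cpu_affinity())
    except Exception:
        # Last resort: total CPUs on the host, which may over-count
        # what the container is actually allowed to use.
        return mp.cpu_count()


workers = max(usable_cpu_count() - 1, 1)  # leave one CPU free, never 0
```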