
Node in Vertex Pipeline that runs infinitely

Hello folks,

I have a Vertex pipeline with custom components that runs daily. The pipeline has several components: some depend on others to finish, and some do not. One node in my pipeline has no dependencies (I mean it does not rely on another component's output). Let's name it "text extraction".

The text extraction component usually takes less than 3 minutes to finish. Since it does not depend on any other component, it is usually launched at the beginning of my pipeline. That is what happens most of the time.

However, sometimes text extraction is launched at the end of the pipeline (and I can't see why) and runs indefinitely. I have a watcher application that cancels the pipeline when it exceeds its TTL (20 hours). So the situation is that I have a component that is supposed to start at the beginning of the pipeline and take 3 minutes, and sometimes it starts at the end and is canceled after 8-ish hours (the 20 h TTL minus the time the rest of the pipeline took).

Nota bene: the node logs `Job is running.` in a loop.

Thanks for your help

EDIT: The text extraction component uses multiprocessing. Here is a code snippet of the function used:

import multiprocessing as mp
from functools import partial

import numpy as np
import pandas as pd

def series_parallel_apply(data: pd.Series, func, **args):
    if not isinstance(data, pd.Series):
        raise TypeError("Input data should be a pandas Series")
    # leave one core free for the parent process
    cores = mp.cpu_count() - 1
    data_split = np.array_split(data, cores)
    pool = mp.Pool(cores)
    # apply_series (defined elsewhere) applies func to one chunk of the Series
    data_return = pd.concat(
        pool.map(partial(apply_series, func=func, **args), data_split),
        ignore_index=False,
    )
    pool.close()
    pool.join()
    return data_return
3 REPLIES

Hello @LEltarr,

The issue is that an independent "text extraction" component that normally finishes quickly sometimes launches late and hangs indefinitely, likely due to multiprocessing race conditions in Vertex AI's dynamic environment. A probable root cause is unmanaged subprocesses (from mp.Pool) that lack timeouts or cleanup, leaving orphaned processes that block resources. To fix this, harden the multiprocessing code (static worker counts, timeouts, and forced termination) and add pipeline-level safeguards (explicit dependencies, resource limits, and component timeouts). Properly structured, the component will either succeed within minutes or fail fast, instead of stalling the whole pipeline.

A few steps you may try: 

  • Replace mp.cpu_count() with a fixed worker count. 
  • Add timeouts and forceful pool termination. 
  • Use Vertex AI’s set_timeout() and resource constraints. 
  • Enforce execution order via dummy dependencies. 

Best regards,

Suwarna

Hi @LEltarr,

Welcome to Google Cloud Community!

In addition to @SuwarnaKale's insights, detailed logging, monitoring, and idempotency are crucial for a robust component. Detailed logging aids in diagnosing issues, monitoring enables proactive detection, and idempotency makes restarts safe. Together these practices improve the component's reliability and keep the pipeline stable.
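
As one possible illustration of the idempotency point (the `run_text_extraction` name, signature, and paths are made up for this sketch), the component entry point can check for existing output before doing any work and write results atomically:

```python
import json
import logging
import os
import tempfile

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("text_extraction")

def run_text_extraction(input_path: str, output_path: str) -> str:
    # Idempotency: if the output already exists, skip the work entirely,
    # so a restarted or retried pipeline run is safe and cheap.
    if os.path.exists(output_path):
        logger.info("Output %s already exists; skipping extraction.", output_path)
        return output_path
    logger.info("Extracting text from %s ...", input_path)
    extracted = {"source": input_path, "text": "..."}  # placeholder for the real extraction
    tmp_path = output_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(extracted, f)
    os.replace(tmp_path, output_path)  # atomic rename: readers never see partial output
    logger.info("Wrote %s", output_path)
    return output_path
```

The temp-file-plus-rename pattern means a canceled run never leaves a half-written output that a later run would mistake for a finished one.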

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.

Thank you for your answers. I investigated a little bit after reading your comment @SuwarnaKale

I found in the multiprocessing docs that `multiprocessing.cpu_count()` is not equivalent to the number of CPUs the current process can use. This makes a lot of sense and probably explains these random failures. However, the documentation does not point to anything I can use in my current stack.

The documentation suggests these two options:

- `os.process_cpu_count()`, which was added in Python 3.13, while my app runs on Python 3.11
- `len(os.sched_getaffinity(0))`, which is only available on some Unix platforms
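
A hedged sketch of the second option with a portable fallback (the helper name `usable_cpu_count` is hypothetical):

```python
import multiprocessing as mp
import os

def usable_cpu_count() -> int:
    """CPUs this process may actually use, with a portable fallback."""
    try:
        # Respects the CPU affinity mask set for the process (Linux only)
        return len(os.sched_getaffinity(0))
    except AttributeError:
        # sched_getaffinity is missing on macOS/Windows; fall back to the
        # machine-wide count, which may overstate what a container gets
        return mp.cpu_count()

workers = max(1, usable_cpu_count() - 1)
```

Since Vertex AI custom containers run on Linux, `sched_getaffinity` should normally be the branch taken there.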

 

After looking a bit more, I found that `len(psutil.Process().cpu_affinity())` may work here, so that is going to be my next try. Does anyone have a best practice for multiprocessing with the right number of CPUs the current process can use in Vertex AI?
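
A minimal sketch of that psutil-based approach, with a guard for platforms where it is unavailable (the helper name `affinity_cpu_count` is made up, and psutil is assumed to be installed in the component image):

```python
import multiprocessing as mp

def affinity_cpu_count() -> int:
    """Worker count based on the process's CPU affinity mask, via psutil."""
    try:
        import psutil  # third-party; assumed present in the container image
        return len(psutil.Process().cpu_affinity())
    except (ImportError, AttributeError):
        # psutil missing, or cpu_affinity() unsupported (e.g. on macOS);
        # fall back to the machine-wide count
        return mp.cpu_count()

workers = max(1, affinity_cpu_count() - 1)
```

Note one caveat: the affinity mask reflects which CPUs the process may be scheduled on, not a cgroup CPU quota, so the two can still differ in containerized environments.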