Hello,
I'm trying to run a CustomContainerTrainingJob with the Python SDK on g2-standard-24 machines with 2 NVIDIA_L4 GPUs each.
I have a custom training script which can be seen below:
import argparse
from google.cloud import storage
import pickle
from dask.distributed import Client, wait
import json
import os
import time
import subprocess
from cuml.dask.ensemble import RandomForestClassifier as cuRFC_mg
from cuml.metrics import accuracy_score
import cudf
import dask_cudf
from cuml.dask.common import utils as dask_utils

cudf.pandas.install()
import pandas as pd


def get_chief_ip(cluster_config_dict):
    if 'workerpool0' in cluster_config_dict['cluster']:
        ip_address = cluster_config_dict['cluster']['workerpool0'][0].split(":")[0]
    else:
        # if the job is not distributed, 'chief' will be populated instead of
        # workerpool0.
        ip_address = cluster_config_dict['cluster']['chief'][0].split(":")[0]
    print('The ip address of workerpool 0 is : {}'.format(ip_address))
    return ip_address


def get_chief_port(cluster_config_dict):
    if "open_ports" in cluster_config_dict:
        port = cluster_config_dict['open_ports'][0]
    else:
        # Use any port for the non-distributed job.
        port = <port_no>
    print("The open port is: {}".format(port))
    return port


def run_training():
    parser = argparse.ArgumentParser()
    parser.add_argument("--encoded_data")
    parser.add_argument("--artifact_storge_location")
    args = parser.parse_args()

    cluster_config_str = os.environ.get('CLUSTER_SPEC')
    cluster_config_dict = json.loads(cluster_config_str)
    print(json.dumps(cluster_config_dict, indent=2))
    print('The workerpool type is:', flush=True)
    print(cluster_config_dict['task']['type'], flush=True)

    workerpool_type = cluster_config_dict['task']['type']
    chief_ip = get_chief_ip(cluster_config_dict)
    chief_port = get_chief_port(cluster_config_dict)
    chief_address = "{}:{}".format(chief_ip, chief_port)

    if workerpool_type == "workerpool0":
        print('Running the dask scheduler.', flush=True)
        scheduler_cmd_run = '/root/.local/bin/dask scheduler --dashboard --dashboard-address 8888 --port {}'.format(
            chief_port)
        subprocess.Popen(scheduler_cmd_run, shell=True, executable='/bin/bash')
        print('Done the dask scheduler.', flush=True)

        client = Client(chief_address, timeout=1200)
        print('Waiting for workers to connect to the scheduler.', flush=True)
        client.wait_for_workers(1)
        workers = client.has_what().keys()
        n_workers = len(workers)

        data_pdf = pd.read_csv(args.encoded_data)
        cols_to_verify_against = [
            'hour_part',
            'dummy_start_msoa',
            'gender_Female',
            'gender_Male',
            'gender_Unknown',
            'journey_purpose_Commute',
            'journey_purpose_Other',
            'journey_purpose_direction_IB_HBO',
            'journey_purpose_direction_IB_HBW',
            'journey_purpose_direction_NHBO',
            'journey_purpose_direction_NHBW',
            'journey_purpose_direction_OB_HBO',
            'journey_purpose_direction_OB_HBW',
            'journey_mode_Other',
            'journey_mode_Rail'
        ]
        # Using a set to avoid duplicates if some columns already exist.
        all_cols = list(set(data_pdf.columns).union(cols_to_verify_against))
        for column in all_cols:
            if column not in data_pdf.columns:
                data_pdf[column] = 0

        x = data_pdf[cols_to_verify_against]
        x = x.fillna(value=0)
        y = data_pdf['dummy_end_msoa']

        X_train_dask = dask_cudf.from_cudf(cudf.from_pandas(x), npartitions=n_workers)
        y_train_dask = dask_cudf.from_cudf(cudf.from_pandas(y), npartitions=n_workers)
        X_train_dask, y_train_dask = \
            dask_utils.persist_across_workers(client, [X_train_dask, y_train_dask], workers=workers)

        cu_rf_mg = cuRFC_mg(n_estimators=25, max_depth=13, n_bins=15, n_streams=8)
        cu_rf_mg.fit(X_train_dask, y_train_dask)
        wait(cu_rf_mg.rfs)

        prediction = cu_rf_mg.predict(x)
        acc_score = accuracy_score(y, prediction)
        print(f"Accuracy: {acc_score}", flush=True)

        single_gpu_model = cu_rf_mg.get_combined_model()
        with open("model.pkl", 'wb') as model_file:
            pickle.dump(single_gpu_model, model_file)

        storage_client = storage.Client()
        blob = storage.Blob.from_string(f"{args.artifact_storge_location}model/model.pkl", client=storage_client)
        blob.upload_from_filename('model.pkl')

        time.sleep(60 * 10)
        client.shutdown()
    else:
        print('Running the dask worker.', flush=True)
        client = Client(chief_address, timeout=1200)
        print('client: {}.'.format(client), flush=True)
        worker_run_cmd = '/root/.local/bin/dask cuda worker {}'.format(chief_address)
        subprocess.Popen(worker_run_cmd, shell=True, executable='/bin/bash')
        print('Done with the dask worker.', flush=True)
        # Waiting 10 mins to connect to the Dask dashboard.
        time.sleep(60 * 10)


if __name__ == '__main__':
    run_training()
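For context, the CLUSTER_SPEC environment variable that the script parses has roughly the shape sketched below; the hostnames, ports and open_ports value here are illustrative, not taken from my job:

# Illustrative sketch (values made up) of the CLUSTER_SPEC JSON that
# get_chief_ip and get_chief_port expect to find in the environment.
example_cluster_spec = {
    "cluster": {
        # one chief/scheduler replica in workerpool0
        "workerpool0": ["cmle-training-workerpool0-xxxx-0:2222"],
        # remaining replicas land in workerpool1 and run the dask cuda workers
        "workerpool1": [
            "cmle-training-workerpool1-xxxx-0:2222",
            "cmle-training-workerpool1-xxxx-1:2222",
        ],
    },
    "environment": "cloud",
    "task": {"type": "workerpool0", "index": 0},
    # the script uses the first open port as the dask scheduler port
    "open_ports": [8888],
}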
I have created a custom training container, shown below, which runs the above training script:
FROM nvidia/cuda:12.8.0-runtime-ubuntu24.04
WORKDIR /root
COPY training/run_random_forest_train_vertex_ai.py /root/run_random_forest_train_vertex_ai.py
ENTRYPOINT ["python3", "/root/run_random_forest_train_vertex_ai.py"]
CMD ["--encoded_data=test", "--artifact_storge_location=test"]
Once built, the image is pushed to Artifact Registry, and I then use the SDK code below to create the training pipeline:
aiplatform.init(project="<project_name>",
                location="europe-west2",
                staging_bucket="<bucket_name>/<folder_name>/")

cmd_args = [
    "--encoded_data=" + str(
        f"{folder_name_formatted_uk_encoded_data}/{file_name_identifier_encoded_data}"),
    "--artifact_storge_location=" + str(f"{folder_name_formatted_model_artifacts}/")
]

job = aiplatform.CustomContainerTrainingJob(
    display_name="<training_job_name>",
    container_uri="<training_image>",
    model_serving_container_image_uri="<serving_image>",
)

model = job.run(
    model_display_name=model_display_name,
    replica_count=3,
    machine_type="g2-standard-24",
    accelerator_type="NVIDIA_L4",
    accelerator_count=2,
    sync=True,
    service_account=<sa_name>,
    base_output_dir=f"{folder_name_formatted_model_artifacts}/",
    args=cmd_args,
    enable_dashboard_access=True,
    enable_web_access=True
)
The job starts and then fails twice with the error shown below:
It then fails for a final time and the status changes from pending to failed, as shown below:
So far, I have tried:
Ultimately, I am well and truly blocked and not sure where else to turn, short of redesigning my pipeline to avoid Vertex AI altogether, as the logs are simply no help at all.
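In case it is useful to anyone answering: the custom job that backs the training pipeline can also be inspected directly with gcloud, for example (the job ID below is a placeholder):

gcloud ai custom-jobs list --region=europe-west2
gcloud ai custom-jobs describe <custom_job_id> --region=europe-west2
gcloud ai custom-jobs stream-logs <custom_job_id> --region=europe-west2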
If anyone has any suggestions, please do let me know; I will try anything. If you want any more info, please also let me know.
Best,
Jack
Update: I contacted Google Cloud Support and raised a ticket; the error message the Google engineers were able to see is below:
RunContainerError: failed to create containerd task: failed to create shim task: OCI runtime create failed: runc create failed: unable to start container process: exec: \"python\": executable file not found in $PATH: unknown" logger="UnhandledError""
I have now refactored my image and the training is working well. The reason the logs were so unhelpful is that the error was classed as 'unhandled' and was therefore surfaced only as an 'internal error'. I have asked whether they plan to improve this in the future.
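For anyone who lands here with the same error: the base image in my original Dockerfile never installs Python (or any of the training dependencies), so the container fails on exec of the entrypoint before the training script even starts. A rough sketch of the kind of refactor involved is below; it is not my exact Dockerfile, and the package choices, pins and the --user install (which matches the /root/.local/bin/dask path hard-coded in the training script) are illustrative:

# Sketch only (not my exact Dockerfile): the essential change is that the image
# now actually contains a Python interpreter plus the RAPIDS/Dask/GCS packages
# the training script imports. Package versions are left unpinned here.
FROM nvidia/cuda:12.8.0-runtime-ubuntu24.04

# Install a Python interpreter and pip so the ENTRYPOINT can resolve "python3".
RUN apt-get update \
    && apt-get install -y --no-install-recommends python3 python3-pip \
    && rm -rf /var/lib/apt/lists/*

# RAPIDS wheels come from NVIDIA's package index. Installing with --user puts
# the dask console script under /root/.local/bin, which is the path the
# training script uses for "dask scheduler" and "dask cuda worker".
RUN pip3 install --user --break-system-packages \
        --extra-index-url=https://pypi.nvidia.com \
        cuml-cu12 dask-cudf-cu12 dask-cuda google-cloud-storage

WORKDIR /root
COPY training/run_random_forest_train_vertex_ai.py /root/run_random_forest_train_vertex_ai.py

ENTRYPOINT ["python3", "/root/run_random_forest_train_vertex_ai.py"]
CMD ["--encoded_data=test", "--artifact_storge_location=test"]

Even a quick local check such as docker run --rm --entrypoint python3 <training_image> --version would have surfaced the missing interpreter long before the job ever reached Vertex AI.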