Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes after testing. #223

Merged
merged 3 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kfp/kfp_ray_components/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ FROM docker.io/rayproject/ray:2.9.3-py310

ARG BUILD_DATE
ARG GIT_COMMIT
ARG KFP_v2

LABEL build-date=$BUILD_DATE
LABEL git-commit=$GIT_COMMIT
Expand All @@ -22,7 +23,7 @@ RUN cd python_apiserver_client && pip install --no-cache-dir -e .

COPY --chown=ray:users workflow_support_lib workflow_support_lib/
RUN cd workflow_support_lib && pip install --no-cache-dir -e .

ENV KFP_v2=$KFP_v2
# remove credentials-containing file
RUN rm requirements.txt
# components
Expand Down
13 changes: 12 additions & 1 deletion kfp/kfp_ray_components/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,25 @@ endif
#DOCKER_IMG=${DOCKER_HOSTNAME}/${DOCKER_NAMESPACE}/${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_VERSION}
DOCKER_IMG=$(DOCKER_LOCAL_IMAGE)

# Build the KFP component container image and tag it for the remote registry.
# KFPv2 (from requirements.env) is forwarded as the Dockerfile ARG KFP_v2 so
# the image knows which KFP flavour (v1/v2) its support libraries target.
.PHONY: .kfp_image
.kfp_image:: # Must be called with a DOCKER_IMAGE= setting.
	@# Help: Create the docker image $(DOCKER_LOCAL_IMAGE) and a tag for $(DOCKER_REMOTE_IMAGE)
	$(DOCKER) build -t $(DOCKER_LOCAL_IMAGE) \
		-f $(DOCKER_FILE) \
		--build-arg EXTRA_INDEX_URL=$(EXTRA_INDEX_URL) \
		--build-arg BASE_IMAGE=$(BASE_IMAGE) \
		--build-arg BUILD_DATE=$(shell date -u +'%Y-%m-%dT%H:%M:%SZ') \
		--build-arg KFP_v2=$(KFPv2) \
		--build-arg GIT_COMMIT=$(shell git log -1 --format=%h) .
	$(DOCKER) tag $(DOCKER_LOCAL_IMAGE) $(DOCKER_REMOTE_IMAGE)

.PHONY: .lib-src-image
.lib-src-image::
$(MAKE) .defaults.copy-lib LIB_PATH=$(DPK_RAY_LIB_DIR) LIB_NAME=data-processing-lib-ray
$(MAKE) .defaults.copy-lib LIB_PATH=$(DPK_PYTHON_LIB_DIR) LIB_NAME=data-processing-lib-python
$(MAKE) .defaults.copy-lib LIB_PATH=$(REPOROOT)/kfp/kfp_support_lib/python_apiserver_client LIB_NAME=python_apiserver_client
$(MAKE) .defaults.copy-lib LIB_PATH=$(REPOROOT)/kfp/kfp_support_lib/$(WORKFLOW_SUPPORT_LIB) LIB_NAME=workflow_support_lib
$(MAKE) .defaults.image
$(MAKE) .kfp_image
rm -rf data-processing-lib-ray
rm -rf data-processing-lib-python
rm -rf python_apiserver_client
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/createRayClusterComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ inputs:

implementation:
container:
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/deleteRayClusterComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ inputs:

implementation:
container:
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/executeRayJobComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ inputs:

implementation:
container:
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ inputs:

implementation:
container:
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/executeSubWorkflowComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ outputs:

implementation:
container:
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists, and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
Expand Down
3 changes: 2 additions & 1 deletion kfp/kfp_ray_components/src/create_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import sys

kfp_v2 = os.getenv("KFP_v2", 0)
if kfp_v2 == 1:
print(kfp_v2)
if kfp_v2 == "1":
from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs
print(f"Load KFPv2 libs")
else:
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/delete_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import sys

kfp_v2 = os.getenv("KFP_v2", 0)
if kfp_v2 == 1:
if kfp_v2 == "1":
from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs
print(f"Load KFPv2 libs")
else:
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/execute_ray_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import os

kfp_v2 = os.getenv("KFP_v2", 0)
if kfp_v2 == 1:
if kfp_v2 == "1":
from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs
print(f"Load KFPv2 libs")
else:
Expand Down
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import os

kfp_v2 = os.getenv("KFP_v2", 0)
if kfp_v2 == 1:
if kfp_v2 == "1":
from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs
print(f"Load KFPv2 libs")
else:
Expand Down
1 change: 1 addition & 0 deletions kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ authors = [
]
dependencies = [
"kfp==2.7.0",
"kfp-kubernetes==1.2.0",
"ray==2.9.3",
"requests",
"data-prep-toolkit==0.2.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from kfp_support.workflow_support.compile_utils.component import (
from workflow_support.compile_utils.component import (
ONE_HOUR_SEC,
ONE_DAY_SEC,
ONE_WEEK_SEC,
ComponentUtils
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

RUN_NAME = "KFP_RUN_NAME"

ONE_HOUR_SEC = 60 * 60
ONE_DAY_SEC = ONE_HOUR_SEC * 24
ONE_WEEK_SEC = ONE_DAY_SEC * 7

class ComponentUtils:
"""
Class containing methods supporting building pipelines
Expand Down Expand Up @@ -52,50 +56,3 @@ def set_s3_env_vars_to_component(
for env_name, _ in env2key.items():
env2key[prefix + "_" + env_name] = env2key.pop(env_name)
kubernetes.use_secret_as_env(task=task, secret_name='s3-secret', secret_key_to_env=env2key)

@staticmethod
def default_compute_execution_params(
worker_options: str, # ray worker configuration
actor_options: str, # cpus per actor
) -> str:
"""
This is the most simplistic transform execution parameters computation
:param worker_options: configuration of ray workers
:param actor_options: actor request requirements
:return: number of actors
"""
import sys

from data_processing.utils import GB, get_logger
from kfp_support.workflow_support.runtime_utils import KFPUtils

logger = get_logger(__name__)

# convert input
w_options = KFPUtils.load_from_json(worker_options.replace("'", '"'))
a_options = KFPUtils.load_from_json(actor_options.replace("'", '"'))
# Compute available cluster resources
cluster_cpu = w_options["replicas"] * w_options["cpu"]
cluster_mem = w_options["replicas"] * w_options["memory"]
cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0)
logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
# compute number of actors
n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5))
n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB))
n_actors = min(n_actors_cpu, n_actors_memory)
# Check if we need gpu calculations as well
actor_gpu = a_options.get("num_gpus", 0)
if actor_gpu > 0:
n_actors_gpu = int(cluster_gpu / actor_gpu)
n_actors = min(n_actors, n_actors_gpu)
logger.info(f"Number of actors - {n_actors}")
if n_actors < 1:
logger.warning(
f"Not enough cpu/gpu/memory to run transform, "
f"required cpu {a_options.get('num_cpus', .5)}, available {cluster_cpu}, "
f"required memory {a_options.get('memory', 1)}, available {cluster_mem}, "
f"required cpu {actor_gpu}, available {cluster_gpu}"
)
sys.exit(1)

return str(n_actors)
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from kfp_support.workflow_support.runtime_utils.kfp_utils import KFPUtils
from kfp_support.workflow_support.runtime_utils.remote_jobs_utils import RayRemoteJobs, execute_ray_jobs
from workflow_support.runtime_utils.kfp_utils import KFPUtils
from workflow_support.runtime_utils.remote_jobs_utils import RayRemoteJobs, execute_ray_jobs
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,50 @@ def load_from_json(js: str) -> dict[str, Any]:
except Exception as e:
logger.warning(f"Failed to load parameters {js} with error {e}")
sys.exit(1)

@staticmethod
def default_compute_execution_params(
    worker_options: str,  # ray worker configuration
    actor_options: str,  # cpus per actor
) -> str:
    """
    This is the most simplistic transform execution parameters computation.

    Estimates how many Ray actors fit on the cluster described by
    worker_options, given the per-actor resource requests in actor_options,
    keeping ~15% headroom on CPU and memory.

    :param worker_options: configuration of ray workers; JSON string (single
        quotes tolerated) with keys "replicas", "cpu", "memory" and
        optionally "gpu"
    :param actor_options: actor request requirements; JSON string with
        optional keys "num_cpus" (default 0.5), "memory" (default GB; units
        presumably bytes given the division by GB below — TODO confirm)
        and "num_gpus"
    :return: number of actors, as a string (exits the process if the
        cluster cannot fit even one actor)
    """
    import sys

    from data_processing.utils import GB, get_logger
    from workflow_support.runtime_utils import KFPUtils

    logger = get_logger(__name__)

    # convert input; callers pass single-quoted JSON, so normalize quotes first
    w_options = KFPUtils.load_from_json(worker_options.replace("'", '"'))
    a_options = KFPUtils.load_from_json(actor_options.replace("'", '"'))
    # Compute available cluster resources across all worker replicas
    cluster_cpu = w_options["replicas"] * w_options["cpu"]
    cluster_mem = w_options["replicas"] * w_options["memory"]
    cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0)
    logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
    # compute number of actors; the 0.85 factor leaves headroom for Ray overhead
    n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5))
    n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB))
    n_actors = min(n_actors_cpu, n_actors_memory)
    # Check if we need gpu calculations as well
    actor_gpu = a_options.get("num_gpus", 0)
    if actor_gpu > 0:
        n_actors_gpu = int(cluster_gpu / actor_gpu)
        n_actors = min(n_actors, n_actors_gpu)
    logger.info(f"Number of actors - {n_actors}")
    if n_actors < 1:
        logger.warning(
            f"Not enough cpu/gpu/memory to run transform, "
            f"required cpu {a_options.get('num_cpus', .5)}, available {cluster_cpu}, "
            f"required memory {a_options.get('memory', 1)}, available {cluster_mem}, "
            f"required cpu {actor_gpu}, available {cluster_gpu}"
        )
        # hard-exit rather than raise: this runs inside a KFP component container
        sys.exit(1)

    return str(n_actors)
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

from data_processing.data_access import DataAccess, DataAccessFactory
from data_processing.utils import ParamsUtils, get_logger
from kfp_support.api_server_client import KubeRayAPIs
from kfp.kfp_support_lib.python_apiserver_client.src.python_apiserver_client.params import (
from python_apiserver_client import KubeRayAPIs
from python_apiserver_client.params import (
DEFAULT_HEAD_START_PARAMS,
DEFAULT_WORKER_START_PARAMS,
Cluster,
Expand All @@ -30,7 +30,7 @@
environment_variables_decoder,
volume_decoder,
)
from kfp_support.workflow_support.runtime_utils import KFPUtils
from workflow_support.runtime_utils import KFPUtils
from ray.job_submission import JobStatus


Expand Down
10 changes: 5 additions & 5 deletions kfp/requirements.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ KFP_v2=2.7.0
KFP_v1=1.8.22

ifeq ($(KFPv2), 1)
KFP=$(KFP_v2)
WORKFLOW_SUPPORT_LIB=kfp_v2_workflow_support
KFP=$(KFP_v2)
WORKFLOW_SUPPORT_LIB=kfp_v2_workflow_support
else
KFP=$(KFP_v1)
WORKFLOW_SUPPORT_LIB=kfp_v1_workflow_support
endif
KFP=$(KFP_v1)
WORKFLOW_SUPPORT_LIB=kfp_v1_workflow_support
endif
18 changes: 16 additions & 2 deletions transforms/universal/noop/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,29 @@ workflow-venv:

.PHONY: workflow-build
# Build the KFP workflow for the flavour selected by KFPv2:
# v1 pipelines when KFPv2=0, v2 pipelines otherwise.
workflow-build:
ifeq ($(KFPv2), 0)
	$(MAKE) -C kfp_ray/v1 workflow-build
else
	$(MAKE) -C kfp_ray/v2 workflow-build
endif


.PHONY: workflow-test
# Run the workflow tests for the flavour selected by KFPv2:
# v1 pipelines when KFPv2=0, v2 pipelines otherwise.
# Fix: both branches previously invoked kfp_ray/v2, which made the
# conditional a no-op and diverged from workflow-build/workflow-upload;
# the KFPv2=0 branch now tests the v1 pipelines.
workflow-test:
ifeq ($(KFPv2), 0)
	$(MAKE) -C kfp_ray/v1 workflow-test
else
	$(MAKE) -C kfp_ray/v2 workflow-test
endif

.PHONY: workflow-upload
# Upload the compiled workflow for the flavour selected by KFPv2:
# v1 pipelines when KFPv2=0, v2 pipelines otherwise.
workflow-upload:
ifeq ($(KFPv2), 0)
	$(MAKE) -C kfp_ray/v1 workflow-upload
else
	$(MAKE) -C kfp_ray/v2 workflow-upload
endif


.PHONY: workflow-reconcile-requirements
workflow-reconcile-requirements:
Expand Down
Loading