feat(sdk): support dynamic machine type parameters in pipeline task setters #11097

Merged · 55 commits · Sep 20, 2024

Commits
5fa24e7
temp title: change title
KevinGrantLee Aug 13, 2024
b8a1d72
add release notes
KevinGrantLee Aug 13, 2024
513d46b
formatting
KevinGrantLee Aug 13, 2024
e55c330
feat(backend): move comp logic to workflow params (#10979)
zazulam Aug 8, 2024
a503083
feat(component): internal
Aug 8, 2024
0aa54ee
feat(components): internal
Aug 9, 2024
eb47559
fix(components): Fix to model batch explanation component for Structu…
Aug 10, 2024
b632f1d
feat(components): Support dynamic values for boot_disk_type, boot_dis…
Aug 12, 2024
a188e54
chore: Upgrade Argo to v3.4.17 (#10978)
gmfrasca Aug 14, 2024
90df1c4
test: Moved kubeflow-pipelines-manifests to GitHub Actions (#11066)
VaniHaripriya Aug 14, 2024
79becef
fix: re-enable exit handler test. (#11100)
liavweiss Aug 15, 2024
12aae02
fix(frontend): retrieve archived logs from correct location (#11010)
droctothorpe Aug 15, 2024
ae3e33c
feat(component): internal
Aug 16, 2024
27595c3
feat(component): internal
Aug 16, 2024
48cb7dd
chore(components): GCPC 2.16.1 Release
Aug 16, 2024
c736371
test: Fail fast when image build fails on tests #11102 (#11115)
ElayAharoni Aug 20, 2024
370b449
fix(components): Use instance.target_field_name format for text-bison…
Aug 21, 2024
65fa56a
chore: Renamed GitHub workflows from *.yaml to *.yml for consistency …
hbelmiro Aug 21, 2024
b0f4dfe
Fix view edit cluster roles (#11067)
papagala Aug 21, 2024
e32f995
fix(components): Pass model name to eval_runner to process batch pre…
Aug 21, 2024
a348039
feat(components): release LLM Model Evaluation image version v0.7
jsondai Aug 22, 2024
b75b210
chore: Adding @DharmitD to SDK reviewers (#11131)
DharmitD Aug 22, 2024
1e18f45
test: Kubeflow Pipelines V2 integration Tests (#11125)
diegolovison Aug 23, 2024
f71cecc
chore: Add make targets for building driver and launcher images (#11103)
gmfrasca Aug 23, 2024
c8c3edd
feat(Backend + SDK): Update kfp backend and kubernetes sdk to support…
gregsheremeta Aug 23, 2024
c1cdc1b
docs:fixing broken links in readme (#11108)
Fiona-Waters Aug 23, 2024
4a60449
chore(deps): bump micromatch from 4.0.5 to 4.0.8 in /test/frontend-in…
dependabot[bot] Aug 26, 2024
d982b0c
Fix: Basic sample tests - sequential is flaky (#11138)
diegolovison Aug 27, 2024
2c563d0
chore: Wrapped "Failed GetContextByTypeAndName" error for better trou…
hbelmiro Aug 28, 2024
60b1d5d
chore(components): Update AutoSxS and RLHF image tags
TheMichaelHu Aug 28, 2024
245bafe
test: Improvements to wait_for_pods function (#11162)
hbelmiro Sep 3, 2024
4caba7b
fix(frontend): fixes filter pipeline text box shows error when typing…
ElayAharoni Sep 3, 2024
e204552
correct artifact preview behavior in UI (#11059)
HumairAK Sep 3, 2024
18e0695
chore: Added DCO link to PR template (#11176)
hbelmiro Sep 6, 2024
20ad0f5
chore(backend): Update driver and launcher licenses (#11177)
chensun Sep 6, 2024
b2f3191
chore(backend): update driver and launcher default images (#11178)
chensun Sep 6, 2024
2e7261d
chore: Add instructions for releasing driver and launcher images (#11…
chensun Sep 6, 2024
17b55a3
test: Fixed `kfp-runtime-tests` to run on master branch (#11158)
hbelmiro Sep 6, 2024
066e8b5
(fix): reduce executor logs (#11169)
HumairAK Sep 6, 2024
88db3c7
chore: add PaulinaPacyna and ouadakarim as reviewers (#11180)
chensun Sep 6, 2024
d02afc2
test: Move run-all-gcpc-modules to GitHub Actions (#11157)
aman23bedi Sep 6, 2024
765d766
fix(sdk): Kfp support for pip trusted host (#11151)
diegolovison Sep 6, 2024
bd3bb59
chore(sdk): Loosening kubernetes dependency constraint (#11079)
egeucak Sep 6, 2024
c2d86cf
chore: Remove unwanted Frontend test files (#10973)
DharmitD Sep 6, 2024
d9cc5ee
fix(ui): fixes empty string value in pipeline parameters (#11175)
jan-stanek Sep 6, 2024
7ad84ca
chore(backend): update driver and launcher default images (#11182)
chensun Sep 6, 2024
bc1e41b
chore(release): bumped version to 2.3.0
chensun Sep 6, 2024
4e4baa2
chore: Update RELEASE.md to remove obsolete instructions (#11183)
chensun Sep 6, 2024
ac7c802
chore: Release kfp-pipeline-spec 0.4.0 (#11189)
chensun Sep 9, 2024
3655087
chore: release kfp-kubernetes 1.3.0 (#11190)
chensun Sep 9, 2024
b626fc4
chore: update kfp-kubernetes release scripts and instructions (#11191)
chensun Sep 9, 2024
8f9c8aa
feat(sdk)!: Pin kfp-pipeline-spec==0.4.0, kfp-server-api>=2.1.0,<2.4.…
chensun Sep 9, 2024
b653a9d
chore(sdk): release KFP SDK 2.9.0 (#11193)
chensun Sep 9, 2024
7a183c0
Delete test pipelines as they are duplicate with
KevinGrantLee Sep 17, 2024
11c464b
Merge branch 'master' of https://github.com/kubeflow/pipelines into t…
KevinGrantLee Sep 17, 2024
1 change: 1 addition & 0 deletions sdk/RELEASE.md
@@ -1,6 +1,7 @@
 # Current Version (in development)
 
 ## Features
+* Support dynamic machine type parameters in pipeline task setters. [\#11097](https://github.com/kubeflow/pipelines/pull/11097)
 
 ## Breaking changes
 
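To make the release note above concrete, here is a minimal usage sketch of what this PR enables: the task resource setters accepting pipeline parameters (pipeline channels) in addition to literal values. The component and parameter names below are illustrative, not taken from this PR.

from kfp import dsl


@dsl.component
def train_op() -> int:
    return 1


@dsl.pipeline(name='dynamic-machine-type-example')
def my_pipeline(cpu_limit: str = '4',
                memory_limit: str = '16G',
                accelerator_type: str = 'NVIDIA_TESLA_T4',
                accelerator_limit: str = '1'):
    # Inside the pipeline body each parameter is a PipelineParameterChannel;
    # with this change the setters accept it and the compiler wires it into
    # the task's resource spec as a runtime placeholder.
    task = train_op()
    task.set_cpu_limit(cpu_limit)
    task.set_memory_limit(memory_limit)
    task.set_accelerator_type(accelerator_type)
    task.set_accelerator_limit(accelerator_limit)

Static values such as task.set_cpu_limit('500m') continue to work as before.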
24 changes: 12 additions & 12 deletions sdk/python/kfp/compiler/compiler_test.py
@@ -3382,31 +3382,31 @@ def simple_pipeline():
             ['exec-return-1']['container'])
 
         self.assertEqual(
-            5, dict_format['deploymentSpec']['executors']['exec-return-1-2']
-            ['container']['resources']['cpuLimit'])
+            '5', dict_format['deploymentSpec']['executors']['exec-return-1-2']
+            ['container']['resources']['resourceCpuLimit'])
         self.assertNotIn(
             'memoryLimit', dict_format['deploymentSpec']['executors']
             ['exec-return-1-2']['container']['resources'])
 
         self.assertEqual(
-            50, dict_format['deploymentSpec']['executors']['exec-return-1-3']
-            ['container']['resources']['memoryLimit'])
+            '50G', dict_format['deploymentSpec']['executors']['exec-return-1-3']
+            ['container']['resources']['resourceMemoryLimit'])
         self.assertNotIn(
             'cpuLimit', dict_format['deploymentSpec']['executors']
             ['exec-return-1-3']['container']['resources'])
 
         self.assertEqual(
-            2, dict_format['deploymentSpec']['executors']['exec-return-1-4']
-            ['container']['resources']['cpuRequest'])
+            '2', dict_format['deploymentSpec']['executors']['exec-return-1-4']
+            ['container']['resources']['resourceCpuRequest'])
         self.assertEqual(
-            5, dict_format['deploymentSpec']['executors']['exec-return-1-4']
-            ['container']['resources']['cpuLimit'])
+            '5', dict_format['deploymentSpec']['executors']['exec-return-1-4']
+            ['container']['resources']['resourceCpuLimit'])
         self.assertEqual(
-            4, dict_format['deploymentSpec']['executors']['exec-return-1-4']
-            ['container']['resources']['memoryRequest'])
+            '4G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
+            ['container']['resources']['resourceMemoryRequest'])
         self.assertEqual(
-            50, dict_format['deploymentSpec']['executors']['exec-return-1-4']
-            ['container']['resources']['memoryLimit'])
+            '50G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
+            ['container']['resources']['resourceMemoryLimit'])
 
 
 class TestPlatformConfig(unittest.TestCase):
49 changes: 39 additions & 10 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
@@ -128,6 +128,12 @@ def build_task_spec_for_task(
         pipeline_task_spec.retry_policy.CopyFrom(
             task._task_spec.retry_policy.to_proto())
 
+    # Inject resource fields into inputs
+    if task.container_spec and task.container_spec.resources:
+        for key, val in task.container_spec.resources.__dict__.items():
+            if val and pipeline_channel.extract_pipeline_channels_from_any(val):
+                task.inputs[key] = val
+
     for input_name, input_value in task.inputs.items():
         # Since LoopParameterArgument and LoopArtifactArgument and LoopArgumentVariable are narrower
         # types than PipelineParameterChannel, start with them.
@@ -607,6 +613,24 @@ def build_container_spec_for_task(
     Returns:
         A PipelineContainerSpec object for the task.
     """
+
+    def convert_to_placeholder(input_value: str) -> str:
+        """Checks if input is a pipeline channel and if so, converts to
+        compiler injected input name."""
+        pipeline_channels = (
+            pipeline_channel.extract_pipeline_channels_from_any(input_value))
+        if pipeline_channels:
+            assert len(pipeline_channels) == 1
+            channel = pipeline_channels[0]
+            additional_input_name = (
+                compiler_utils.additional_input_name_for_pipeline_channel(
+                    channel))
+            additional_input_placeholder = placeholders.InputValuePlaceholder(
+                additional_input_name)._to_string()
+            input_value = input_value.replace(channel.pattern,
+                                              additional_input_placeholder)
+        return input_value
+
     container_spec = (
         pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec(
             image=task.container_spec.image,
@@ -620,23 +644,28 @@
 
     if task.container_spec.resources is not None:
         if task.container_spec.resources.cpu_request is not None:
-            container_spec.resources.cpu_request = (
-                task.container_spec.resources.cpu_request)
+            container_spec.resources.resource_cpu_request = (
+                convert_to_placeholder(
+                    task.container_spec.resources.cpu_request))
         if task.container_spec.resources.cpu_limit is not None:
-            container_spec.resources.cpu_limit = (
-                task.container_spec.resources.cpu_limit)
+            container_spec.resources.resource_cpu_limit = (
+                convert_to_placeholder(task.container_spec.resources.cpu_limit))
        if task.container_spec.resources.memory_request is not None:
-            container_spec.resources.memory_request = (
-                task.container_spec.resources.memory_request)
+            container_spec.resources.resource_memory_request = (
+                convert_to_placeholder(
+                    task.container_spec.resources.memory_request))
        if task.container_spec.resources.memory_limit is not None:
-            container_spec.resources.memory_limit = (
-                task.container_spec.resources.memory_limit)
+            container_spec.resources.resource_memory_limit = (
+                convert_to_placeholder(
+                    task.container_spec.resources.memory_limit))
        if task.container_spec.resources.accelerator_count is not None:
             container_spec.resources.accelerator.CopyFrom(
                 pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec
                 .ResourceSpec.AcceleratorConfig(
-                    type=task.container_spec.resources.accelerator_type,
-                    count=task.container_spec.resources.accelerator_count,
+                    resource_type=convert_to_placeholder(
+                        task.container_spec.resources.accelerator_type),
+                    resource_count=convert_to_placeholder(
+                        task.container_spec.resources.accelerator_count),
                 ))
 
     return container_spec
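As a rough illustration of the compiler behavior added above (a sketch, not part of this PR): compiling the example pipeline sketched after the RELEASE.md note and reading the resulting IR should show the renamed resource fields holding input placeholders instead of literals. The executor key name and the exact placeholder text are assumptions based on the helper and tests in this diff.

from kfp import compiler
import yaml

compiler.Compiler().compile(my_pipeline, 'pipeline.yaml')

with open('pipeline.yaml') as f:
    spec = yaml.safe_load(f)

resources = spec['deploymentSpec']['executors']['exec-train-op']['container']['resources']
# Expected to hold placeholder strings rather than numbers, roughly:
#   resourceCpuLimit: "{{$.inputs.parameters['pipelinechannel--cpu_limit']}}"
print(resources)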
116 changes: 57 additions & 59 deletions sdk/python/kfp/dsl/pipeline_task.py
@@ -321,9 +321,9 @@ def _ensure_container_spec_exists(self) -> None:
                 f'{caller_method_name} can only be used on single-step components, not pipelines used as components, or special components like importers.'
             )
 
-    def _validate_cpu_request_limit(self, cpu: str) -> float:
+    def _validate_cpu_request_limit(self, cpu: str) -> str:
         """Validates cpu request/limit string and converts to its numeric
-        value.
+        string value.
 
         Args:
             cpu: CPU requests or limits. This string should be a number or a
@@ -335,17 +335,22 @@ def _validate_cpu_request_limit(self, cpu: str) -> float:
             ValueError if the cpu request/limit string value is invalid.
 
         Returns:
-            The numeric value (float) of the cpu request/limit.
+            The numeric string of the cpu request/limit.
         """
-        if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
-            raise ValueError(
-                'Invalid cpu string. Should be float or integer, or integer'
-                ' followed by "m".')
-
-        return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)
+        if isinstance(cpu, pipeline_channel.PipelineChannel):
+            cpu = str(cpu)
+        else:
+            if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
+                raise ValueError(
+                    'Invalid cpu string. Should be float or integer, or integer'
+                    ' followed by "m".')
+        return cpu
 
     @block_if_final()
-    def set_cpu_request(self, cpu: str) -> 'PipelineTask':
+    def set_cpu_request(
+            self,
+            cpu: Union[str,
+                       pipeline_channel.PipelineChannel]) -> 'PipelineTask':
         """Sets CPU request (minimum) for the task.
 
         Args:
@@ -370,7 +375,10 @@ def set_cpu_request(self, cpu: str) -> 'PipelineTask':
         return self
 
     @block_if_final()
-    def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
+    def set_cpu_limit(
+            self,
+            cpu: Union[str,
+                       pipeline_channel.PipelineChannel]) -> 'PipelineTask':
         """Sets CPU limit (maximum) for the task.
 
         Args:
@@ -395,7 +403,9 @@ def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
         return self
 
     @block_if_final()
-    def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
+    def set_accelerator_limit(
+            self, limit: Union[int, str,
+                               pipeline_channel.PipelineChannel]) -> 'PipelineTask':
         """Sets accelerator limit (maximum) for the task. Only applies if
         accelerator type is also set via .set_accelerator_type().
 
@@ -406,11 +416,15 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
             Self return to allow chained setting calls.
         """
         self._ensure_container_spec_exists()
-
-        if isinstance(limit, str):
-            if re.match(r'[1-9]\d*$', limit) is None:
-                raise ValueError(f'{"limit"!r} must be positive integer.')
-            limit = int(limit)
+        if isinstance(limit, pipeline_channel.PipelineChannel):
+            limit = str(limit)
+        else:
+            if isinstance(limit, int):
+                limit = str(limit)
+            if isinstance(limit, str) and re.match(r'^0$|^1$|^2$|^4$|^8$|^16$',
+                                                   limit) is None:
+                raise ValueError(
+                    f'{"limit"!r} must be one of 0, 1, 2, 4, 8, 16.')
 
         if self.container_spec.resources is not None:
             self.container_spec.resources.accelerator_count = limit
@@ -438,9 +452,9 @@ def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
             category=DeprecationWarning)
         return self.set_accelerator_limit(gpu)
 
-    def _validate_memory_request_limit(self, memory: str) -> float:
+    def _validate_memory_request_limit(self, memory: str) -> str:
         """Validates memory request/limit string and converts to its numeric
-        value.
+        string value.
 
         Args:
             memory: Memory requests or limits. This string should be a number or
@@ -451,47 +465,24 @@ def _validate_memory_request_limit(self, memory: str) -> float:
             ValueError if the memory request/limit string value is invalid.
 
         Returns:
-            The numeric value (float) of the memory request/limit.
+            The numeric string value of the memory request/limit.
         """
-        if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
-                    memory) is None:
-            raise ValueError(
-                'Invalid memory string. Should be a number or a number '
-                'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", '
-                '"Gi", "M", "Mi", "K", "Ki".')
-
-        if memory.endswith('E'):
-            memory = float(memory[:-1]) * constants._E / constants._G
-        elif memory.endswith('Ei'):
-            memory = float(memory[:-2]) * constants._EI / constants._G
-        elif memory.endswith('P'):
-            memory = float(memory[:-1]) * constants._P / constants._G
-        elif memory.endswith('Pi'):
-            memory = float(memory[:-2]) * constants._PI / constants._G
-        elif memory.endswith('T'):
-            memory = float(memory[:-1]) * constants._T / constants._G
-        elif memory.endswith('Ti'):
-            memory = float(memory[:-2]) * constants._TI / constants._G
-        elif memory.endswith('G'):
-            memory = float(memory[:-1])
-        elif memory.endswith('Gi'):
-            memory = float(memory[:-2]) * constants._GI / constants._G
-        elif memory.endswith('M'):
-            memory = float(memory[:-1]) * constants._M / constants._G
-        elif memory.endswith('Mi'):
-            memory = float(memory[:-2]) * constants._MI / constants._G
-        elif memory.endswith('K'):
-            memory = float(memory[:-1]) * constants._K / constants._G
-        elif memory.endswith('Ki'):
-            memory = float(memory[:-2]) * constants._KI / constants._G
+        if isinstance(memory, pipeline_channel.PipelineChannel):
+            memory = str(memory)
         else:
-            # By default interpret as a plain integer, in the unit of Bytes.
-            memory = float(memory) / constants._G
-
+            if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
+                        memory) is None:
+                raise ValueError(
+                    'Invalid memory string. Should be a number or a number '
+                    'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", '
+                    '"Gi", "M", "Mi", "K", "Ki".')
         return memory
 
     @block_if_final()
-    def set_memory_request(self, memory: str) -> 'PipelineTask':
+    def set_memory_request(
+            self,
+            memory: Union[str,
+                          pipeline_channel.PipelineChannel]) -> 'PipelineTask':
         """Sets memory request (minimum) for the task.
 
         Args:
@@ -515,7 +506,10 @@ def set_memory_request(self, memory: str) -> 'PipelineTask':
         return self
 
     @block_if_final()
-    def set_memory_limit(self, memory: str) -> 'PipelineTask':
+    def set_memory_limit(
+            self,
+            memory: Union[str,
+                          pipeline_channel.PipelineChannel]) -> 'PipelineTask':
         """Sets memory limit (maximum) for the task.
 
         Args:
@@ -579,7 +573,9 @@ def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
         return self.set_accelerator_type(accelerator)
 
     @block_if_final()
-    def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
+    def set_accelerator_type(
+            self, accelerator: Union[str, pipeline_channel.PipelineChannel]
+    ) -> 'PipelineTask':
         """Sets accelerator type to use when executing this task.
 
         Args:
@@ -589,14 +585,16 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
             Self return to allow chained setting calls.
         """
         self._ensure_container_spec_exists()
+        if isinstance(accelerator, pipeline_channel.PipelineChannel):
+            accelerator = str(accelerator)
 
         if self.container_spec.resources is not None:
             self.container_spec.resources.accelerator_type = accelerator
             if self.container_spec.resources.accelerator_count is None:
-                self.container_spec.resources.accelerator_count = 1
+                self.container_spec.resources.accelerator_count = '1'
         else:
             self.container_spec.resources = structures.ResourceSpec(
-                accelerator_count=1, accelerator_type=accelerator)
+                accelerator_count='1', accelerator_type=accelerator)
 
         return self