[ML] Sync pipeline code change during soft code complete #26416

Merged: 26 commits, Sep 26, 2022

Commits
7528b12  sync pipeline changes under azure-ai-ml (zhengfeiwang, Sep 23, 2022)
879c380  sync pipeline unit test changes (zhengfeiwang, Sep 23, 2022)
fcea154  fix DSL breaking unit test (zhengfeiwang, Sep 23, 2022)
e1b0fc7  add missing file changes (zhengfeiwang, Sep 23, 2022)
68c1a14  skip breaking unit tests (zhengfeiwang, Sep 23, 2022)
468efaa  sync changes in component e2etest (zhengfeiwang, Sep 23, 2022)
c3c55c5  sync changes in DSL e2etest (zhengfeiwang, Sep 23, 2022)
92a77c9  sync changes in internal e2etest (zhengfeiwang, Sep 23, 2022)
3872c77  sync changes in schedule e2etest (zhengfeiwang, Sep 23, 2022)
2828a8a  sync changes in pipeline job e2etest (zhengfeiwang, Sep 23, 2022)
55c16f6  add "dowhile" in cspell.json (zhengfeiwang, Sep 23, 2022)
64b04b1  update pipeline_job recording & skip download test (zhengfeiwang, Sep 23, 2022)
cc233dc  update component recording (zhengfeiwang, Sep 23, 2022)
9a59b30  revert utils.py changes (zhengfeiwang, Sep 23, 2022)
b5811ef  update DSL test recording (zhengfeiwang, Sep 23, 2022)
6775ddb  Merge branch 'main' into sync-pipeline-change (zhengfeiwang, Sep 23, 2022)
7a26bb4  update tests (zhengfeiwang, Sep 23, 2022)
d569dbb  update recordings (zhengfeiwang, Sep 23, 2022)
0fc17bb  Merge branch 'main' into sync-pipeline-change (zhengfeiwang, Sep 26, 2022)
4ffcac5  try to update recordings (zhengfeiwang, Sep 26, 2022)
55d56c3  force Linux line separator and update recordings (zhengfeiwang, Sep 26, 2022)
49da5a9  update recording for "test_parallel_components_with_tabular_input_pip… (zhengfeiwang, Sep 26, 2022)
51931fb  update recording for "test_import_pipeline_submit_cancel" (zhengfeiwang, Sep 26, 2022)
23739f5  add .gitattributes for breaking tests in windows (zhengfeiwang, Sep 26, 2022)
a716526  update recordings (zhengfeiwang, Sep 26, 2022)
4440d7c  update to fix breaking test (zhengfeiwang, Sep 26, 2022)
1 change: 1 addition & 0 deletions .vscode/cspell.json
@@ -148,6 +148,7 @@
"docfx",
"dont",
"dotenv",
"dowhile",
"dpkg",
"dtlk",
"dtlksd",
@@ -6,6 +6,7 @@

from azure.ai.ml._schema import StringTransformedEnum, UnionField
from azure.ai.ml._schema.component.input_output import InputPortSchema, OutputPortSchema, ParameterSchema
from azure.ai.ml._schema.core.fields import DumpableFloatField, DumpableIntegerField


class InternalInputPortSchema(InputPortSchema):
@@ -62,13 +63,37 @@ class InternalEnumParameterSchema(ParameterSchema):
required=True,
data_key="type",
)
enum = fields.List(UnionField([fields.Str(), fields.Number(), fields.Bool()]))
default = UnionField(
[
DumpableIntegerField(strict=True),
# Use DumpableFloatField to avoid '1'(str) serialized to 1.0(float)
DumpableFloatField(),
# put string schema after Int and Float to make sure they won't dump to string
fields.Str(),
# fields.Bool comes last since it'll parse anything non-falsy to True
fields.Bool(),
],
)
enum = fields.List(
UnionField(
[
DumpableIntegerField(strict=True),
# Use DumpableFloatField to avoid '1'(str) serialized to 1.0(float)
DumpableFloatField(),
# put string schema after Int and Float to make sure they won't dump to string
fields.Str(),
# fields.Bool comes last since it'll parse anything non-falsy to True
fields.Bool(),
]
),
required=True,
)

@post_dump
@post_load
def enum_value_to_string(self, data, **kwargs): # pylint: disable=unused-argument, disable=no-self-use
if "enum" in data:
data["enum"] = list(map(str, data["enum"]))
if "default" in data:
if "default" in data and data["default"] is not None:
data["default"] = str(data["default"])
return data
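
The ordering inside these union fields is load-bearing: a plain fields.Str() will happily dump any value as a string, so the stricter numeric fields must come first. A minimal sketch of the idea, using plain marshmallow stand-ins rather than azure-ai-ml's actual UnionField and DumpableIntegerField (whose implementations are not part of this diff):

from marshmallow import ValidationError, fields


class StrictIntDumpField(fields.Integer):
    """Stand-in for DumpableIntegerField(strict=True) (assumed behavior):
    refuse to dump values that are not already ints."""

    def _serialize(self, value, attr, obj, **kwargs):
        # bool is a subclass of int, so exclude it explicitly
        if not isinstance(value, int) or isinstance(value, bool):
            raise ValidationError("not an int")
        return value


def dump_first_match(value, candidates):
    """Assumed UnionField dump semantics: the first candidate that
    serializes without error wins."""
    for field in candidates:
        try:
            return field._serialize(value, None, None)
        except (ValidationError, ValueError, TypeError):
            continue
    raise ValidationError(f"no candidate field accepted {value!r}")


print(dump_first_match(1, [StrictIntDumpField(), fields.Str()]))    # -> 1, stays an int
print(dump_first_match("1", [StrictIntDumpField(), fields.Str()]))  # -> '1', stays a str
# with the order reversed, the int 1 is silently dumped as the string "1":
print(dump_first_match(1, [fields.Str(), StrictIntDumpField()]))    # -> '1'

The same reasoning puts fields.Bool() last on load, since it deserializes almost any truthy value to True.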
1 change: 0 additions & 1 deletion sdk/ml/azure-ai-ml/azure/ai/ml/_internal/_util.py
@@ -19,7 +19,6 @@
Scope,
)
from azure.ai.ml._schema import NestedField
from azure.ai.ml.constants._component import IOConstants
from azure.ai.ml.entities._component.component_factory import component_factory
from azure.ai.ml.entities._job.pipeline._load_component import pipeline_node_factory

@@ -3,7 +3,7 @@
# ---------------------------------------------------------
from typing import Dict, Optional, Union

from azure.ai.ml import Input
from azure.ai.ml import Input, Output
from azure.ai.ml.constants._component import ComponentParameterTypes, IOConstants

_INPUT_TYPE_ENUM = "enum"
@@ -23,6 +23,10 @@ class InternalInput(Input):
- Enum, enum (new)
"""

def __init__(self, datastore_mode=None, **kwargs):
self.datastore_mode = datastore_mode
super().__init__(**kwargs)

@property
def _allowed_types(self):
if self._lower_type == _INPUT_TYPE_ENUM:
@@ -86,7 +90,7 @@ def _get_python_builtin_type_str(self) -> str:
return super()._get_python_builtin_type_str()

@classmethod
def _cast_from_input_or_dict(cls, _input: Union[Input, Dict]) -> "InternalInput":
def _cast_from_input_or_dict(cls, _input: Union[Input, Dict]) -> Optional["InternalInput"]:
"""Cast from Input or Dict to InternalInput. Do not guarantee to create a new object."""
if _input is None:
return None
@@ -98,3 +102,18 @@ def _cast_from_input_or_dict(cls, _input: Union[Input, Dict]) -> "InternalInput"
_input.__class__ = InternalInput
return _input
return InternalInput(**_input)


class InternalOutput(Output):
@classmethod
def _cast_from_output_or_dict(cls, _output: Union[Output, Dict]) -> Optional["InternalOutput"]:
if _output is None:
return None
if isinstance(_output, InternalOutput):
return _output
if isinstance(_output, Output):
# do force cast directly as there is no new field added in InternalOutput;
# the logic needs to change if a new field is added
_output.__class__ = InternalOutput
return _output
return InternalOutput(**_output)
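
The force cast above works by reassigning __class__ on an existing instance, which skips __init__ entirely; that is why the comment stresses it is only safe while the subclass adds no new fields. A self-contained sketch of the pattern with stand-in classes:

class Output:  # stand-in for azure.ai.ml.Output
    def __init__(self, type=None, mode=None):
        self.type = type
        self.mode = mode


class InternalOutput(Output):  # stand-in: adds behavior, no new instance fields
    def is_asset(self):
        return self.type in ("uri_file", "uri_folder", "mltable")


out = Output(type="uri_folder")
# convert the existing instance in place; __init__ is NOT re-run
out.__class__ = InternalOutput
assert isinstance(out, InternalOutput) and out.is_asset()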
@@ -16,7 +16,7 @@
from azure.ai.ml._schema.core.fields import DistributionField
from azure.ai.ml.entities import CommandJobLimits, JobResourceConfiguration
from azure.ai.ml.entities._job.distribution import DistributionConfiguration
from azure.ai.ml.entities._util import get_rest_dict
from azure.ai.ml.entities._util import get_rest_dict_for_node_attrs


class Command(InternalBaseNode):
@@ -87,11 +87,10 @@ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema

def _to_rest_object(self, **kwargs) -> dict:
rest_obj = super()._to_rest_object(**kwargs)
limits = self.limits._to_rest_object() if self.limits else None
rest_obj.update(
dict(
limits=get_rest_dict(limits),
resources=get_rest_dict(self.resources, clear_empty_value=True),
limits=get_rest_dict_for_node_attrs(self.limits, clear_empty_value=True),
resources=get_rest_dict_for_node_attrs(self.resources, clear_empty_value=True),
)
)
return rest_obj
@@ -174,7 +173,7 @@ def _to_rest_object(self, **kwargs) -> dict:
distribution = self.distribution._to_rest_object() if self.distribution else None # pylint: disable=no-member
rest_obj.update(
dict(
distribution=get_rest_dict(distribution),
distribution=get_rest_dict_for_node_attrs(distribution),
)
)
return rest_obj
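
get_rest_dict_for_node_attrs itself is not shown in this PR; judging from the call sites here and in command.py below, it folds the old two-step pattern (call _to_rest_object(), then get_rest_dict()) into one helper. A hypothetical sketch of its semantics (names and behavior are assumptions, not the actual _util.py code):

def get_rest_dict_for_node_attrs(obj, clear_empty_value=False):
    """Hypothetical sketch: normalize a node attribute (an entity,
    a rest model, a dict, or None) into a plain dict for the REST payload."""
    if obj is None:
        return None
    if hasattr(obj, "_to_rest_object"):  # SDK entity -> rest model
        obj = obj._to_rest_object()
    if hasattr(obj, "as_dict"):  # autorest models expose as_dict()
        obj = obj.as_dict()
    if isinstance(obj, dict) and clear_empty_value:
        obj = {k: v for k, v in obj.items() if v is not None}
    return obj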
@@ -20,7 +20,7 @@
from ... import Input, Output
from .._schema.component import InternalBaseComponentSchema
from ._additional_includes import _AdditionalIncludes
from ._input_outputs import InternalInput
from ._input_outputs import InternalInput, InternalOutput
from .environment import InternalEnvironment
from .node import InternalBaseNode

Expand Down Expand Up @@ -132,11 +132,12 @@ def __init__(

@classmethod
def _build_io(cls, io_dict: Union[Dict, Input, Output], is_input: bool):
if not is_input:
return super()._build_io(io_dict, is_input)
component_io = {}
for name, port in io_dict.items():
component_io[name] = InternalInput._cast_from_input_or_dict(port)
if is_input:
component_io[name] = InternalInput._cast_from_input_or_dict(port)
else:
component_io[name] = InternalOutput._cast_from_output_or_dict(port)
return component_io

def _post_process_internal_inputs_outputs(
@@ -57,7 +57,7 @@ def __init__(
self.min_instance_type_count = min_instance_type_count


class AISuperComputerConfiguration(BaseProperty):
class AISuperComputerConfiguration(BaseProperty): # pylint: disable=too-many-instance-attributes
"""A class to manage AI Super Computer Configuration."""

def __init__(
7 changes: 7 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py
@@ -257,6 +257,13 @@ def __init__(
**kwargs,
)

self._service_client_10_2022_preview = ServiceClient102022(
credential=self._credential,
subscription_id=self._operation_scope._subscription_id,
base_url=base_url,
**kwargs,
)

self._workspaces = WorkspaceOperations(
self._operation_scope,
self._rp_service_client,
6 changes: 6 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_schema/compute/schedule.py
@@ -48,6 +48,12 @@ class RecurrenceScheduleSchema(metaclass=PatchedSchemaMeta):
hours = fields.List(fields.Int())
minutes = fields.List(fields.Int())

@post_load
def make(self, data, **kwargs):
from azure.ai.ml.entities import RecurrencePattern

return RecurrencePattern(**data)


class RecurrenceTriggerSchema(BaseTriggerSchema):
type = StringTransformedEnum(required=True, allowed_values=TriggerType.RECURRENCE)
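
The @post_load make hook added above is the standard marshmallow way to have schema.load() return a typed entity instead of a plain dict. The same pattern in a self-contained sketch (stand-in schema and entity, not the real RecurrencePattern):

from marshmallow import Schema, fields, post_load


class Pattern:  # stand-in entity
    def __init__(self, hours=None, minutes=None):
        self.hours = hours or []
        self.minutes = minutes or []


class PatternSchema(Schema):
    hours = fields.List(fields.Int())
    minutes = fields.List(fields.Int())

    @post_load
    def make(self, data, **kwargs):
        # turn the loaded dict into an entity instance
        return Pattern(**data)


pattern = PatternSchema().load({"hours": [9], "minutes": [0, 30]})
assert isinstance(pattern, Pattern) and pattern.hours == [9]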
@@ -54,10 +54,14 @@ class BaseNodeSchema(PathAwareSchema):
keys=fields.Str(),
values=UnionField([OutputBindingStr, NestedField(OutputSchema)], allow_none=True),
)
properties = fields.Dict(keys=fields.Str(), values=fields.Str(allow_none=True))

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
support_data_binding_expression_for_fields(self, ["type"])
# data binding expressions are not supported inside the component field; besides, the
# validation error message would be very long when component is an object, as it would
# include str(component), so just add component to the skip list. The same applies to
# trial in Sweep.
support_data_binding_expression_for_fields(self, ["type", "component", "trial"])

@post_dump(pass_original=True)
def add_user_setting_attr_dict(self, data, original_data, **kwargs): # pylint: disable=unused-argument
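
support_data_binding_expression_for_fields is not defined in this diff; conceptually it wraps a schema's fields so that, besides their normal values, they also accept pipeline binding expressions such as "${{parent.inputs.x}}". A rough sketch of that idea under assumed semantics:

import re

from marshmallow import ValidationError, fields

_BINDING = re.compile(r"^\$\{\{.+\}\}$")


class DataBindingStr(fields.Str):
    """Sketch of a field that only accepts data binding expressions;
    union it with a concrete field to allow either form."""

    def _deserialize(self, value, attr, data, **kwargs):
        value = super()._deserialize(value, attr, data, **kwargs)
        if not _BINDING.match(value):
            raise ValidationError("not a data binding expression")
        return value


print(DataBindingStr()._deserialize("${{parent.inputs.x}}", None, None))  # accepted

Skipping component and trial simply keeps their original validation, trading binding support for readable error messages.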
@@ -4,7 +4,7 @@

from marshmallow import fields, post_dump, post_load, validate

from azure.ai.ml._restclient.v2022_06_01_preview.models import RecurrenceFrequency, TriggerType, WeekDay
from azure.ai.ml._restclient.v2022_10_01_preview.models import RecurrenceFrequency, TriggerType, WeekDay
from azure.ai.ml._schema.core.fields import (
DateTimeStr,
DumpableIntegerField,
@@ -53,6 +53,12 @@ class RecurrencePatternSchema(metaclass=PatchedSchemaMeta):
fields.List(StringTransformedEnum(allowed_values=[o.value for o in WeekDay])),
]
)
month_days = UnionField(
[
fields.Int(),
fields.List(fields.Int()),
]
)

@post_load
def make(self, data, **kwargs) -> "RecurrencePattern": # pylint: disable=no-self-use, unused-argument
7 changes: 7 additions & 0 deletions sdk/ml/azure-ai-ml/azure/ai/ml/_utils/utils.py
@@ -776,3 +776,10 @@ def _is_user_error_from_exception_type(e: Union[Exception, None]):
# For OSError/IOError with error no 28: "No space left on device" should be sdk user error
if isinstance(e, (ConnectionError, KeyboardInterrupt)) or (isinstance(e, (IOError, OSError)) and e.errno == 28):
return True


def get_all_enum_values_iter(enum_type):
"""Get all values of an enum type."""
for key in dir(enum_type):
if not key.startswith("_"):
yield getattr(enum_type, key)
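
Note the helper walks dir() rather than iterating an Enum, so it also works on plain constant-holder classes such as AssetTypes. For example (stand-in constants class; assumes get_all_enum_values_iter from the diff above is in scope):

class AssetTypes:  # stand-in for azure.ai.ml.constants.AssetTypes
    URI_FILE = "uri_file"
    URI_FOLDER = "uri_folder"
    MLTABLE = "mltable"


print(list(get_all_enum_values_iter(AssetTypes)))
# -> ['mltable', 'uri_file', 'uri_folder']  (dir() yields names alphabetically)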
2 changes: 1 addition & 1 deletion sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_dynamic.py
@@ -74,7 +74,7 @@ def _assert_arg_valid(kwargs: dict, keys: list, func_name: str):
"""Assert the arg keys are all in keys."""
# pylint: disable=protected-access
# validate component input names
Component._validate_io_names(io_dict=kwargs)
Component._validate_io_names(io_dict=kwargs, raise_error=True)

lower2original_parameter_names = {x.lower(): x for x in keys}
kwargs_need_to_update = []
@@ -12,10 +12,12 @@
from typing import Callable, Union

from azure.ai.ml._utils.utils import (
get_all_enum_values_iter,
is_private_preview_enabled,
is_valid_node_name,
parse_args_description_from_docstring,
)
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.constants._component import ComponentSource
from azure.ai.ml.dsl._utils import _sanitize_python_variable_name
from azure.ai.ml.entities import PipelineJob
@@ -244,11 +246,24 @@ def _build_pipeline_outputs(self, outputs: typing.Dict[str, NodeOutput]):
if not isinstance(key, str) or not isinstance(value, NodeOutput) or value._owner is None:
raise UserErrorException(message=error_msg, no_personal_data_message=error_msg)
meta = value._meta or value

# hack: map component output type to valid pipeline output type
def _map_type(_meta):
if type(_meta).__name__ != "InternalOutput":
return _meta.type
if _meta.type in list(get_all_enum_values_iter(AssetTypes)):
return _meta.type
if _meta.type in ["AnyFile"]:
return AssetTypes.URI_FILE
return AssetTypes.URI_FOLDER

# Note: Here we set PipelineOutput as Pipeline's output definition as we need output binding.
pipeline_output = PipelineOutput(
name=key,
data=None,
meta=Output(type=meta.type, description=meta.description, mode=meta.mode, is_control=meta.is_control),
meta=Output(
type=_map_type(meta), description=meta.description, mode=meta.mode, is_control=meta.is_control
),
owner="pipeline",
description=self._args_description.get(key, None),
)
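
Restating the _map_type hack above with concrete values (the asset-type set below is a stand-in; the real list comes from AssetTypes):

ASSET_TYPES = {"uri_file", "uri_folder", "mltable", "mlflow_model", "custom_model", "triton_model"}


def map_internal_output_type(output_type: str) -> str:
    """Sketch of _map_type for an InternalOutput meta."""
    if output_type in ASSET_TYPES:
        return output_type  # already a valid pipeline output type
    if output_type == "AnyFile":
        return "uri_file"  # file-like internal type
    return "uri_folder"  # every other internal type degrades to a folder


assert map_internal_output_type("mltable") == "mltable"
assert map_internal_output_type("AnyFile") == "uri_file"
assert map_internal_output_type("AnyDirectory") == "uri_folder"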
@@ -396,6 +396,7 @@ def _to_rest_object(self, **kwargs) -> dict: # pylint: disable=unused-argument
computeId=self.compute,
inputs=self._to_rest_inputs(),
outputs=self._to_rest_outputs(),
properties=self.properties,
_source=self._source,
# add all arbitrary attributes to support setting unknown attributes
**self._get_attrs(),
46 changes: 26 additions & 20 deletions sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/command.py
@@ -46,7 +46,13 @@
from ..._schema import PathAwareSchema
from ..._schema.job.distribution import MPIDistributionSchema, PyTorchDistributionSchema, TensorFlowDistributionSchema
from .._job.identity import AmlToken, Identity, ManagedIdentity, UserIdentity
from .._util import convert_ordered_dict_to_dict, get_rest_dict, load_from_dict, validate_attribute_type
from .._util import (
convert_ordered_dict_to_dict,
from_rest_dict_to_dummy_rest_object,
get_rest_dict_for_node_attrs,
load_from_dict,
validate_attribute_type,
)
from .base_node import BaseNode
from .sweep import Sweep

@@ -432,24 +438,16 @@ def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:

def _to_rest_object(self, **kwargs) -> dict:
rest_obj = super()._to_rest_object(**kwargs)
distribution = self.distribution._to_rest_object() if self.distribution else None
limits = self.limits._to_rest_object() if self.limits else None
services = {k: v._to_rest_object() for k, v in self.services.items()} if self.services else None
rest_obj.update(
convert_ordered_dict_to_dict(
dict(
componentId=self._get_component_id(),
distribution=get_rest_dict(distribution),
limits=get_rest_dict(limits),
resources=get_rest_dict(self.resources, clear_empty_value=True),
services=services,
)
)
)
# TODO 1951540: Refactor: avoid send None field to server, for other fields like limits, resources etc.
if rest_obj["services"] is None:
del rest_obj["services"]
return rest_obj
for key, value in {
"componentId": self._get_component_id(),
"distribution": get_rest_dict_for_node_attrs(self.distribution, clear_empty_value=True),
"limits": get_rest_dict_for_node_attrs(self.limits, clear_empty_value=True),
"resources": get_rest_dict_for_node_attrs(self.resources, clear_empty_value=True),
"services": get_rest_dict_for_node_attrs(self.services),
}.items():
if value is not None:
rest_obj[key] = value
return convert_ordered_dict_to_dict(rest_obj)

@classmethod
def _load_from_dict(cls, data: Dict, context: Dict, additional_message: str, **kwargs) -> "Command":
@@ -486,7 +484,15 @@ def _from_rest_object(cls, obj: dict) -> "Command":

# services, sweep won't have services
if "services" in obj and obj["services"]:
obj["services"] = JobService._from_rest_job_services(obj["services"])
# pipeline node rest objects are dicts, while _from_rest_job_services expects RestJobService
services = {}
for service_name, service in obj["services"].items():
# in the rest object of a pipeline job, a service is flattened to a dict as
# it's an attribute of a node, but JobService._from_rest_object expects a
# RestJobService, so we need to convert it back. Here we convert the dict to a
# dummy rest object which may work as a RestJobService instead.
services[service_name] = from_rest_dict_to_dummy_rest_object(service)
obj["services"] = JobService._from_rest_job_services(services)

# handle limits
if "limits" in obj and obj["limits"]:
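
from_rest_dict_to_dummy_rest_object is imported above but not defined in this diff. Going by the comment, it only needs to make dict keys readable as attributes, the way they would be on a RestJobService. A hypothetical sketch (assumed semantics, not the actual _util.py code):

from types import SimpleNamespace


def from_rest_dict_to_dummy_rest_object(rest_dict):
    """Hypothetical sketch: expose dict keys as attributes so code written
    against a rest model (e.g. RestJobService) can read them unchanged."""
    return SimpleNamespace(**(rest_dict or {}))


service = from_rest_dict_to_dummy_rest_object({"job_service_type": "Jupyter", "port": 8888})
assert service.job_service_type == "Jupyter" and service.port == 8888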