Skip to content

Commit

Permalink
Run unit tests on windows (#856)
Browse files Browse the repository at this point in the history
* Run unit tests on windows

Signed-off-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>

* Set env vars using github yaml

Signed-off-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>

* Merged master

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@apache.org>

* updated tests

Signed-off-by: Kevin Su <pingsutw@gmail.com>

* Lint fixed

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Updated tests

Signed-off-by: Kevin Su <pingsutw@gmail.com>

* Lint fixed

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Lint fixed

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Lint fixed

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Lint fixed

Signed-off-by: Kevin Su <pingsutw@apache.org>

* Ignore tests in python3.10 + windows

Signed-off-by: Kevin Su <pingsutw@apache.org>

Co-authored-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>
Co-authored-by: Kevin Su <pingsutw@apache.org>
Co-authored-by: Kevin Su <pingsutw@gmail.com>
  • Loading branch information
4 people committed Mar 15, 2022
1 parent 0405ef2 commit 66e1007
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 54 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ env:

jobs:
build:
runs-on: ubuntu-latest
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
python-version: ["3.7", "3.8", "3.9", "3.10"]
spark-version-suffix: ["", "-spark2"]
exclude:
Expand All @@ -24,6 +25,11 @@ jobs:
spark-version-suffix: "-spark2"
- python-version: 3.10
spark-version-suffix: "-spark2"
# Ignore this test because we failed to install docker-py
# docker-py will install pywin32==227, whereas pywin only added support for python 3.10 in version 301.
# For more detail, see https://github.com/flyteorg/flytekit/pull/856#issuecomment-1067152855
- python-version: 3.10
os: windows-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
Expand All @@ -42,6 +48,8 @@ jobs:
make setup${{ matrix.spark-version-suffix }}
pip freeze
- name: Test with coverage
env:
FLYTE_SDK_USE_STRUCTURED_DATASET: "TRUE"
run: |
coverage run -m pytest tests/flytekit/unit
- name: Codecov
Expand Down
18 changes: 15 additions & 3 deletions flytekit/core/data_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import datetime
import os
import pathlib
import re
import typing
from abc import abstractmethod
from distutils import dir_util
Expand Down Expand Up @@ -136,13 +137,22 @@ def register_plugin(cls, protocol: str, plugin: typing.Type[DataPersistence], fo

cls._PLUGINS[protocol] = plugin

@staticmethod
def get_protocol(url: str):
# copy from fsspec https://github.com/fsspec/filesystem_spec/blob/fe09da6942ad043622212927df7442c104fe7932/fsspec/utils.py#L387-L391
parts = re.split(r"(\:\:|\://)", url, 1)
if len(parts) > 1:
return parts[0]
logger.info("Setting protocol to file")
return "file"

@classmethod
def find_plugin(cls, path: str) -> typing.Type[DataPersistence]:
"""
Returns a plugin for the given protocol, else raise a TypeError
"""
for k, p in cls._PLUGINS.items():
if path.startswith(k) or path.startswith(k.replace("://", "")):
if cls.get_protocol(path) == k.replace("://", "") or path.startswith(k):
return p
raise TypeError(f"No plugin found for matching protocol of path {path}")

Expand Down Expand Up @@ -408,6 +418,7 @@ def get_data(self, remote_path: str, local_path: str, is_multipart=False):
"""
try:
with PerformanceTimer(f"Copying ({remote_path} -> {local_path})"):
pathlib.Path(local_path).parent.mkdir(parents=True, exist_ok=True)
DataPersistencePlugins.find_plugin(remote_path)().get(remote_path, local_path, recursive=is_multipart)
except Exception as ex:
raise FlyteAssertion(
Expand Down Expand Up @@ -437,8 +448,9 @@ def put_data(self, local_path: Union[str, os.PathLike], remote_path: str, is_mul
DataPersistencePlugins.register_plugin("file://", DiskPersistence)
DataPersistencePlugins.register_plugin("/", DiskPersistence)

# TODO make this use tmpdir
tmp_dir = os.path.join("/tmp/flyte", datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
tmp_dir_prefix = f"{os.sep}tmp{os.sep}flyte"

tmp_dir = os.path.join(tmp_dir_prefix, datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
default_local_file_access_provider = FileAccessProvider(
local_sandbox_dir=os.path.join(tmp_dir, "sandbox"),
raw_output_prefix=os.path.join(tmp_dir, "raw"),
Expand Down
3 changes: 2 additions & 1 deletion flytekit/extras/persistence/gcs_gsutil.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import posixpath
import typing
from shutil import which as shell_which

Expand All @@ -14,7 +15,7 @@ def _update_cmd_config_and_execute(cmd):


def _amend_path(path):
return os.path.join(path, "*") if not path.endswith("*") else path
return posixpath.join(path, "*") if not path.endswith("*") else path


class GCSPersistence(DataPersistence):
Expand Down
5 changes: 4 additions & 1 deletion flytekit/extras/tasks/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ def execute(self, **kwargs) -> typing.Any:
for v in self._output_locs:
outputs[v.var] = self._interpolizer.interpolate(v.location, inputs=kwargs)

if os.name == "nt":
self._script = self._script.lstrip().rstrip().replace("\n", "&&")

gen_script = self._interpolizer.interpolate(self._script, inputs=kwargs, outputs=outputs)
if self._debug:
print("\n==============================================\n")
Expand All @@ -210,7 +213,7 @@ def execute(self, **kwargs) -> typing.Any:
try:
subprocess.check_call(gen_script, shell=True)
except subprocess.CalledProcessError as e:
files = os.listdir("./")
files = os.listdir(".")
fstr = "\n-".join(files)
logger.error(
f"Failed to Execute Script, return-code {e.returncode} \n"
Expand Down
3 changes: 2 additions & 1 deletion flytekit/tools/fast_registration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os as _os
import posixpath
import subprocess as _subprocess
import tarfile as _tarfile
import tempfile as _tempfile
Expand Down Expand Up @@ -50,7 +51,7 @@ def get_additional_distribution_loc(remote_location: str, identifier: str) -> st
:param Text identifier:
:return Text:
"""
return _os.path.join(remote_location, "{}.{}".format(identifier, "tar.gz"))
return posixpath.join(remote_location, "{}.{}".format(identifier, "tar.gz"))


def upload_package(source_dir: _os.PathLike, identifier: str, remote_location: str, dry_run=False) -> str:
Expand Down
11 changes: 3 additions & 8 deletions plugins/flytekit-data-fsspec/flytekitplugins/fsspec/persist.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import typing

import fsspec
from fsspec.core import split_protocol
from fsspec.registry import known_implementations

from flytekit.configuration import DataConfig, S3Config
Expand Down Expand Up @@ -51,13 +50,9 @@ def __init__(self, default_prefix=None, data_config: typing.Optional[DataConfig]
@staticmethod
def get_protocol(path: typing.Optional[str] = None):
if path:
protocol, _ = split_protocol(path)
if protocol is None and path.startswith("/"):
logger.info("Setting protocol to file")
protocol = "file"
else:
protocol = "file"
return protocol
return DataPersistencePlugins.get_protocol(path)
logger.info("Setting protocol to file")
return "file"

def get_filesystem(self, path: str) -> fsspec.AbstractFileSystem:
protocol = FSSpecPersistence.get_protocol(path)
Expand Down
9 changes: 4 additions & 5 deletions tests/flytekit/unit/bin/test_python_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
from collections import OrderedDict

import mock
import pytest
from flyteidl.core.errors_pb2 import ErrorDocument

from flytekit.bin.entrypoint import _dispatch_execute, normalize_inputs, setup_execution
from flytekit.core import context_manager
from flytekit.core.base_task import IgnoreOutputs
from flytekit.core.data_persistence import DiskPersistence
from flytekit.core.dynamic_workflow_task import dynamic
from flytekit.core.promise import VoidPromise
from flytekit.core.task import task
Expand Down Expand Up @@ -277,10 +277,9 @@ def test_dispatch_execute_system_error(mock_write_to_file, mock_upload_dir, mock
assert ed.error.origin == execution_models.ExecutionError.ErrorKind.SYSTEM


def test_setup_bad_prefix():
with pytest.raises(TypeError):
with setup_execution("qwerty"):
...
def test_setup_disk_prefix():
with setup_execution("qwerty") as ctx:
assert isinstance(ctx.file_access._default_remote, DiskPersistence)


def test_setup_cloud_prefix():
Expand Down
16 changes: 8 additions & 8 deletions tests/flytekit/unit/core/functools/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
def test_wrapped_tasks_happy_path(capfd):
subprocess.run(
[sys.executable, str(test_module_dir / "simple_decorator.py")],
env={"SCRIPT_INPUT": "10"},
env={"SCRIPT_INPUT": "10", "SYSTEMROOT": "C:\\Windows", "HOMEPATH": "C:\\Windows"},
text=True,
)
out = capfd.readouterr().out

assert out.strip().split("\n") == [
assert out.replace("\r", "").strip().split("\n") == [
"before running my_task",
"try running my_task",
"running my_task",
Expand All @@ -32,12 +32,12 @@ def test_wrapped_tasks_happy_path(capfd):
def test_wrapped_tasks_error(capfd):
subprocess.run(
[sys.executable, str(test_module_dir / "simple_decorator.py")],
env={"SCRIPT_INPUT": "0"},
env={"SCRIPT_INPUT": "0", "SYSTEMROOT": "C:\\Windows", "HOMEPATH": "C:\\Windows"},
text=True,
)
out = capfd.readouterr().out

assert out.strip().split("\n") == [
assert out.replace("\r", "").strip().split("\n") == [
"before running my_task",
"try running my_task",
"error running my_task: my_task failed with input: 0",
Expand All @@ -49,11 +49,11 @@ def test_wrapped_tasks_error(capfd):
def test_stacked_wrapped_tasks(capfd):
subprocess.run(
[sys.executable, str(test_module_dir / "stacked_decorators.py")],
env={"SCRIPT_INPUT": "10"},
env={"SCRIPT_INPUT": "10", "SYSTEMROOT": "C:\\Windows", "HOMEPATH": "C:\\Windows"},
text=True,
)
out = capfd.readouterr().out
assert out.strip().split("\n") == [
assert out.replace("\r", "").strip().split("\n") == [
"running task_decorator_1",
"running task_decorator_2",
"running task_decorator_3",
Expand All @@ -65,7 +65,7 @@ def test_stacked_wrapped_tasks(capfd):
def test_unwrapped_task():
completed_process = subprocess.run(
[sys.executable, str(test_module_dir / "unwrapped_decorator.py")],
env={"SCRIPT_INPUT": "10"},
env={"SCRIPT_INPUT": "10", "SYSTEMROOT": "C:\\Windows", "HOMEPATH": "C:\\Windows"},
text=True,
capture_output=True,
)
Expand All @@ -81,7 +81,7 @@ def test_unwrapped_task():
def test_nested_function(script):
completed_process = subprocess.run(
[sys.executable, str(test_module_dir / script)],
env={"SCRIPT_INPUT": "10"},
env={"SCRIPT_INPUT": "10", "SYSTEMROOT": "C:\\Windows", "HOMEPATH": "C:\\Windows"},
text=True,
capture_output=True,
)
Expand Down
8 changes: 4 additions & 4 deletions tests/flytekit/unit/core/test_flyte_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from flytekit.configuration import Image, ImageConfig
from flytekit.core import context_manager
from flytekit.core.context_manager import ExecutionState
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.data_persistence import FileAccessProvider, tmp_dir_prefix
from flytekit.core.dynamic_workflow_task import dynamic
from flytekit.core.launch_plan import LaunchPlan
from flytekit.core.task import task
Expand Down Expand Up @@ -207,7 +207,7 @@ def my_wf() -> FlyteFile:

# While the literal returned by t1 does contain the web address as the uri, because it's a remote address,
# flytekit will translate it back into a FlyteFile object on the local drive (but not download it)
assert workflow_output.path.startswith(f"{random_dir}/local_flytekit")
assert workflow_output.path.startswith(f"{random_dir}{os.sep}local_flytekit")
# But the remote source should still be the https address
assert workflow_output.remote_source == SAMPLE_DATA

Expand Down Expand Up @@ -422,7 +422,7 @@ def dyn(fs: FlyteFile):
@task
def t2(ff: FlyteFile) -> os.PathLike:
assert ff.remote_source == "s3://somewhere"
assert "/tmp/flyte/" in ff.path
assert tmp_dir_prefix in ff.path

return ff.path

Expand All @@ -432,4 +432,4 @@ def wf(path: str) -> os.PathLike:
dyn(fs=n1)
return t2(ff=n1)

assert "/tmp/flyte/" in wf(path="s3://somewhere").path
assert tmp_dir_prefix in wf(path="s3://somewhere").path
16 changes: 8 additions & 8 deletions tests/flytekit/unit/core/test_shim_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ def test_resolver_load_task():

resolver = TaskTemplateResolver()
ts = get_serializable(OrderedDict(), serialization_settings, square)
with tempfile.NamedTemporaryFile() as f:
write_proto_to_file(ts.template.to_flyte_idl(), f.name)
# load_task should create an instance of the path to the object given, doesn't need to be a real executor
shim_task = resolver.load_task([f.name, f"{Placeholder.__module__}.Placeholder"])
assert isinstance(shim_task.executor, Placeholder)
assert shim_task.task_template.id.name == "square"
assert shim_task.task_template.interface.inputs["val"] is not None
assert shim_task.task_template.interface.outputs["out"] is not None
file = tempfile.NamedTemporaryFile().name
# load_task should create an instance of the path to the object given, doesn't need to be a real executor
write_proto_to_file(ts.template.to_flyte_idl(), file)
shim_task = resolver.load_task([file, f"{Placeholder.__module__}.Placeholder"])
assert isinstance(shim_task.executor, Placeholder)
assert shim_task.task_template.id.name == "square"
assert shim_task.task_template.interface.inputs["val"] is not None
assert shim_task.task_template.interface.outputs["out"] is not None


@mock.patch("flytekit.core.python_customized_container_task.PythonCustomizedContainerTask.get_config")
Expand Down
7 changes: 4 additions & 3 deletions tests/flytekit/unit/core/test_type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from flytekit import kwtypes
from flytekit.core.annotation import FlyteAnnotation
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.data_persistence import tmp_dir_prefix
from flytekit.core.dynamic_workflow_task import dynamic
from flytekit.core.hash import HashMethod
from flytekit.core.task import task
Expand Down Expand Up @@ -654,7 +655,7 @@ def test_structured_dataset_type():

ctx = FlyteContextManager.current_context()
lv = tf.to_literal(ctx, df, pd.DataFrame, lt)
assert "/tmp/flyte" in lv.scalar.structured_dataset.uri
assert tmp_dir_prefix in lv.scalar.structured_dataset.uri
metadata = lv.scalar.structured_dataset.metadata
assert metadata.structured_dataset_type.format == "parquet"
v1 = tf.to_python_value(ctx, lv, pd.DataFrame)
Expand All @@ -666,7 +667,7 @@ def test_structured_dataset_type():
assert subset_lt.structured_dataset_type is not None

subset_lv = tf.to_literal(ctx, df, pd.DataFrame, subset_lt)
assert "/tmp/flyte" in subset_lv.scalar.structured_dataset.uri
assert tmp_dir_prefix in subset_lv.scalar.structured_dataset.uri
v1 = tf.to_python_value(ctx, subset_lv, pd.DataFrame)
v2 = tf.to_python_value(ctx, subset_lv, pa.Table)
subset_data = pd.DataFrame({name: ["Tom", "Joseph"]})
Expand Down Expand Up @@ -724,7 +725,7 @@ def __init__(self, number: int):

ctx = FlyteContextManager.current_context()
lv = TypeEngine.to_literal(ctx, Foo(1), FlytePickle, lt)
assert "/tmp/flyte/" in lv.scalar.blob.uri
assert tmp_dir_prefix in lv.scalar.blob.uri

transformer = FlytePickleTransformer()
gt = transformer.guess_python_type(lt)
Expand Down
16 changes: 8 additions & 8 deletions tests/flytekit/unit/core/test_type_hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from flytekit.core import context_manager, launch_plan, promise
from flytekit.core.condition import conditional
from flytekit.core.context_manager import ExecutionState
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.data_persistence import FileAccessProvider, tmp_dir_prefix
from flytekit.core.hash import HashMethod
from flytekit.core.node import Node
from flytekit.core.promise import NodeOutput, Promise, VoidPromise
Expand Down Expand Up @@ -57,7 +57,7 @@ def test_default_wf_params_works():
def my_task(a: int):
wf_params = flytekit.current_context()
assert wf_params.execution_id == "ex:local:local:local"
assert "/tmp/flyte/" in wf_params.raw_output_prefix
assert tmp_dir_prefix in wf_params.raw_output_prefix

my_task(a=3)
assert context_manager.FlyteContextManager.size() == 1
Expand Down Expand Up @@ -387,9 +387,9 @@ def t2(fs: FileStruct) -> os.PathLike:
assert fs.a.remote_source == "s3://somewhere"
assert fs.b.a.remote_source == "s3://somewhere"
assert fs.b.b.remote_source == "s3://somewhere"
assert "/tmp/flyte/" in fs.a.path
assert "/tmp/flyte/" in fs.b.a.path
assert "/tmp/flyte/" in fs.b.b.path
assert tmp_dir_prefix in fs.a.path
assert tmp_dir_prefix in fs.b.a.path
assert tmp_dir_prefix in fs.b.b.path

return fs.a.path

Expand All @@ -403,8 +403,8 @@ def wf(path: str) -> (os.PathLike, FlyteFile):
dyn(fs=n1)
return t2(fs=n1), t3(fs=n1)

assert "/tmp/flyte/" in wf(path="s3://somewhere")[0].path
assert "/tmp/flyte/" in wf(path="s3://somewhere")[1].path
assert tmp_dir_prefix in wf(path="s3://somewhere")[0].path
assert tmp_dir_prefix in wf(path="s3://somewhere")[1].path
assert "s3://somewhere" == wf(path="s3://somewhere")[1].remote_source


Expand Down Expand Up @@ -436,7 +436,7 @@ def wf(path: str) -> os.PathLike:
n1 = t1(path=path)
return t2(fs=n1)

assert "/tmp/flyte/" in wf(path="s3://somewhere").path
assert tmp_dir_prefix in wf(path="s3://somewhere").path


def test_structured_dataset_in_dataclass():
Expand Down
Loading

0 comments on commit 66e1007

Please sign in to comment.