Merge pull request #269 from IBM/aggregator
Initial version of profiler
blublinsky committed Jun 18, 2024
2 parents 3c916ff + f7bb008 commit e21a6ab
Showing 28 changed files with 1,586 additions and 54 deletions.
.make.versions (4 changes: 3 additions & 1 deletion)
@@ -33,6 +33,7 @@ FILTER_SPARK_VERSION=0.2.0$(RELEASE_VERSION_SUFFIX)
NOOP_PYTHON_VERSION=0.9.0$(RELEASE_VERSION_SUFFIX)
NOOP_RAY_VERSION=0.9.0$(RELEASE_VERSION_SUFFIX)
NOOP_SPARK_VERSION=0.2.0$(RELEASE_VERSION_SUFFIX)
PROFILER_VERSION=0.2.0$(RELEASE_VERSION_SUFFIX)

RESIZE_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)

Expand All @@ -48,8 +49,9 @@ PROGLANG_SELECT_RAY_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)

CODE_QUALITY_RAY_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)

INGEST_TO_PARQUET_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)
INGEST_TO_PARQUET_RAY_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)
INGEST_TO_PARQUET_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)


KFP_DOCKER_VERSION=0.2.0$(RELEASE_VERSION_SUFFIX)
KFP_DOCKER_VERSION_v2=0.2.0$(RELEASE_VERSION_SUFFIX)
.secrets.baseline (12 changes: 1 addition & 11 deletions)
@@ -3,7 +3,7 @@
"files": null,
"lines": null
},
"generated_at": "2024-05-18T19:39:54Z",
"generated_at": "2024-06-17T13:56:26Z",
"plugins_used": [
{
"name": "AWSKeyDetector"
@@ -87,16 +87,6 @@
"verified_result": null
}
],
"kfp/kfp_ray_components/README.md": [
{
"hashed_secret": "f15e1014a6f4234f0d394a979a45f5983c9fbc7f",
"is_secret": false,
"is_verified": false,
"line_number": 55,
"type": "Secret Keyword",
"verified_result": null
}
],
"kfp/kfp_support_lib/src/kfp_support/workflow_support/utils/workflow_utils.py": [
{
"hashed_secret": "5dae15fedc81823a7080b5ab36b7a3d666336170",
data-processing-lib/doc/advanced-transform-tutorial.md (48 changes: 26 additions & 22 deletions)
@@ -166,14 +166,16 @@ adds their handles to the transform parameters
```python
def get_transform_config(
    self, data_access_factory: DataAccessFactory, statistics: ActorHandle, files: list[str]
) -> dict[str, Any]:
    self.filters = RayUtils.create_actors(
        clazz=HashFilter,
        params={},
        actor_options={"num_cpus": self.params.get("hash_cpu", 0.5)},
        n_actors=self.params.get("num_hashes", 1),
    )
    return {"hashes": self.filters} | self.params
) -> dict[str, Any]:


    self.aggregators = RayUtils.create_actors(
        clazz=HashFilter,
        params={},
        actor_options={"num_cpus": self.params.get("hash_cpu", 0.5)},
        n_actors=self.params.get("num_hashes", 1),
    )
    return {"hashes": self.aggregators} | self.params
```
Inputs to this method include a set of parameters that might not be needed for this transform, but
rather a superset of all parameters that can be used by different implementations of the transform runtime (
@@ -187,20 +189,22 @@ class

```python
def compute_execution_stats(self, stats: dict[str, Any]) -> dict[str, Any]:
    # Get filters stats
    sum_hash = 0
    sum_hash_mem = 0
    remote_replies = [f.get_hash_size.remote() for f in self.filters]
    while remote_replies:
        # Wait for replies
        ready, not_ready = ray.wait(remote_replies)
        for r in ready:
            h_size, h_memory = ray.get(r)
            sum_hash = sum_hash + h_size
            sum_hash_mem = sum_hash_mem + h_memory
        remote_replies = not_ready
    dedup_prst = 100 * (1.0 - stats.get("result_documents", 1) / stats.get("source_documents", 1))
    return {"number of hashes": sum_hash, "hash memory, GB": sum_hash_mem, "de duplication %": dedup_prst} | stats


    # Get filters stats
    sum_hash = 0
    sum_hash_mem = 0
    remote_replies = [f.get_size.remote() for f in self.aggregators]
    while remote_replies:
        # Wait for replies
        ready, not_ready = ray.wait(remote_replies)
        for r in ready:
            h_size, h_memory = ray.get(r)
            sum_hash = sum_hash + h_size
            sum_hash_mem = sum_hash_mem + h_memory
        remote_replies = not_ready
    dedup_prst = 100 * (1.0 - stats.get("result_documents", 1) / stats.get("source_documents", 1))
    return {"number of hashes": sum_hash, "hash memory, GB": sum_hash_mem, "de duplication %": dedup_prst} | stats
```
Input to this method is a dictionary of metadata collected by the statistics object. It then enhances it with
information collected by the hash actors and with custom computations based on the statistics data.
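As a quick worked example of the deduplication-percentage arithmetic above (an editorial sketch with made-up document counts, not part of the tutorial):

```python
# Made-up counts, used only to illustrate the "de duplication %" formula shown above.
stats = {"source_documents": 1000, "result_documents": 900}
dedup_prst = 100 * (1.0 - stats.get("result_documents", 1) / stats.get("source_documents", 1))
print(dedup_prst)  # 10.0, i.e. 10% of the source documents were removed as duplicates
```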
(changed file: file name not captured in this view)
@@ -60,7 +60,7 @@ def orchestrate(
# create statistics
statistics = TransformStatistics()
if retries > 0:
statistics.add_stats({"data access retries", retries})
statistics.add_stats({"data access retries": retries})
# create executor
executor = PythonTransformFileProcessor(
data_access_factory=data_access_factory, statistics=statistics, runtime_configuration=runtime_config
@@ -100,10 +100,8 @@ def orchestrate(
"job_input_params": input_params | data_access_factory.get_input_params(),
"job_output_stats": stats,
}
logger.debug(f"Saved job metadata: {metadata}.")
_, retries = data_access.save_job_metadata(metadata)
if retries > 0:
statistics.add_stats({"data access retries", retries})
logger.debug(f"Saving job metadata: {metadata}.")
data_access.save_job_metadata(metadata)
logger.debug("Saved job metadata.")
return 0
except Exception as e:
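The statistics change in this file, and the matching edits in the file-processor and Ray-orchestrator diffs below, replace a Python set literal with a dict literal. A minimal illustration with a made-up retry count (editorial sketch, not part of the commit):

```python
# Illustrative only: why {"data access retries", retries} was a bug.
retries = 3
wrong = {"data access retries", retries}   # a set literal: {'data access retries', 3}
fixed = {"data access retries": retries}   # a dict literal: {'data access retries': 3}
print(type(wrong).__name__, type(fixed).__name__)  # set dict
# add_stats() presumably expects a mapping of stat names to values,
# so the set form would not merge into the statistics as intended.
```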
(changed file: file name not captured in this view)
@@ -48,7 +48,7 @@ def process_file(self, f_name: str) -> None:
# Read source file
filedata, retries = self.data_access.get_file(path=f_name)
if retries > 0:
self._publish_stats({"data access retries", retries})
self._publish_stats({"data access retries": retries})
if filedata is None:
self.logger.warning(f"File read resulted in None for {f_name}. Returning.")
self._publish_stats({"failed_reads": 1})
@@ -126,7 +126,7 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats
)
save_res, retries = self.data_access.save_file(path=output_name, data=file_ext[0])
if retries > 0:
self._publish_stats({"data access retries", retries})
self._publish_stats({"data access retries": retries})
if save_res is not None:
# Store execution statistics. Doing this async
self._publish_stats(
@@ -161,7 +161,7 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats
)
save_res, retries = self.data_access.save_file(path=output_name_indexed, data=file_ext[0])
if retries > 0:
self._publish_stats({"data access retries", retries})
self._publish_stats({"data access retries": retries})
if save_res is None:
self.logger.warning(f"Failed to write file {output_name_indexed}")
self._publish_stats({"failed_writes": 1})
(changed file: file name not captured in this view)
@@ -35,15 +35,16 @@ def get_tables_in_folder(dir: str) -> list[pa.Table]:
return [TransformUtils.convert_binary_to_arrow(data) for data in files.values()]


def get_files_in_folder(dir: str, ext: str) -> dict[str, bytes]:
def get_files_in_folder(dir: str, ext: str, return_data: bool = True) -> dict[str, bytes]:
"""
Get list of Tables loaded from the parquet files in the given directory. The returned
list is sorted lexigraphically by the name of the file.
:param dir:
Get list of files in folder. Always return file names and conditionally their content
:param dir: directory
:param ext: extension
:param return_data: flag to return file's content - default True
:return:
"""
dal = DataAccessLocal()
files, _ = dal.get_folder_files(dir, extensions=[ext])
files, _ = dal.get_folder_files(dir, extensions=[ext], return_data=return_data)
return files


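A hypothetical usage sketch of the updated helper above; the directory path and extension are made up, and it assumes the function is importable from this test-support module:

```python
# Hypothetical: list parquet file names in a folder without loading their contents.
names_only = get_files_in_folder("/tmp/test-data/input", ".parquet", return_data=False)
print(sorted(names_only.keys()))

# Default behavior (return_data=True) also returns each file's bytes as the dict values.
files_with_data = get_files_in_folder("/tmp/test-data/input", ".parquet")
```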
(changed file: file name not captured in this view)
@@ -15,7 +15,7 @@
from typing import Any

from data_processing.runtime.transform_launcher import AbstractTransformLauncher
from data_processing.test_support.abstract_test import AbstractTest
from data_processing.test_support import AbstractTest
from data_processing.utils import ParamsUtils


(changed file: file name not captured in this view)
@@ -74,7 +74,7 @@ def orchestrate(
# create statistics
statistics = TransformStatisticsRay.remote({})
if retries > 0:
statistics.add_stats.remote({"data access retries", retries})
statistics.add_stats.remote({"data access retries": retries})
# create executors
processor_params = {
"data_access_factory": data_access_factory,
@@ -137,10 +137,8 @@ def orchestrate(
"execution_stats": resources,
"job_output_stats": stats,
}
logger.debug(f"Saved job metadata: {metadata}.")
_, retries = data_access.save_job_metadata(metadata)
if retries > 0:
statistics.add_stats.remote({"data access retries", retries})
logger.debug(f"Saving job metadata: {metadata}.")
data_access.save_job_metadata(metadata)
logger.debug("Saved job metadata.")
return 0
except Exception as e:
transforms/universal/ededup/ray/test/test_ededup_ray.py (2 changes: 1 addition & 1 deletion)
@@ -19,7 +19,7 @@
from ededup_transform_ray import EdedupRayTransformConfiguration


class TestRayBlocklistTransform(AbstractTransformLauncherTest):
class TestRayEdedupTransform(AbstractTransformLauncherTest):
"""
Extends the super-class to define the test data for the tests defined there.
The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
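For context (an editorial sketch, not part of the commit): the docstring above notes that pytest only collects classes whose names begin with Test. A tiny self-contained illustration of that rule:

```python
# pytest's default python_classes setting collects classes whose names start with "Test".
class TestExampleTransform:       # collected by pytest
    def test_placeholder(self):
        assert True


class ExampleTransformChecks:     # NOT collected under the default settings
    def test_placeholder(self):
        assert True
```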
transforms/universal/profiler/Makefile (65 changes: 65 additions & 0 deletions)
@@ -0,0 +1,65 @@
REPOROOT=../../..
# Use make help, to see the available rules
include $(REPOROOT)/.make.defaults

setup::
@# Help: Recursively make $@ all subdirs
$(MAKE) RULE=$@ .recurse

clean::
@# Help: Recursively make $@ all subdirs
$(MAKE) RULE=$@ .recurse

build::
@# Help: Recursively make $@ in subdirs
$(MAKE) RULE=$@ .recurse
venv::
@# Help: Recursively make $@ in subdirs
$(MAKE) RULE=$@ .recurse

image::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

set-versions:
@# Help: Recursively $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

publish::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test-image::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test-src::
@# Help: Recursively make $@ in all subdirs
$(MAKE) RULE=$@ .recurse

load-image::
@# Help: Recursively make $@ in all subdirs
$(MAKE) RULE=$@ .recurse

.PHONY: workflow-venv
workflow-venv:
$(MAKE) -C kfp_ray workflow-venv

.PHONY: workflow-build
workflow-build:
$(MAKE) -C kfp_ray workflow-build

.PHONY: workflow-test
workflow-test:
$(MAKE) -C kfp_ray workflow-test

.PHONY: workflow-upload
workflow-upload:
$(MAKE) -C kfp_ray workflow-upload

.PHONY: workflow-reconcile-requirements
workflow-reconcile-requirements:
	$(MAKE) -C kfp_ray workflow-reconcile-requirements
transforms/universal/profiler/kfp_ray/Makefile (52 changes: 52 additions & 0 deletions)
@@ -0,0 +1,52 @@
REPOROOT=${CURDIR}/../../../../
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.workflows

SRC_DIR=${CURDIR}/../ray/

PYTHON_WF := $(shell find ./ -name '*_wf.py')
YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF})

workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE}

clean:: .defaults.clean

setup::

venv::

build::

test::

test-src::

test-image::

publish::

image::

load-image::

set-versions: workflow-reconcile-requirements

.PHONY: workflow-build
workflow-build: workflow-venv
$(MAKE) $(YAML_WF)

.PHONY: workflow-test
workflow-test: workflow-build
$(MAKE) .transforms_workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=ededup_wf.yaml

.PHONY: workflow-upload
workflow-upload: workflow-build
@for file in $(YAML_WF); do \
$(MAKE) .transforms_workflows.upload-pipeline PIPELINE_FILE=$$file; \
done

.PHONY: workflow-reconcile-requirements
workflow-reconcile-requirements:
@for file in $(PYTHON_WF); do \
$(MAKE) .transforms_workflows.reconcile-requirements PIPELINE_FILE=$$file; \
done
(Diffs for the remaining changed files were not loaded.)