Commit afcfc5b
profiler update
blublinsky committed Jun 25, 2024
1 parent 76c9d1f
Showing 10 changed files with 137 additions and 73 deletions.
6 changes: 5 additions & 1 deletion transforms/universal/ededup/ray/Makefile
@@ -40,7 +40,11 @@ build-dist:: set-versions .defaults.build-dist

publish-dist:: .defaults.publish-dist

run-cli-sample: .transforms.run-cli-ray-sample
run-cli-sample:
$(MAKE) RUN_FILE=$(TRANSFORM_NAME)_transform_ray.py \
RUN_ARGS="--run_locally True --data_local_config \"{ 'input_folder' : '../test-data/input', 'output_folder' : '../output'}\" \
--ededup_num_hashes 2" \
.transforms.run-src-file

run-local-sample: .transforms.run-local-ray-sample

6 changes: 5 additions & 1 deletion transforms/universal/fdedup/ray/Makefile
@@ -40,7 +40,11 @@ build-dist:: set-versions .defaults.build-dist

publish-dist:: .defaults.publish-dist

run-cli-sample: .transforms.run-cli-ray-sample
run-cli-sample:
$(MAKE) RUN_FILE=$(TRANSFORM_NAME)_transform_ray.py \
RUN_ARGS="--run_locally True --data_local_config \"{ 'input_folder' : '../test-data/input', 'output_folder' : '../output'}\" \
--fdedup_id_column int_id_column" \
.transforms.run-src-file

run-local-sample: .transforms.run-local-ray-sample

22 changes: 18 additions & 4 deletions transforms/universal/fdedup/ray/src/compute_shingles.py
@@ -1,3 +1,17 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import string

"""
This implements the most simplistic splitting of a document, based on white space,
that can be overwritten by a different document splitter (tokenizer). This method is
@@ -6,7 +20,6 @@
import compute_shingles
compute_shingles.compute_shingles = my_local_compute_shingles
"""


@@ -20,16 +33,17 @@ def _find(s: str, ch: str) -> list[int]:
return [i for i, ltr in enumerate(s) if ltr == ch]


def compute_shingles(text: str, word_shingle_size: int, delimiter: str = " ") -> list[str]:
def compute_shingles(txt: str, word_shingle_size: int, delimiter: str = " ") -> list[str]:
"""
Generate word shingles
:param text: document
:param txt: document
:param delimiter: delimiter to split document
:param word_shingle_size: size of shingle in words
:return: list of shingles
"""
text = txt.replace("\n", "").lower().translate(str.maketrans("", "", string.punctuation))
separators = _find(text, delimiter)
if len(separators) + 1 <= word_shingle_size:
return [text]
bounds = [-1] + separators + [len(text)]
return [text[bounds[i] + 1 : bounds[i + word_shingle_size]] for i in range(0, len(bounds) - word_shingle_size)]
return [text[bounds[i] + 1: bounds[i + word_shingle_size]] for i in range(0, len(bounds) - word_shingle_size)]
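With normalization (newline removal, lower-casing, punctuation stripping) now folded into compute_shingles rather than done by the caller, a quick sanity check of the behavior; the sample strings below are illustrative, and the expected output comes from running the function exactly as shown above:

# run from transforms/universal/fdedup/ray/src
from compute_shingles import compute_shingles

print(compute_shingles(txt="The quick, brown Fox!\n", word_shingle_size=2))
# ['the quick', 'quick brown', 'brown fox']

print(compute_shingles(txt="short text", word_shingle_size=3))
# ['short text']  (fewer words than the shingle size -> the whole text as one shingle)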
5 changes: 2 additions & 3 deletions transforms/universal/fdedup/ray/src/fdedup_transform_ray.py
@@ -11,7 +11,6 @@
################################################################################

import random
import string
import time
from argparse import ArgumentParser, Namespace
from typing import Any
@@ -186,9 +185,9 @@ def flush(limit: int) -> None:
doc_ids = table[self.doc_id_column]
# for every document/its integer id
for n in range(table.num_rows):
doc = docs[n].as_py().replace("\n", "").lower().translate(str.maketrans("", "", string.punctuation))
doc = docs[n].as_py()
doc_id = doc_ids[n].as_py()
shingles = compute_shingles(text=doc, word_shingle_size=self.word_shingle_size, delimiter=self.delimiter)
shingles = compute_shingles(txt=doc, word_shingle_size=self.word_shingle_size, delimiter=self.delimiter)
if len(shingles) > 0:
mh = self._generate_minhashes(shingles)
minhashes.append((doc_id, len(doc), mh))
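Just below this hunk, the shingles feed self._generate_minhashes. That method is not part of this diff, so the following is only a minimal sketch of the standard minhash technique it implements, assuming the mmh3 library (pinned in the profiler's pyproject.toml below; fdedup's own dependency list is not shown here) and one hash seed per simulated permutation:

import mmh3

def minhash_sketch(shingles: list[str], num_perm: int = 64) -> list[int]:
    # One seed stands in for each random permutation; the signature keeps
    # the minimum hash value over all shingles for every permutation.
    return [
        min(mmh3.hash(s, seed=seed, signed=False) for s in shingles)
        for seed in range(num_perm)
    ]

sig = minhash_sketch(["the quick", "quick brown", "brown fox"], num_perm=4)
print(sig)  # four 32-bit minima; similar shingle sets yield similar signatures

Documents that share most of their shingles agree on most signature positions, which is the property fuzzy dedup relies on.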
7 changes: 5 additions & 2 deletions transforms/universal/profiler/ray/Dockerfile
@@ -10,8 +10,11 @@ RUN cd data-processing-lib-python && pip install --no-cache-dir -e .
COPY --chown=ray:users data-processing-lib-ray/ data-processing-lib-ray/
RUN cd data-processing-lib-ray && pip install --no-cache-dir -e .

COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt
# Install ray project source
COPY --chown=ray:users src/ src/
COPY --chown=ray:users pyproject.toml pyproject.toml
COPY --chown=ray:users Readme.md Readme.md
RUN pip install --no-cache-dir -e .

# copy source data
COPY src/profiler_transform_ray.py .
36 changes: 16 additions & 20 deletions transforms/universal/profiler/ray/Makefile
@@ -22,38 +22,34 @@ test-src:: .transforms.test-src

setup:: .transforms.setup

test-image:: .transforms.ray-test-image

build:: build-dist image

publish:: publish-dist publish-image

publish-image:: .transforms.publish-image-ray

set-versions:
$(MAKE) TRANSFORM_PYTHON_VERSION=not-used .transforms.set-versions

build-dist:: set-versions .defaults.build-dist

publish-dist:: .defaults.publish-dist

test-image:: image .transforms.test-image-help
	@echo WARNING: Skipping pytest version of this test until we have a test/test_profiler.py file.

run-cli-sample:
$(MAKE) RUN_FILE=$(TRANSFORM_NAME)_transform_ray.py \
RUN_ARGS="--run_locally True --data_local_config \"{ 'input_folder' : '../test-data/input', 'output_folder' : '../output'}\" \
--profiler_num_aggregators 2" \
.transforms.run-src-file
setup:: .transforms.setup

# set the version of python transform that this depends on.
set-versions:
$(MAKE) TRANSFORM_PYTHON_VERSION=${CODE2PARQUET_PYTHON_VERSION} TOML_VERSION=$(CODE2PARQUET_PYTHON_VERSION) .transforms.set-versions

build-dist:: set-versions .defaults.build-dist

run-local-python-only-sample:
@# Help: Not implemented yet
@echo "No src/profiler_local.py yet :("
publish-dist:: .defaults.publish-dist

run-local-python-only-sample: .transforms.run-local-sample
run-local-sample: .transforms.run-local-ray-sample

run-s3-sample: .transforms.run-s3-sample
run-s3-sample: .transforms.run-s3-ray-sample

minio-start: .minio-start

load-image:: .transforms.load-image

run-cli-sample:
$(MAKE) RUN_FILE=$(TRANSFORM_NAME)_transform_ray.py \
RUN_ARGS="--run_locally True --data_local_config \"{ 'input_folder' : '../test-data/input', 'output_folder' : '../output'}\" \
--profiler_num_aggregators 2" \
.transforms.run-src-file
48 changes: 48 additions & 0 deletions transforms/universal/profiler/ray/pyproject.toml
@@ -0,0 +1,48 @@
[project]
name = "dpk_profiler_transform_ray"
version = "0.4.0.dev6"
requires-python = ">=3.10"
description = "profiler Ray Transform"
license = {text = "Apache-2.0"}
readme = {file = "README.md", content-type = "text/markdown"}
authors = [
{ name = "David Wood", email = "dawood@us.ibm.com" },
{ name = "Boris Lublinsky", email = "blublinsky@ibm.com" },
]
dependencies = [
"data-prep-toolkit-ray==0.2.0.dev6",
"mmh3==4.1.0",
"xxhash==3.4.1",
"tqdm==4.66.3",
]

[build-system]
requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"]
build-backend = "setuptools.build_meta"

[project.optional-dependencies]
dev = [
"twine",
"pytest>=7.3.2",
"pytest-dotenv>=0.5.2",
"pytest-env>=1.0.0",
"pre-commit>=3.3.2",
"pytest-cov>=4.1.0",
"pytest-mock>=3.10.0",
"moto==5.0.5",
"markupsafe==2.0.1",
]

[options]
package_dir = ["src","test"]

[options.packages.find]
where = ["src/"]

[tool.pytest.ini_options]
# Currently we use low coverage since we have to run tests separately (see makefile)
#addopts = "--cov --cov-report term-missing --cov-fail-under 25"
markers = ["unit: unit tests", "integration: integration tests"]

[tool.coverage.run]
include = ["src/*"]
5 changes: 0 additions & 5 deletions transforms/universal/profiler/ray/requirements.txt

This file was deleted.

35 changes: 35 additions & 0 deletions transforms/universal/profiler/ray/src/base_tokenizer.py
@@ -0,0 +1,35 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import string

"""
This implements the most simplistic tokenizer, based on white space,
that can be overwritten by a different one. This method is
built into the library and can be overwritten using the approach described at
https://stackoverflow.com/questions/37553545/how-do-i-override-a-function-of-a-python-library
import base_tokenizer
base_tokenizer.tokenize = my_local_tokenize
"""


def tokenize(text: str) -> list[str]:
"""
Tokenize string
:param text: source text
:return: list of tokens (words)
"""
# start from normalizing string
normal = text.strip().lower().translate(str.maketrans("", "", string.punctuation))
return normal.split()
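A short usage sketch, including the override mechanism the module docstring describes (my_tokenize is a hypothetical replacement, not part of the repository):

import base_tokenizer

print(base_tokenizer.tokenize("Hello, World!\n"))
# ['hello', 'world']

# Replace the built-in tokenizer module-wide, per the docstring:
def my_tokenize(text: str) -> list[str]:
    # hypothetical custom tokenizer: split on whitespace only, keep punctuation
    return text.lower().split()

base_tokenizer.tokenize = my_tokenize
print(base_tokenizer.tokenize("Hello, World!"))
# ['hello,', 'world!']

Note that profiler_transform_ray.py below binds the function via "from base_tokenizer import tokenize", so an override like this only takes effect if it is installed before that import runs (or by patching profiler_transform_ray.tokenize directly).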
40 changes: 3 additions & 37 deletions transforms/universal/profiler/ray/src/profiler_transform_ray.py
@@ -12,7 +12,6 @@

import csv
import io
import string
import uuid
from argparse import ArgumentParser, Namespace
from typing import Any
@@ -31,6 +30,7 @@
RayTransformRuntimeConfiguration,
)
from ray.actor import ActorHandle
from base_tokenizer import tokenize


REQUEST_LEN = 8192
@@ -109,36 +109,6 @@ def save_data(self) -> tuple[dict[str, Any], int]:
return self.data_access.save_file(path=output_path, data=s.read().encode("utf-8"))


class AbstractTokenizer:
"""
Abstract tokenizer class
"""

def tokenize(self, text: str) -> list[str]:
"""
Tokenize string
:param text: source text
:return: list of tokens (words)
"""
pass


class SimpleTokenizer(AbstractTokenizer):
"""
Simple implementation of the abstract tokenizer
"""

def tokenize(self, text: str) -> list[str]:
"""
Tokenize string
:param text: source text
:return: list of tokens (words)
"""
# start from normalizing string
normal = text.strip().lower().translate(str.maketrans("", "", string.punctuation))
return normal.split()


class ProfilerTransform(AbstractTableTransform):
"""
Implements Aggregator table transformer.
@@ -149,17 +119,13 @@ def __init__(self, config: dict):
Initialize based on the dictionary of configuration information.
The dictionary should contain the following:
doc_column - name of the doc column
tokenizer - tokenizer to use
aggregators - list of aggregator actors, references
"""
# Make sure that the param name corresponds to the name used in apply_input_params method
# of AggregateTableTransformConfiguration class
super().__init__(config)
self.doc_column = config.get("doc_column", "contents")
self.aggregators = config.get("aggregators", [])
self.tokenizer = config.get("tokenizer", None)
if self.tokenizer is None:
raise RuntimeError("tokenizer is not provided")
if len(self.aggregators) == 0:
raise RuntimeError("No aggregators are available")

@@ -177,7 +143,7 @@ def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Tab
# Compute words count
for text in table[self.doc_column]:
# Compute doc hash
tokens = self.tokenizer.tokenize(str(text))
tokens = tokenize(text=str(text))
for token in tokens:
words[token] = words.get(token, 0) + 1
# submit word counts to cache
@@ -247,7 +213,7 @@ def get_transform_config(
actor_options={"num_cpus": self.params.get("aggregator_cpu", 0.5)},
n_actors=self.params.get("num_aggregators", 1),
)
return {"aggregators": self.aggregators, "tokenizer": SimpleTokenizer()} | self.params
return {"aggregators": self.aggregators} | self.params

def compute_execution_stats(self, stats: dict[str, Any]) -> dict[str, Any]:
"""
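For orientation, the tokenize-and-count loop shown in transform above boils down to this self-contained sketch; the sample table is illustrative, and the real transform goes on to batch the counts and ship them to the aggregator actors:

import pyarrow as pa
from base_tokenizer import tokenize

# illustrative stand-in for the input table; real data comes from parquet files
table = pa.table({"contents": ["Hello world", "hello again, world"]})

words: dict[str, int] = {}
for text in table["contents"]:
    for token in tokenize(str(text)):
        words[token] = words.get(token, 0) + 1

print(words)  # {'hello': 2, 'world': 2, 'again': 1}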
