diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 92adfec2..eafe8551 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -36,7 +36,7 @@ jobs: run: python -m flake8 pdfplumber tests - name: Check type annotations via mypy - run: python -m mypy --strict pdfplumber + run: python -m mypy --strict --implicit-reexport pdfplumber test: needs: lint diff --git a/CHANGELOG.md b/CHANGELOG.md index 897bd397..ed9cc22d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/). +## [Unreleased] + +### Development Changes + +- Converted `utils.py` into `utils/` submodules. Retains same interface, just an improvement in organization. + ## [0.7.6] - 2022-11-22 ### Changed diff --git a/Makefile b/Makefile index 2aa6afae..bdcaffd7 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ check-flake: ${PYTHON} -m flake8 pdfplumber tests check-mypy: - ${PYTHON} -m mypy --strict pdfplumber + ${PYTHON} -m mypy --strict --implicit-reexport pdfplumber lint: check-flake check-mypy check-black check-isort diff --git a/pdfplumber/utils/__init__.py b/pdfplumber/utils/__init__.py new file mode 100644 index 00000000..92109e3c --- /dev/null +++ b/pdfplumber/utils/__init__.py @@ -0,0 +1,45 @@ +from .clustering import cluster_list, cluster_objects, make_cluster_dict # noqa: F401 +from .generic import to_list # noqa: F401 +from .geometry import ( # noqa: F401 + bbox_to_rect, + calculate_area, + clip_obj, + crop_to_bbox, + curve_to_edges, + filter_edges, + get_bbox_overlap, + intersects_bbox, + line_to_edge, + merge_bboxes, + move_object, + obj_to_bbox, + obj_to_edges, + objects_to_bbox, + objects_to_rect, + outside_bbox, + rect_to_edges, + resize_object, + snap_objects, + within_bbox, +) +from .pdfinternals import ( # noqa: F401 + decode_psl_list, + decode_text, + resolve, + resolve_all, + resolve_and_decode, +) +from .text import ( # noqa: F401 + DEFAULT_X_DENSITY, + DEFAULT_X_TOLERANCE, + DEFAULT_Y_DENSITY, + DEFAULT_Y_TOLERANCE, + LayoutEngine, + TextLayout, + WordExtractor, + chars_to_layout, + collate_line, + dedupe_chars, + extract_text, + extract_words, +) diff --git a/pdfplumber/utils/clustering.py b/pdfplumber/utils/clustering.py new file mode 100644 index 00000000..34fd876f --- /dev/null +++ b/pdfplumber/utils/clustering.py @@ -0,0 +1,58 @@ +import itertools +from collections.abc import Hashable +from operator import itemgetter +from typing import Callable, Dict, Iterable, List, TypeVar, Union + +from .._typing import T_num + + +def cluster_list(xs: List[T_num], tolerance: T_num = 0) -> List[List[T_num]]: + if tolerance == 0: + return [[x] for x in sorted(xs)] + if len(xs) < 2: + return [[x] for x in sorted(xs)] + groups = [] + xs = list(sorted(xs)) + current_group = [xs[0]] + last = xs[0] + for x in xs[1:]: + if x <= (last + tolerance): + current_group.append(x) + else: + groups.append(current_group) + current_group = [x] + last = x + groups.append(current_group) + return groups + + +def make_cluster_dict(values: Iterable[T_num], tolerance: T_num) -> Dict[T_num, int]: + clusters = cluster_list(list(set(values)), tolerance) + + nested_tuples = [ + [(val, i) for val in value_cluster] for i, value_cluster in enumerate(clusters) + ] + + return dict(itertools.chain(*nested_tuples)) + + +R = TypeVar("R") + + +def cluster_objects( + xs: List[R], key_fn: Union[Hashable, Callable[[R], T_num]], tolerance: T_num +) -> List[List[R]]: + + if not callable(key_fn): + key_fn = itemgetter(key_fn) + + values = map(key_fn, xs) + cluster_dict = make_cluster_dict(values, tolerance) + + get_0, get_1 = itemgetter(0), itemgetter(1) + + cluster_tuples = sorted(((x, cluster_dict.get(key_fn(x))) for x in xs), key=get_1) + + grouped = itertools.groupby(cluster_tuples, key=get_1) + + return [list(map(get_0, v)) for k, v in grouped] diff --git a/pdfplumber/utils/generic.py b/pdfplumber/utils/generic.py new file mode 100644 index 00000000..311c5862 --- /dev/null +++ b/pdfplumber/utils/generic.py @@ -0,0 +1,21 @@ +from collections.abc import Sequence +from typing import TYPE_CHECKING, Any, Dict, List, Union + +from .._typing import T_seq + +if TYPE_CHECKING: # pragma: nocover + from pandas.core.frame import DataFrame + + +def to_list(collection: Union[T_seq[Any], "DataFrame"]) -> List[Any]: + if isinstance(collection, list): + return collection + elif isinstance(collection, Sequence): + return list(collection) + elif hasattr(collection, "to_dict"): + res: List[Dict[Union[str, int], Any]] = collection.to_dict( + "records" + ) # pragma: nocover + return res + else: + return list(collection) diff --git a/pdfplumber/utils/geometry.py b/pdfplumber/utils/geometry.py new file mode 100644 index 00000000..8a085976 --- /dev/null +++ b/pdfplumber/utils/geometry.py @@ -0,0 +1,280 @@ +import itertools +from operator import itemgetter +from typing import Dict, List, Optional + +from .._typing import T_bbox, T_num, T_obj, T_obj_list +from .clustering import cluster_objects +from .generic import to_list + + +def objects_to_rect(objects: T_obj_list) -> Dict[str, T_num]: + return { + "x0": min(map(itemgetter("x0"), objects)), + "x1": max(map(itemgetter("x1"), objects)), + "top": min(map(itemgetter("top"), objects)), + "bottom": max(map(itemgetter("bottom"), objects)), + } + + +def objects_to_bbox(objects: T_obj_list) -> T_bbox: + return ( + min(map(itemgetter("x0"), objects)), + min(map(itemgetter("top"), objects)), + max(map(itemgetter("x1"), objects)), + max(map(itemgetter("bottom"), objects)), + ) + + +bbox_getter = itemgetter("x0", "top", "x1", "bottom") + + +def obj_to_bbox(obj: T_obj) -> T_bbox: + return bbox_getter(obj) + + +def bbox_to_rect(bbox: T_bbox) -> Dict[str, T_num]: + return {"x0": bbox[0], "top": bbox[1], "x1": bbox[2], "bottom": bbox[3]} + + +def merge_bboxes(bboxes: List[T_bbox]) -> T_bbox: + """ + Given a set of bounding boxes, return the smallest bounding box that + contains them all. + """ + return ( + min(map(itemgetter(0), bboxes)), + min(map(itemgetter(1), bboxes)), + max(map(itemgetter(2), bboxes)), + max(map(itemgetter(3), bboxes)), + ) + + +def get_bbox_overlap(a: T_bbox, b: T_bbox) -> Optional[T_bbox]: + a_left, a_top, a_right, a_bottom = a + b_left, b_top, b_right, b_bottom = b + o_left = max(a_left, b_left) + o_right = min(a_right, b_right) + o_bottom = min(a_bottom, b_bottom) + o_top = max(a_top, b_top) + o_width = o_right - o_left + o_height = o_bottom - o_top + if o_height >= 0 and o_width >= 0 and o_height + o_width > 0: + return (o_left, o_top, o_right, o_bottom) + else: + return None + + +def calculate_area(bbox: T_bbox) -> T_num: + left, top, right, bottom = bbox + if left > right or top > bottom: + raise ValueError(f"{bbox} has a negative width or height.") + return (right - left) * (bottom - top) + + +def clip_obj(obj: T_obj, bbox: T_bbox) -> Optional[T_obj]: + + overlap = get_bbox_overlap(obj_to_bbox(obj), bbox) + if overlap is None: + return None + + dims = bbox_to_rect(overlap) + copy = dict(obj) + + for attr in ["x0", "top", "x1", "bottom"]: + copy[attr] = dims[attr] + + diff = dims["top"] - obj["top"] + copy["doctop"] = obj["doctop"] + diff + copy["width"] = copy["x1"] - copy["x0"] + copy["height"] = copy["bottom"] - copy["top"] + + return copy + + +def intersects_bbox(objs: T_obj_list, bbox: T_bbox) -> T_obj_list: + """ + Filters objs to only those intersecting the bbox + """ + initial_type = type(objs) + objs = to_list(objs) + matching = [ + obj for obj in objs if get_bbox_overlap(obj_to_bbox(obj), bbox) is not None + ] + return initial_type(matching) + + +def within_bbox(objs: T_obj_list, bbox: T_bbox) -> T_obj_list: + """ + Filters objs to only those fully within the bbox + """ + return [ + obj + for obj in objs + if get_bbox_overlap(obj_to_bbox(obj), bbox) == obj_to_bbox(obj) + ] + + +def outside_bbox(objs: T_obj_list, bbox: T_bbox) -> T_obj_list: + """ + Filters objs to only those fully outside the bbox + """ + return [obj for obj in objs if get_bbox_overlap(obj_to_bbox(obj), bbox) is None] + + +def crop_to_bbox(objs: T_obj_list, bbox: T_bbox) -> T_obj_list: + """ + Filters objs to only those intersecting the bbox, + and crops the extent of the objects to the bbox. + """ + return list(filter(None, (clip_obj(obj, bbox) for obj in objs))) + + +def move_object(obj: T_obj, axis: str, value: T_num) -> T_obj: + assert axis in ("h", "v") + if axis == "h": + new_items = [ + ("x0", obj["x0"] + value), + ("x1", obj["x1"] + value), + ] + if axis == "v": + new_items = [ + ("top", obj["top"] + value), + ("bottom", obj["bottom"] + value), + ] + if "doctop" in obj: + new_items += [("doctop", obj["doctop"] + value)] + if "y0" in obj: + new_items += [ + ("y0", obj["y0"] - value), + ("y1", obj["y1"] - value), + ] + return obj.__class__(tuple(obj.items()) + tuple(new_items)) + + +def snap_objects(objs: T_obj_list, attr: str, tolerance: T_num) -> T_obj_list: + axis = {"x0": "h", "x1": "h", "top": "v", "bottom": "v"}[attr] + clusters = cluster_objects(objs, itemgetter(attr), tolerance) + avgs = [sum(map(itemgetter(attr), objs)) / len(objs) for objs in clusters] + snapped_clusters = [ + [move_object(obj, axis, avg - obj[attr]) for obj in cluster] + for cluster, avg in zip(clusters, avgs) + ] + return list(itertools.chain(*snapped_clusters)) + + +def resize_object(obj: T_obj, key: str, value: T_num) -> T_obj: + assert key in ("x0", "x1", "top", "bottom") + old_value = obj[key] + diff = value - old_value + new_items = [ + (key, value), + ] + if key == "x0": + assert value <= obj["x1"] + new_items.append(("width", obj["x1"] - value)) + elif key == "x1": + assert value >= obj["x0"] + new_items.append(("width", value - obj["x0"])) + elif key == "top": + assert value <= obj["bottom"] + new_items.append(("doctop", obj["doctop"] + diff)) + new_items.append(("height", obj["height"] - diff)) + if "y1" in obj: + new_items.append(("y1", obj["y1"] - diff)) + elif key == "bottom": + assert value >= obj["top"] + new_items.append(("height", obj["height"] + diff)) + if "y0" in obj: + new_items.append(("y0", obj["y0"] - diff)) + return obj.__class__(tuple(obj.items()) + tuple(new_items)) + + +def curve_to_edges(curve: T_obj) -> T_obj_list: + point_pairs = zip(curve["points"], curve["points"][1:]) + return [ + { + "x0": min(p0[0], p1[0]), + "x1": max(p0[0], p1[0]), + "top": min(p0[1], p1[1]), + "doctop": min(p0[1], p1[1]) + (curve["doctop"] - curve["top"]), + "bottom": max(p0[1], p1[1]), + "width": abs(p0[0] - p1[0]), + "height": abs(p0[1] - p1[1]), + "orientation": "v" if p0[0] == p1[0] else ("h" if p0[1] == p1[1] else None), + } + for p0, p1 in point_pairs + ] + + +def rect_to_edges(rect: T_obj) -> T_obj_list: + top, bottom, left, right = [dict(rect) for x in range(4)] + top.update( + { + "object_type": "rect_edge", + "height": 0, + "y0": rect["y1"], + "bottom": rect["top"], + "orientation": "h", + } + ) + bottom.update( + { + "object_type": "rect_edge", + "height": 0, + "y1": rect["y0"], + "top": rect["top"] + rect["height"], + "doctop": rect["doctop"] + rect["height"], + "orientation": "h", + } + ) + left.update( + { + "object_type": "rect_edge", + "width": 0, + "x1": rect["x0"], + "orientation": "v", + } + ) + right.update( + { + "object_type": "rect_edge", + "width": 0, + "x0": rect["x1"], + "orientation": "v", + } + ) + return [top, bottom, left, right] + + +def line_to_edge(line: T_obj) -> T_obj: + edge = dict(line) + edge["orientation"] = "h" if (line["top"] == line["bottom"]) else "v" + return edge + + +def obj_to_edges(obj: T_obj) -> T_obj_list: + return { + "line": lambda x: [line_to_edge(x)], + "rect": rect_to_edges, + "rect_edge": rect_to_edges, + "curve": curve_to_edges, + }[obj["object_type"]](obj) + + +def filter_edges( + edges: T_obj_list, + orientation: Optional[str] = None, + edge_type: Optional[str] = None, + min_length: T_num = 1, +) -> T_obj_list: + + if orientation not in ("v", "h", None): + raise ValueError("Orientation must be 'v' or 'h'") + + def test(e: T_obj) -> bool: + dim = "height" if e["orientation"] == "v" else "width" + et_correct = e["object_type"] == edge_type if edge_type is not None else True + orient_correct = orientation is None or e["orientation"] == orientation + return bool(et_correct and orient_correct and (e[dim] >= min_length)) + + return list(filter(test, edges)) diff --git a/pdfplumber/utils/pdfinternals.py b/pdfplumber/utils/pdfinternals.py new file mode 100644 index 00000000..a53807c0 --- /dev/null +++ b/pdfplumber/utils/pdfinternals.py @@ -0,0 +1,79 @@ +from typing import Any, List, Optional, Union + +from pdfminer.pdftypes import PDFObjRef +from pdfminer.psparser import PSLiteral +from pdfminer.utils import PDFDocEncoding + + +def decode_text(s: Union[bytes, str]) -> str: + """ + Decodes a PDFDocEncoding string to Unicode. + Adds py3 compatibility to pdfminer's version. + """ + if isinstance(s, bytes) and s.startswith(b"\xfe\xff"): + return str(s[2:], "utf-16be", "ignore") + ords = (ord(c) if isinstance(c, str) else c for c in s) + return "".join(PDFDocEncoding[o] for o in ords) + + +def resolve_and_decode(obj: Any) -> Any: + """Recursively resolve the metadata values.""" + if hasattr(obj, "resolve"): + obj = obj.resolve() + if isinstance(obj, list): + return list(map(resolve_and_decode, obj)) + elif isinstance(obj, PSLiteral): + return decode_text(obj.name) + elif isinstance(obj, (str, bytes)): + return decode_text(obj) + elif isinstance(obj, dict): + for k, v in obj.items(): + obj[k] = resolve_and_decode(v) + return obj + + return obj + + +def decode_psl_list(_list: List[Union[PSLiteral, str]]) -> List[str]: + return [ + decode_text(value.name) if isinstance(value, PSLiteral) else value + for value in _list + ] + + +def resolve(x: Any) -> Any: + if isinstance(x, PDFObjRef): + return x.resolve() + else: + return x + + +def get_dict_type(d: Any) -> Optional[str]: + if not isinstance(d, dict): + return None + t = d.get("Type") + if isinstance(t, PSLiteral): + return decode_text(t.name) + else: + return t + + +def resolve_all(x: Any) -> Any: + """ + Recursively resolves the given object and all the internals. + """ + if isinstance(x, PDFObjRef): + resolved = x.resolve() + + # Avoid infinite recursion + if get_dict_type(resolved) == "Page": + return x + + return resolve_all(resolved) + elif isinstance(x, (list, tuple)): + return type(x)(resolve_all(v) for v in x) + elif isinstance(x, dict): + exceptions = ["Parent"] if get_dict_type(x) == "Annot" else [] + return {k: v if k in exceptions else resolve_all(v) for k, v in x.items()} + else: + return x diff --git a/pdfplumber/utils.py b/pdfplumber/utils/text.py similarity index 56% rename from pdfplumber/utils.py rename to pdfplumber/utils/text.py index 3370b4bd..899c54c9 100644 --- a/pdfplumber/utils.py +++ b/pdfplumber/utils/text.py @@ -1,32 +1,13 @@ import itertools import re import string -from collections.abc import Hashable, Sequence from operator import itemgetter -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Generator, - Iterable, - List, - Match, - Optional, - Pattern, - Tuple, - TypeVar, - Union, -) - -from pdfminer.pdftypes import PDFObjRef -from pdfminer.psparser import PSLiteral -from pdfminer.utils import PDFDocEncoding - -from ._typing import T_bbox, T_num, T_obj, T_obj_iter, T_obj_list, T_seq - -if TYPE_CHECKING: # pragma: nocover - from pandas.core.frame import DataFrame +from typing import Any, Dict, Generator, List, Match, Optional, Pattern, Tuple, Union + +from .._typing import T_bbox, T_num, T_obj, T_obj_iter, T_obj_list +from .clustering import cluster_objects +from .generic import to_list +from .geometry import merge_bboxes, obj_to_bbox, objects_to_bbox DEFAULT_X_TOLERANCE = 3 DEFAULT_Y_TOLERANCE = 3 @@ -34,211 +15,6 @@ DEFAULT_Y_DENSITY = 13 -def cluster_list(xs: List[T_num], tolerance: T_num = 0) -> List[List[T_num]]: - if tolerance == 0: - return [[x] for x in sorted(xs)] - if len(xs) < 2: - return [[x] for x in sorted(xs)] - groups = [] - xs = list(sorted(xs)) - current_group = [xs[0]] - last = xs[0] - for x in xs[1:]: - if x <= (last + tolerance): - current_group.append(x) - else: - groups.append(current_group) - current_group = [x] - last = x - groups.append(current_group) - return groups - - -def make_cluster_dict(values: Iterable[T_num], tolerance: T_num) -> Dict[T_num, int]: - clusters = cluster_list(list(set(values)), tolerance) - - nested_tuples = [ - [(val, i) for val in value_cluster] for i, value_cluster in enumerate(clusters) - ] - - return dict(itertools.chain(*nested_tuples)) - - -R = TypeVar("R") - - -def cluster_objects( - xs: List[R], key_fn: Union[Hashable, Callable[[R], T_num]], tolerance: T_num -) -> List[List[R]]: - - if not callable(key_fn): - key_fn = itemgetter(key_fn) - - values = map(key_fn, xs) - cluster_dict = make_cluster_dict(values, tolerance) - - get_0, get_1 = itemgetter(0), itemgetter(1) - - cluster_tuples = sorted(((x, cluster_dict.get(key_fn(x))) for x in xs), key=get_1) - - grouped = itertools.groupby(cluster_tuples, key=get_1) - - return [list(map(get_0, v)) for k, v in grouped] - - -def decode_text(s: Union[bytes, str]) -> str: - """ - Decodes a PDFDocEncoding string to Unicode. - Adds py3 compatibility to pdfminer's version. - """ - if isinstance(s, bytes) and s.startswith(b"\xfe\xff"): - return str(s[2:], "utf-16be", "ignore") - ords = (ord(c) if isinstance(c, str) else c for c in s) - return "".join(PDFDocEncoding[o] for o in ords) - - -def resolve_and_decode(obj: Any) -> Any: - """Recursively resolve the metadata values.""" - if hasattr(obj, "resolve"): - obj = obj.resolve() - if isinstance(obj, list): - return list(map(resolve_and_decode, obj)) - elif isinstance(obj, PSLiteral): - return decode_text(obj.name) - elif isinstance(obj, (str, bytes)): - return decode_text(obj) - elif isinstance(obj, dict): - for k, v in obj.items(): - obj[k] = resolve_and_decode(v) - return obj - - return obj - - -def decode_psl_list(_list: List[Union[PSLiteral, str]]) -> List[str]: - return [ - decode_text(value.name) if isinstance(value, PSLiteral) else value - for value in _list - ] - - -def resolve(x: Any) -> Any: - if isinstance(x, PDFObjRef): - return x.resolve() - else: - return x - - -def get_dict_type(d: Any) -> Optional[str]: - if not isinstance(d, dict): - return None - t = d.get("Type") - if isinstance(t, PSLiteral): - return decode_text(t.name) - else: - return t - - -def resolve_all(x: Any) -> Any: - """ - Recursively resolves the given object and all the internals. - """ - if isinstance(x, PDFObjRef): - resolved = x.resolve() - - # Avoid infinite recursion - if get_dict_type(resolved) == "Page": - return x - - return resolve_all(resolved) - elif isinstance(x, (list, tuple)): - return type(x)(resolve_all(v) for v in x) - elif isinstance(x, dict): - exceptions = ["Parent"] if get_dict_type(x) == "Annot" else [] - return {k: v if k in exceptions else resolve_all(v) for k, v in x.items()} - else: - return x - - -def to_list(collection: Union[T_seq[Any], "DataFrame"]) -> List[Any]: - if isinstance(collection, list): - return collection - elif isinstance(collection, Sequence): - return list(collection) - elif hasattr(collection, "to_dict"): - res: List[Dict[Union[str, int], Any]] = collection.to_dict( - "records" - ) # pragma: nocover - return res - else: - return list(collection) - - -def dedupe_chars(chars: T_obj_list, tolerance: T_num = 1) -> T_obj_list: - """ - Removes duplicate chars — those sharing the same text, fontname, size, - and positioning (within `tolerance`) as other characters in the set. - """ - key = itemgetter("fontname", "size", "upright", "text") - pos_key = itemgetter("doctop", "x0") - - def yield_unique_chars(chars: T_obj_list) -> Generator[T_obj, None, None]: - sorted_chars = sorted(chars, key=key) - for grp, grp_chars in itertools.groupby(sorted_chars, key=key): - for y_cluster in cluster_objects( - list(grp_chars), itemgetter("doctop"), tolerance - ): - for x_cluster in cluster_objects( - y_cluster, itemgetter("x0"), tolerance - ): - yield sorted(x_cluster, key=pos_key)[0] - - deduped = yield_unique_chars(chars) - return sorted(deduped, key=chars.index) - - -def objects_to_rect(objects: T_obj_list) -> Dict[str, T_num]: - return { - "x0": min(map(itemgetter("x0"), objects)), - "x1": max(map(itemgetter("x1"), objects)), - "top": min(map(itemgetter("top"), objects)), - "bottom": max(map(itemgetter("bottom"), objects)), - } - - -def objects_to_bbox(objects: T_obj_list) -> T_bbox: - return ( - min(map(itemgetter("x0"), objects)), - min(map(itemgetter("top"), objects)), - max(map(itemgetter("x1"), objects)), - max(map(itemgetter("bottom"), objects)), - ) - - -bbox_getter = itemgetter("x0", "top", "x1", "bottom") - - -def obj_to_bbox(obj: T_obj) -> T_bbox: - return bbox_getter(obj) - - -def bbox_to_rect(bbox: T_bbox) -> Dict[str, T_num]: - return {"x0": bbox[0], "top": bbox[1], "x1": bbox[2], "bottom": bbox[3]} - - -def merge_bboxes(bboxes: List[T_bbox]) -> T_bbox: - """ - Given a set of bounding boxes, return the smallest bounding box that - contains them all. - """ - return ( - min(map(itemgetter(0), bboxes)), - min(map(itemgetter(1), bboxes)), - max(map(itemgetter(2), bboxes)), - max(map(itemgetter(3), bboxes)), - ) - - class WordExtractor: def __init__( self, @@ -656,232 +432,24 @@ def extract_text( return "\n".join(lines) -def get_bbox_overlap(a: T_bbox, b: T_bbox) -> Optional[T_bbox]: - a_left, a_top, a_right, a_bottom = a - b_left, b_top, b_right, b_bottom = b - o_left = max(a_left, b_left) - o_right = min(a_right, b_right) - o_bottom = min(a_bottom, b_bottom) - o_top = max(a_top, b_top) - o_width = o_right - o_left - o_height = o_bottom - o_top - if o_height >= 0 and o_width >= 0 and o_height + o_width > 0: - return (o_left, o_top, o_right, o_bottom) - else: - return None - - -def calculate_area(bbox: T_bbox) -> T_num: - left, top, right, bottom = bbox - if left > right or top > bottom: - raise ValueError(f"{bbox} has a negative width or height.") - return (right - left) * (bottom - top) - - -def clip_obj(obj: T_obj, bbox: T_bbox) -> Optional[T_obj]: - - overlap = get_bbox_overlap(obj_to_bbox(obj), bbox) - if overlap is None: - return None - - dims = bbox_to_rect(overlap) - copy = dict(obj) - - for attr in ["x0", "top", "x1", "bottom"]: - copy[attr] = dims[attr] - - diff = dims["top"] - obj["top"] - copy["doctop"] = obj["doctop"] + diff - copy["width"] = copy["x1"] - copy["x0"] - copy["height"] = copy["bottom"] - copy["top"] - - return copy - - -def intersects_bbox(objs: T_obj_list, bbox: T_bbox) -> T_obj_list: - """ - Filters objs to only those intersecting the bbox - """ - initial_type = type(objs) - objs = to_list(objs) - matching = [ - obj for obj in objs if get_bbox_overlap(obj_to_bbox(obj), bbox) is not None - ] - return initial_type(matching) - - -def within_bbox(objs: T_obj_list, bbox: T_bbox) -> T_obj_list: - """ - Filters objs to only those fully within the bbox - """ - return [ - obj - for obj in objs - if get_bbox_overlap(obj_to_bbox(obj), bbox) == obj_to_bbox(obj) - ] - - -def outside_bbox(objs: T_obj_list, bbox: T_bbox) -> T_obj_list: - """ - Filters objs to only those fully outside the bbox - """ - return [obj for obj in objs if get_bbox_overlap(obj_to_bbox(obj), bbox) is None] - - -def crop_to_bbox(objs: T_obj_list, bbox: T_bbox) -> T_obj_list: +def dedupe_chars(chars: T_obj_list, tolerance: T_num = 1) -> T_obj_list: """ - Filters objs to only those intersecting the bbox, - and crops the extent of the objects to the bbox. + Removes duplicate chars — those sharing the same text, fontname, size, + and positioning (within `tolerance`) as other characters in the set. """ - return list(filter(None, (clip_obj(obj, bbox) for obj in objs))) - - -def move_object(obj: T_obj, axis: str, value: T_num) -> T_obj: - assert axis in ("h", "v") - if axis == "h": - new_items = [ - ("x0", obj["x0"] + value), - ("x1", obj["x1"] + value), - ] - if axis == "v": - new_items = [ - ("top", obj["top"] + value), - ("bottom", obj["bottom"] + value), - ] - if "doctop" in obj: - new_items += [("doctop", obj["doctop"] + value)] - if "y0" in obj: - new_items += [ - ("y0", obj["y0"] - value), - ("y1", obj["y1"] - value), - ] - return obj.__class__(tuple(obj.items()) + tuple(new_items)) - - -def snap_objects(objs: T_obj_list, attr: str, tolerance: T_num) -> T_obj_list: - axis = {"x0": "h", "x1": "h", "top": "v", "bottom": "v"}[attr] - clusters = cluster_objects(objs, itemgetter(attr), tolerance) - avgs = [sum(map(itemgetter(attr), objs)) / len(objs) for objs in clusters] - snapped_clusters = [ - [move_object(obj, axis, avg - obj[attr]) for obj in cluster] - for cluster, avg in zip(clusters, avgs) - ] - return list(itertools.chain(*snapped_clusters)) - - -def resize_object(obj: T_obj, key: str, value: T_num) -> T_obj: - assert key in ("x0", "x1", "top", "bottom") - old_value = obj[key] - diff = value - old_value - new_items = [ - (key, value), - ] - if key == "x0": - assert value <= obj["x1"] - new_items.append(("width", obj["x1"] - value)) - elif key == "x1": - assert value >= obj["x0"] - new_items.append(("width", value - obj["x0"])) - elif key == "top": - assert value <= obj["bottom"] - new_items.append(("doctop", obj["doctop"] + diff)) - new_items.append(("height", obj["height"] - diff)) - if "y1" in obj: - new_items.append(("y1", obj["y1"] - diff)) - elif key == "bottom": - assert value >= obj["top"] - new_items.append(("height", obj["height"] + diff)) - if "y0" in obj: - new_items.append(("y0", obj["y0"] - diff)) - return obj.__class__(tuple(obj.items()) + tuple(new_items)) - - -def curve_to_edges(curve: T_obj) -> T_obj_list: - point_pairs = zip(curve["points"], curve["points"][1:]) - return [ - { - "x0": min(p0[0], p1[0]), - "x1": max(p0[0], p1[0]), - "top": min(p0[1], p1[1]), - "doctop": min(p0[1], p1[1]) + (curve["doctop"] - curve["top"]), - "bottom": max(p0[1], p1[1]), - "width": abs(p0[0] - p1[0]), - "height": abs(p0[1] - p1[1]), - "orientation": "v" if p0[0] == p1[0] else ("h" if p0[1] == p1[1] else None), - } - for p0, p1 in point_pairs - ] - - -def rect_to_edges(rect: T_obj) -> T_obj_list: - top, bottom, left, right = [dict(rect) for x in range(4)] - top.update( - { - "object_type": "rect_edge", - "height": 0, - "y0": rect["y1"], - "bottom": rect["top"], - "orientation": "h", - } - ) - bottom.update( - { - "object_type": "rect_edge", - "height": 0, - "y1": rect["y0"], - "top": rect["top"] + rect["height"], - "doctop": rect["doctop"] + rect["height"], - "orientation": "h", - } - ) - left.update( - { - "object_type": "rect_edge", - "width": 0, - "x1": rect["x0"], - "orientation": "v", - } - ) - right.update( - { - "object_type": "rect_edge", - "width": 0, - "x0": rect["x1"], - "orientation": "v", - } - ) - return [top, bottom, left, right] - - -def line_to_edge(line: T_obj) -> T_obj: - edge = dict(line) - edge["orientation"] = "h" if (line["top"] == line["bottom"]) else "v" - return edge - - -def obj_to_edges(obj: T_obj) -> T_obj_list: - return { - "line": lambda x: [line_to_edge(x)], - "rect": rect_to_edges, - "rect_edge": rect_to_edges, - "curve": curve_to_edges, - }[obj["object_type"]](obj) - - -def filter_edges( - edges: T_obj_list, - orientation: Optional[str] = None, - edge_type: Optional[str] = None, - min_length: T_num = 1, -) -> T_obj_list: - - if orientation not in ("v", "h", None): - raise ValueError("Orientation must be 'v' or 'h'") + key = itemgetter("fontname", "size", "upright", "text") + pos_key = itemgetter("doctop", "x0") - def test(e: T_obj) -> bool: - dim = "height" if e["orientation"] == "v" else "width" - et_correct = e["object_type"] == edge_type if edge_type is not None else True - orient_correct = orientation is None or e["orientation"] == orientation - return bool(et_correct and orient_correct and (e[dim] >= min_length)) + def yield_unique_chars(chars: T_obj_list) -> Generator[T_obj, None, None]: + sorted_chars = sorted(chars, key=key) + for grp, grp_chars in itertools.groupby(sorted_chars, key=key): + for y_cluster in cluster_objects( + list(grp_chars), itemgetter("doctop"), tolerance + ): + for x_cluster in cluster_objects( + y_cluster, itemgetter("x0"), tolerance + ): + yield sorted(x_cluster, key=pos_key)[0] - return list(filter(test, edges)) + deduped = yield_unique_chars(chars) + return sorted(deduped, key=chars.index)