Skip to content

Commit

Permalink
SEC: Infinite recursion when using PdfWriter(clone_from=reader) (#2264)
Browse files Browse the repository at this point in the history
Use a `visited` memo (a set of `(idnum, generation)` pairs) to check whether the current object in the clone operation has already been visited; if so, do not add it to the list of objects.

This avoids infinite recursion when multiple links inside a PDF point to the same object.
  • Loading branch information
Alexhuszagh committed Oct 29, 2023
1 parent 56e191d commit 9b23ac3
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions pypdf/generic/_data_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
List,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
Expand Down Expand Up @@ -187,14 +188,15 @@ def clone(
except Exception:
pass

visited: Set[Tuple[int, int]] = set()
d__ = cast(
"DictionaryObject",
self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
)
if ignore_fields is None:
ignore_fields = []
if len(d__.keys()) == 0:
d__._clone(self, pdf_dest, force_duplicate, ignore_fields)
d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
return d__

def _clone(
Expand All @@ -203,6 +205,7 @@ def _clone(
pdf_dest: PdfWriterProtocol,
force_duplicate: bool,
ignore_fields: Optional[Sequence[Union[str, int]]],
visited: Set[Tuple[int, int]],
) -> None:
"""
Update the object from src.
Expand Down Expand Up @@ -270,6 +273,14 @@ def _clone(
cur_obj.__class__(), pdf_dest, force_duplicate
),
)
# check to see if we've previously processed our item
if clon.indirect_reference is not None:
idnum = clon.indirect_reference.idnum
generation = clon.indirect_reference.generation
if (idnum, generation) in visited:
cur_obj = None
break
visited.add((idnum, generation))
objs.append((cur_obj, clon))
assert prev_obj is not None
prev_obj[NameObject(k)] = clon.indirect_reference
Expand All @@ -282,7 +293,7 @@ def _clone(
except Exception:
cur_obj = None
for s, c in objs:
c._clone(s, pdf_dest, force_duplicate, ignore_fields)
c._clone(s, pdf_dest, force_duplicate, ignore_fields, visited)

for k, v in src.items():
if k not in ignore_fields:
Expand Down Expand Up @@ -798,6 +809,7 @@ def _clone(
pdf_dest: PdfWriterProtocol,
force_duplicate: bool,
ignore_fields: Optional[Sequence[Union[str, int]]],
visited: Set[Tuple[int, int]],
) -> None:
"""
Update the object from src.
Expand All @@ -820,7 +832,7 @@ def _clone(
)
except Exception:
pass
super()._clone(src, pdf_dest, force_duplicate, ignore_fields)
super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

def get_data(self) -> Union[bytes, str]:
return self._data
Expand Down Expand Up @@ -1048,6 +1060,7 @@ def clone(
except Exception:
pass

visited: Set[Tuple[int, int]] = set()
d__ = cast(
"ContentStream",
self._reference_clone(
Expand All @@ -1056,7 +1069,7 @@ def clone(
)
if ignore_fields is None:
ignore_fields = []
d__._clone(self, pdf_dest, force_duplicate, ignore_fields)
d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
return d__

def _clone(
Expand All @@ -1065,6 +1078,7 @@ def _clone(
pdf_dest: PdfWriterProtocol,
force_duplicate: bool,
ignore_fields: Optional[Sequence[Union[str, int]]],
visited: Set[Tuple[int, int]],
) -> None:
"""
Update the object from src.
Expand All @@ -1081,7 +1095,7 @@ def _clone(
self._operations = list(src_cs._operations)
self.forced_encoding = src_cs.forced_encoding
# no need to call DictionaryObject or anything
# like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields)
# like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

def _parse_content_stream(self, stream: StreamType) -> None:
# 7.8.2 Content Streams
Expand Down

0 comments on commit 9b23ac3

Please sign in to comment.