From 1cfd98fa3987c6992b1ea9882225a7370b8d536d Mon Sep 17 00:00:00 2001 From: nukemiko Date: Mon, 24 Oct 2022 00:36:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=B9=20`CryptLayerWrappedIOSkel`=20?= =?UTF-8?q?=E8=BF=9B=E8=A1=8C=E4=BA=86=E5=A4=A7=E9=87=8F=E6=94=B9=E8=BF=9B?= =?UTF-8?q?=E5=92=8C=E4=BF=AE=E5=A4=8D=EF=BC=9A=201=E3=80=81=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E4=BA=86=20`readblock()`=20=E4=BB=85=E5=9C=A8=20`tell?= =?UTF-8?q?()=20=3D=3D=200`=20=E6=97=B6=E5=8F=AF=E6=AD=A3=E5=B8=B8?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E4=B8=80=E6=AC=A1=EF=BC=8C=E4=B9=8B=E5=90=8E?= =?UTF-8?q?=E5=8F=AA=E8=BF=94=E5=9B=9E=E7=A9=BA=E5=AD=97=E8=8A=82=E4=B8=B2?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98=E3=80=82=202=E3=80=81=E7=8E=B0?= =?UTF-8?q?=E5=9C=A8=E5=8F=AF=E4=BB=A5=E6=8E=A5=E5=8F=97=20`encrypt()`?= =?UTF-8?q?=E3=80=81`decrypt`=E3=80=81`keystream()`=20=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E4=B8=8D=E5=AE=8C=E6=95=B4=EF=BC=88=E6=8A=9B=E5=87=BA=20`NotIm?= =?UTF-8?q?plementedError`=EF=BC=89=E7=9A=84=20`Cipher`=20=E5=AF=B9?= =?UTF-8?q?=E8=B1=A1=E4=BD=9C=E4=B8=BA=E5=86=85=E7=BD=AE=E9=80=8F=E6=98=8E?= =?UTF-8?q?=E5=8A=A0=E5=AF=86=E5=B1=82=EF=BC=9A`decrypt()`=E3=80=81`encryp?= =?UTF-8?q?t()`=20=E6=98=AF=E5=90=A6=E5=AE=9E=E7=8E=B0=E4=BC=9A=E5=BD=B1?= =?UTF-8?q?=E5=93=8D=E5=88=9B=E5=BB=BA=E7=9A=84=E5=AF=B9=E8=B1=A1=E9=80=9A?= =?UTF-8?q?=E8=BF=87=E9=80=8F=E6=98=8E=E5=8A=A0=E5=AF=86=E5=B1=82=E8=AF=BB?= =?UTF-8?q?=E6=88=96=E5=86=99=E7=9A=84=E6=94=AF=E6=8C=81=EF=BC=9B`keystrea?= =?UTF-8?q?m()`=20=E6=9C=AA=E5=AE=9E=E7=8E=B0=EF=BC=8C=E5=88=99=E5=9C=A8?= =?UTF-8?q?=E8=AF=BB/=E5=86=99=E6=93=8D=E4=BD=9C=E6=97=B6=E4=BC=9A?= =?UTF-8?q?=E5=9B=9E=E9=80=80=E4=B8=BA=E4=BD=BF=E7=94=A8=20`decrypt()`=20?= =?UTF-8?q?=E6=88=96=20`encrypt()`=E3=80=82=203=E3=80=81=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E4=BA=86=E4=BB=8E=E6=9C=AA=E4=BD=BF=E7=94=A8=E4=BD=86=E4=BC=B4?= =?UTF-8?q?=E9=9A=8F=E6=AF=8F=E4=B8=AA=E5=AF=B9=E8=B1=A1=E7=9A=84=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E8=80=8C=E4=BA=A7=E7=94=9F=E7=9A=84=E9=94=81=E5=B1=9E?= =?UTF-8?q?=E6=80=A7=20`self.=5Flock`=E3=80=82=EF=BC=88=E5=B0=BD=E7=AE=A1?= =?UTF-8?q?=E6=9C=AA=E6=9D=A5=E5=8F=AF=E8=83=BD=E4=BC=9A=E9=87=8D=E6=96=B0?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=EF=BC=89=204=E3=80=81=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E5=92=8C=E8=A1=A5=E5=85=85=E4=BA=86=E6=96=87=E6=A1=A3=E5=AD=97?= =?UTF-8?q?=E7=AC=A6=E4=B8=B2=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/libtakiyasha/common.py | 158 +++++++++++++++++++++++++++++++++---- 1 file changed, 141 insertions(+), 17 deletions(-) diff --git a/src/libtakiyasha/common.py b/src/libtakiyasha/common.py index 36fa79b..bf6a620 100644 --- a/src/libtakiyasha/common.py +++ b/src/libtakiyasha/common.py @@ -3,7 +3,6 @@ from abc import ABCMeta, abstractmethod from functools import lru_cache -from threading import Lock from typing import Generator, Iterable, Literal from .miscutils import bytestrxor @@ -94,8 +93,16 @@ class CryptLayerWrappedIOSkel(io.BytesIO): 可绕过透明加密层,访问缓冲区内的原始加密数据。 ``__init__()`` 方法的第一个位置参数 ``cipher`` 必须拥有 - ``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法; - 第二个参数 ``initial_bytes`` 会被用于对象内置缓冲区的初始数据。 + ``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法,且这些方法必须能接受两个位置参数。 + 其中,``encrypt()`` 和 ``decrypt()`` 的第一个位置参数接受字节对象,第二个位置参数接受非负整数; + ``keystream()`` 的两个位置参数均只接受非负整数。 + + 如果 ``cipher`` 未实现这些方法中的任何一个,都需要明确抛出 ``NotImplementedError``。 + 未实现的 ``encrypt()``/``decrypt()`` 方法会导致创建的对象不可通过透明加密层读/写; + 未实现的 ``keystream()`` 方法不会影响对读写的支持,但可能会极大影响读取的速度。 + + ``__init__()`` 方法的第二个参数 ``initial_bytes`` + 会在转换为 ``bytes`` 后作为对象内置缓冲区的初始数据。 基于本类的子类可能拥有自己的构造器方法或函数,而不是直接调用 ``__init__()``;详情请参考该类的文档字符串。 @@ -106,10 +113,32 @@ class CryptLayerWrappedIOSkel(io.BytesIO): @property def name(self) -> str | None: + """当前对象来源文件的路径。 + + 在此类的对象中,此属性总是 ``None``。 + + 如果是通过子类的构造器方法或函数创建的对象,此属性可能会为来源文件的路径字符串。 + """ if hasattr(self, '_name'): name: str = self._name return name + @property + def encryptable(self) -> bool: + """此对象的内置透明加密层是否支持加密(内置 ``Cipher`` 对象的 ``encrypt()`` 方法是否可用)。 + + 这会影响到写入相关方法在参数 ``nocryptlayer=False`` 时是否可用。 + """ + return self._encrypt_available + + @property + def decryptable(self) -> bool: + """此对象的内置透明加密层是否支持解密(内置 ``Cipher`` 对象的 ``decrypt()`` 方法是否可用)。 + + 这会影响到读取相关方法在参数 ``nocryptlayer=False`` 时是否可用,以及此对象是否可迭代。 + """ + return self._decrypt_available + @property def iter_nocryptlayer(self) -> bool: """迭代当前对象时,是否需要绕过透明加密层。默认为 ``False``。""" @@ -168,8 +197,16 @@ def __init__(self, cipher, /, initial_bytes: BytesLike = b'') -> None: 可绕过透明加密层,访问缓冲区内的原始加密数据。 ``__init__()`` 方法的第一个位置参数 ``cipher`` 必须拥有 - ``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法; - 第二个参数 ``initial_bytes`` 会被用于对象内置缓冲区的初始数据。 + ``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法,且这些方法必须能接受两个位置参数。 + 其中,``encrypt()`` 和 ``decrypt()`` 的第一个位置参数接受字节对象,第二个位置参数接受非负整数; + ``keystream()`` 的两个位置参数均只接受非负整数。 + + 如果 ``cipher`` 未实现这些方法中的任何一个,都需要明确抛出 ``NotImplementedError``。 + 未实现的 ``encrypt()``/``decrypt()`` 方法会导致创建的对象不可通过透明加密层读/写; + 未实现的 ``keystream()`` 方法不会影响对读写的支持,但可能会极大影响读取的速度。 + + ``__init__()`` 方法的第二个参数 ``initial_bytes`` + 会在转换为 ``bytes`` 后作为对象内置缓冲区的初始数据。 基于本类的子类可能拥有自己的构造器方法或函数,而不是直接调用 ``__init__()``;详情请参考该类的文档字符串。 @@ -193,25 +230,60 @@ def __init__(self, cipher, /, initial_bytes: BytesLike = b'') -> None: raise TypeError(f"{repr(cipher)} is not a StreamCipher object: " f"method '{method_name}' is not callable" ) + # 检测 keystream() 是否已实现(可用) + self._keystream_available = True + try: + cipher.keystream(0, 0) + except NotImplementedError: + self._keystream_available = False + # 检测 encrypt() 是否已实现(可用) + self._encrypt_available = True + try: + cipher.encrypt(b'', 0) + except NotImplementedError: + self._encrypt_available = False + # 检测 decrypt() 是否已实现(可用) + self._decrypt_available = True + try: + cipher.decrypt(b'', 0) + except NotImplementedError: + self._decrypt_available = False + self._cipher = cipher self._iter_nocryptlayer = False self._iter_mode: Literal['block', 'line'] = 'block' self._iter_block_size: int = io.DEFAULT_BUFFER_SIZE - self._lock = Lock() def __iter__(self): return self - def __next__(self): + def __next__(self) -> bytes: if self._iter_mode == 'line': if self._iter_nocryptlayer: return super().__next__() + elif not self._decrypt_available: + raise io.UnsupportedOperation('iter with crypt layer') else: curpos = self.tell() target_data = super().getvalue()[curpos:] - result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n')) - + if self._keystream_available: + result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n')) + else: + result_data = bytearray() + start = curpos + while 1: + stop = start + self._iter_block_size + target_data_segment = target_data[start:stop] + if target_data_segment == b'': + break + d = self._cipher.decrypt(target_data_segment, start) + if b'\n' in d: + result_data.append(d[:d.index(b'\n')]) + break + else: + result_data.append(d) + start += self._iter_block_size if result_data == b'': raise StopIteration @@ -219,13 +291,18 @@ def __next__(self): return result_data elif self._iter_mode == 'block': + if not self._decrypt_available: + raise io.UnsupportedOperation('iter with crypt layer') + curpos = self.tell() target_data = super().getvalue()[curpos:curpos + self._iter_block_size] if self._iter_nocryptlayer: result_data = target_data - else: + elif self._keystream_available: result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=None)) + else: + result_data = bytes(self._cipher.decrypt(target_data, curpos)) if result_data == b'': raise StopIteration @@ -261,6 +338,9 @@ def _xor_data_keystream(self, else: eoford = ord(tobytes(eof)) + if data == b'': + return + keystream = self._cipher.keystream(offset, len(data)) for databyteord, streambyteord in zip(data, keystream): resultbyteord = databyteord ^ streambyteord @@ -271,6 +351,8 @@ def _xor_data_keystream(self, def getvalue(self, nocryptlayer: bool = False) -> bytes: if nocryptlayer: return super().getvalue() + elif not self._decrypt_available: + raise io.UnsupportedOperation('getvalue with crypt layer') else: return self._cipher.decrypt(super().getvalue()) @@ -283,6 +365,8 @@ def getbuffer(self, nocryptlayer: bool = False) -> memoryview: def read(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes: if nocryptlayer: return super().read(size) + elif not self._decrypt_available: + raise io.UnsupportedOperation('read with crypt layer') else: curpos = self.tell() if size is None: @@ -293,7 +377,10 @@ def read(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> else: target_data = super().getvalue()[curpos:curpos + size] - result_data = bytes(self._xor_data_keystream(curpos, target_data)) + if self._keystream_available: + result_data = bytes(self._xor_data_keystream(curpos, target_data)) + else: + result_data = self._cipher.decrypt(target_data, curpos) self.seek(curpos + len(result_data), 0) return result_data @@ -301,6 +388,8 @@ def read(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> def readinto(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int: if nocryptlayer: return super().readinto(buffer) + elif not self._decrypt_available: + raise io.UnsupportedOperation('readinto with crypt layer') else: if isinstance(buffer, memoryview): memview = buffer @@ -316,16 +405,25 @@ def readinto(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int return data_len def read1(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes: - return self.read(size, nocryptlayer) + if nocryptlayer or self._decrypt_available: + return self.read(size, nocryptlayer) + + raise io.UnsupportedOperation('read1 with crypt layer') def readinto1(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int: - return self.readinto(buffer, nocryptlayer) + if nocryptlayer or self._decrypt_available: + return self.readinto(buffer, nocryptlayer) + + raise io.UnsupportedOperation('readinto1 with crypt layer') def readblock(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False, *, block_size: IntegerLike | None = io.DEFAULT_BUFFER_SIZE ) -> bytes: + if not self._decrypt_available: + raise io.UnsupportedOperation('readblock with crypt layer') + curpos = self.tell() if size is None: size = -1 @@ -336,14 +434,16 @@ def readblock(self, if block_size < 0: block_size = io.DEFAULT_BUFFER_SIZE if size < 0: - target_data = super().getvalue()[curpos:block_size] + target_data = super().getvalue()[curpos:curpos + block_size] else: target_data = super().getvalue()[curpos:curpos + min([size, block_size])] if nocryptlayer: result_data = target_data - else: + elif self._keystream_available: result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=None)) + else: + result_data = self._cipher.decrypt(target_data, curpos) self.seek(curpos + len(result_data), 0) @@ -353,6 +453,8 @@ def readline(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) if nocryptlayer: return super().readline(size) else: + if not self._decrypt_available: + raise io.UnsupportedOperation('readline with crypt layer') curpos = self.tell() if size is None: size = -1 @@ -362,14 +464,32 @@ def readline(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) else: target_data = super().getvalue()[curpos:curpos + size] - result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n')) + if self._keystream_available: + result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n')) + else: + result_data = bytearray() + start = curpos + while 1: + stop = start + self._iter_block_size + target_data_segment = target_data[start:stop] + if target_data_segment == b'': + break + d = self._cipher.decrypt(target_data_segment, start) + if b'\n' in d: + result_data.append(d[:d.index(b'\n')]) + break + else: + result_data.append(d) + start += self._iter_block_size self.seek(curpos + len(result_data), 0) - return result_data + return bytes(result_data) def readlines(self, hint: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> list[bytes]: if nocryptlayer: return super().readlines(hint) + elif not self._decrypt_available: + raise io.UnsupportedOperation('readlines with crypt layer') else: results_lines = [] if hint is None: @@ -391,6 +511,8 @@ def readlines(self, hint: IntegerLike | None = -1, /, nocryptlayer: bool = False def write(self, data: BytesLike, /, nocryptlayer: bool = False) -> int: if nocryptlayer: return super().write(data) + elif not self._encrypt_available: + raise io.UnsupportedOperation('write with crypt layer') else: curpos = self.tell() return super().write(self._cipher.encrypt(data, curpos)) @@ -398,6 +520,8 @@ def write(self, data: BytesLike, /, nocryptlayer: bool = False) -> int: def writelines(self, lines: Iterable[BytesLike], /, nocryptlayer: bool = False) -> None: if nocryptlayer: return super().writelines(lines) + elif not self._encrypt_available: + raise io.UnsupportedOperation('writelines with crypt layer') else: for line in lines: super().write(line)