Skip to content

Commit

Permalink
CryptLayerWrappedIOSkel 进行了大量改进和修复:
Browse files Browse the repository at this point in the history
1、修复了 `readblock()` 仅在 `tell() == 0` 时可正常读取一次,之后只返回空字节串的问题。
2、现在可以接受 `encrypt()`、`decrypt`、`keystream()` 实现不完整(抛出 `NotImplementedError`)的 `Cipher` 对象作为内置透明加密层:`decrypt()`、`encrypt()` 是否实现会影响创建的对象通过透明加密层读或写的支持;`keystream()` 未实现,则在读/写操作时会回退为使用 `decrypt()` 或 `encrypt()`。
3、移除了从未使用但伴随每个对象的创建而产生的锁属性 `self._lock`。(尽管未来可能会重新添加)
4、更新和补充了文档字符串。
  • Loading branch information
nukemiko committed Oct 23, 2022
1 parent 3b60c12 commit 1cfd98f
Showing 1 changed file with 141 additions and 17 deletions.
158 changes: 141 additions & 17 deletions src/libtakiyasha/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from abc import ABCMeta, abstractmethod
from functools import lru_cache
from threading import Lock
from typing import Generator, Iterable, Literal

from .miscutils import bytestrxor
Expand Down Expand Up @@ -94,8 +93,16 @@ class CryptLayerWrappedIOSkel(io.BytesIO):
可绕过透明加密层,访问缓冲区内的原始加密数据。
``__init__()`` 方法的第一个位置参数 ``cipher`` 必须拥有
``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法;
第二个参数 ``initial_bytes`` 会被用于对象内置缓冲区的初始数据。
``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法,且这些方法必须能接受两个位置参数。
其中,``encrypt()`` 和 ``decrypt()`` 的第一个位置参数接受字节对象,第二个位置参数接受非负整数;
``keystream()`` 的两个位置参数均只接受非负整数。
如果 ``cipher`` 未实现这些方法中的任何一个,都需要明确抛出 ``NotImplementedError``。
未实现的 ``encrypt()``/``decrypt()`` 方法会导致创建的对象不可通过透明加密层读/写;
未实现的 ``keystream()`` 方法不会影响对读写的支持,但可能会极大影响读取的速度。
``__init__()`` 方法的第二个参数 ``initial_bytes``
会在转换为 ``bytes`` 后作为对象内置缓冲区的初始数据。
基于本类的子类可能拥有自己的构造器方法或函数,而不是直接调用
``__init__()``;详情请参考该类的文档字符串。
Expand All @@ -106,10 +113,32 @@ class CryptLayerWrappedIOSkel(io.BytesIO):

@property
def name(self) -> str | None:
"""当前对象来源文件的路径。
在此类的对象中,此属性总是 ``None``。
如果是通过子类的构造器方法或函数创建的对象,此属性可能会为来源文件的路径字符串。
"""
if hasattr(self, '_name'):
name: str = self._name
return name

@property
def encryptable(self) -> bool:
"""此对象的内置透明加密层是否支持加密(内置 ``Cipher`` 对象的 ``encrypt()`` 方法是否可用)。
这会影响到写入相关方法在参数 ``nocryptlayer=False`` 时是否可用。
"""
return self._encrypt_available

@property
def decryptable(self) -> bool:
"""此对象的内置透明加密层是否支持解密(内置 ``Cipher`` 对象的 ``decrypt()`` 方法是否可用)。
这会影响到读取相关方法在参数 ``nocryptlayer=False`` 时是否可用,以及此对象是否可迭代。
"""
return self._decrypt_available

@property
def iter_nocryptlayer(self) -> bool:
"""迭代当前对象时,是否需要绕过透明加密层。默认为 ``False``。"""
Expand Down Expand Up @@ -168,8 +197,16 @@ def __init__(self, cipher, /, initial_bytes: BytesLike = b'') -> None:
可绕过透明加密层,访问缓冲区内的原始加密数据。
``__init__()`` 方法的第一个位置参数 ``cipher`` 必须拥有
``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法;
第二个参数 ``initial_bytes`` 会被用于对象内置缓冲区的初始数据。
``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法,且这些方法必须能接受两个位置参数。
其中,``encrypt()`` 和 ``decrypt()`` 的第一个位置参数接受字节对象,第二个位置参数接受非负整数;
``keystream()`` 的两个位置参数均只接受非负整数。
如果 ``cipher`` 未实现这些方法中的任何一个,都需要明确抛出 ``NotImplementedError``。
未实现的 ``encrypt()``/``decrypt()`` 方法会导致创建的对象不可通过透明加密层读/写;
未实现的 ``keystream()`` 方法不会影响对读写的支持,但可能会极大影响读取的速度。
``__init__()`` 方法的第二个参数 ``initial_bytes``
会在转换为 ``bytes`` 后作为对象内置缓冲区的初始数据。
基于本类的子类可能拥有自己的构造器方法或函数,而不是直接调用
``__init__()``;详情请参考该类的文档字符串。
Expand All @@ -193,39 +230,79 @@ def __init__(self, cipher, /, initial_bytes: BytesLike = b'') -> None:
raise TypeError(f"{repr(cipher)} is not a StreamCipher object: "
f"method '{method_name}' is not callable"
)
# 检测 keystream() 是否已实现(可用)
self._keystream_available = True
try:
cipher.keystream(0, 0)
except NotImplementedError:
self._keystream_available = False
# 检测 encrypt() 是否已实现(可用)
self._encrypt_available = True
try:
cipher.encrypt(b'', 0)
except NotImplementedError:
self._encrypt_available = False
# 检测 decrypt() 是否已实现(可用)
self._decrypt_available = True
try:
cipher.decrypt(b'', 0)
except NotImplementedError:
self._decrypt_available = False

self._cipher = cipher
self._iter_nocryptlayer = False
self._iter_mode: Literal['block', 'line'] = 'block'
self._iter_block_size: int = io.DEFAULT_BUFFER_SIZE
self._lock = Lock()

def __iter__(self):
return self

def __next__(self):
def __next__(self) -> bytes:
if self._iter_mode == 'line':
if self._iter_nocryptlayer:
return super().__next__()
elif not self._decrypt_available:
raise io.UnsupportedOperation('iter with crypt layer')
else:
curpos = self.tell()

target_data = super().getvalue()[curpos:]
result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n'))

if self._keystream_available:
result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n'))
else:
result_data = bytearray()
start = curpos
while 1:
stop = start + self._iter_block_size
target_data_segment = target_data[start:stop]
if target_data_segment == b'':
break
d = self._cipher.decrypt(target_data_segment, start)
if b'\n' in d:
result_data.append(d[:d.index(b'\n')])
break
else:
result_data.append(d)
start += self._iter_block_size
if result_data == b'':
raise StopIteration

self.seek(curpos + len(result_data), 0)

return result_data
elif self._iter_mode == 'block':
if not self._decrypt_available:
raise io.UnsupportedOperation('iter with crypt layer')

curpos = self.tell()

target_data = super().getvalue()[curpos:curpos + self._iter_block_size]
if self._iter_nocryptlayer:
result_data = target_data
else:
elif self._keystream_available:
result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=None))
else:
result_data = bytes(self._cipher.decrypt(target_data, curpos))

if result_data == b'':
raise StopIteration
Expand Down Expand Up @@ -261,6 +338,9 @@ def _xor_data_keystream(self,
else:
eoford = ord(tobytes(eof))

if data == b'':
return

keystream = self._cipher.keystream(offset, len(data))
for databyteord, streambyteord in zip(data, keystream):
resultbyteord = databyteord ^ streambyteord
Expand All @@ -271,6 +351,8 @@ def _xor_data_keystream(self,
def getvalue(self, nocryptlayer: bool = False) -> bytes:
if nocryptlayer:
return super().getvalue()
elif not self._decrypt_available:
raise io.UnsupportedOperation('getvalue with crypt layer')
else:
return self._cipher.decrypt(super().getvalue())

Expand All @@ -283,6 +365,8 @@ def getbuffer(self, nocryptlayer: bool = False) -> memoryview:
def read(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes:
if nocryptlayer:
return super().read(size)
elif not self._decrypt_available:
raise io.UnsupportedOperation('read with crypt layer')
else:
curpos = self.tell()
if size is None:
Expand All @@ -293,14 +377,19 @@ def read(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) ->
else:
target_data = super().getvalue()[curpos:curpos + size]

result_data = bytes(self._xor_data_keystream(curpos, target_data))
if self._keystream_available:
result_data = bytes(self._xor_data_keystream(curpos, target_data))
else:
result_data = self._cipher.decrypt(target_data, curpos)
self.seek(curpos + len(result_data), 0)

return result_data

def readinto(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int:
if nocryptlayer:
return super().readinto(buffer)
elif not self._decrypt_available:
raise io.UnsupportedOperation('readinto with crypt layer')
else:
if isinstance(buffer, memoryview):
memview = buffer
Expand All @@ -316,16 +405,25 @@ def readinto(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int
return data_len

def read1(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes:
return self.read(size, nocryptlayer)
if nocryptlayer or self._decrypt_available:
return self.read(size, nocryptlayer)

raise io.UnsupportedOperation('read1 with crypt layer')

def readinto1(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int:
return self.readinto(buffer, nocryptlayer)
if nocryptlayer or self._decrypt_available:
return self.readinto(buffer, nocryptlayer)

raise io.UnsupportedOperation('readinto1 with crypt layer')

def readblock(self,
size: IntegerLike | None = -1, /,
nocryptlayer: bool = False, *,
block_size: IntegerLike | None = io.DEFAULT_BUFFER_SIZE
) -> bytes:
if not self._decrypt_available:
raise io.UnsupportedOperation('readblock with crypt layer')

curpos = self.tell()
if size is None:
size = -1
Expand All @@ -336,14 +434,16 @@ def readblock(self,
if block_size < 0:
block_size = io.DEFAULT_BUFFER_SIZE
if size < 0:
target_data = super().getvalue()[curpos:block_size]
target_data = super().getvalue()[curpos:curpos + block_size]
else:
target_data = super().getvalue()[curpos:curpos + min([size, block_size])]

if nocryptlayer:
result_data = target_data
else:
elif self._keystream_available:
result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=None))
else:
result_data = self._cipher.decrypt(target_data, curpos)

self.seek(curpos + len(result_data), 0)

Expand All @@ -353,6 +453,8 @@ def readline(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False)
if nocryptlayer:
return super().readline(size)
else:
if not self._decrypt_available:
raise io.UnsupportedOperation('readline with crypt layer')
curpos = self.tell()
if size is None:
size = -1
Expand All @@ -362,14 +464,32 @@ def readline(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False)
else:
target_data = super().getvalue()[curpos:curpos + size]

result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n'))
if self._keystream_available:
result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n'))
else:
result_data = bytearray()
start = curpos
while 1:
stop = start + self._iter_block_size
target_data_segment = target_data[start:stop]
if target_data_segment == b'':
break
d = self._cipher.decrypt(target_data_segment, start)
if b'\n' in d:
result_data.append(d[:d.index(b'\n')])
break
else:
result_data.append(d)
start += self._iter_block_size
self.seek(curpos + len(result_data), 0)

return result_data
return bytes(result_data)

def readlines(self, hint: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> list[bytes]:
if nocryptlayer:
return super().readlines(hint)
elif not self._decrypt_available:
raise io.UnsupportedOperation('readlines with crypt layer')
else:
results_lines = []
if hint is None:
Expand All @@ -391,13 +511,17 @@ def readlines(self, hint: IntegerLike | None = -1, /, nocryptlayer: bool = False
def write(self, data: BytesLike, /, nocryptlayer: bool = False) -> int:
if nocryptlayer:
return super().write(data)
elif not self._encrypt_available:
raise io.UnsupportedOperation('write with crypt layer')
else:
curpos = self.tell()
return super().write(self._cipher.encrypt(data, curpos))

def writelines(self, lines: Iterable[BytesLike], /, nocryptlayer: bool = False) -> None:
if nocryptlayer:
return super().writelines(lines)
elif not self._encrypt_available:
raise io.UnsupportedOperation('writelines with crypt layer')
else:
for line in lines:
super().write(line)

0 comments on commit 1cfd98f

Please sign in to comment.