Skip to content

Commit

Permalink
添加了 KWM 的加密/解密支持
Browse files Browse the repository at this point in the history
  • Loading branch information
nukemiko committed Oct 26, 2022
1 parent 3d01c9f commit 2144649
Show file tree
Hide file tree
Showing 2 changed files with 221 additions and 0 deletions.
131 changes: 131 additions & 0 deletions src/libtakiyasha/kwm/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

from typing import IO

from .kwmdataciphers import Mask32
from ..common import CryptLayerWrappedIOSkel
from ..keyutils import make_salt
from ..typedefs import BytesLike, FilePath
from ..typeutils import is_filepath, tobytes, verify_fileobj


class KWM(CryptLayerWrappedIOSkel):
"""基于 BytesIO 的 KWM 透明加密二进制流。
所有读写相关方法都会经过透明加密层处理:
读取时,返回解密后的数据;写入时,向缓冲区写入加密后的数据。
调用读写相关方法时,附加参数 ``nocryptlayer=True``
可绕过透明加密层,访问缓冲区内的原始加密数据。
如果你要新建一个 KWM 对象,不要直接调用 ``__init__()``,而是使用构造器方法
``KWM.new()`` 和 ``KWM.from_file()`` 新建或打开已有 KWM 文件。
已有 KWM 对象的 ``self.to_file()`` 方法可用于将对象内数据保存到文件,但目前尚未实现。
尝试调用此方法会触发 ``NotImplementedError``。
"""

@property
def cipher(self) -> Mask32:
return self._cipher

@property
def core_key(self) -> bytes:
return self.cipher.core_key

@property
def master_key(self) -> bytes:
return self.cipher.master_key

def __init__(self, cipher: Mask32, /, initial_bytes: BytesLike = b'') -> None:
"""基于 BytesIO 的 KWM 透明加密二进制流。
所有读写相关方法都会经过透明加密层处理:
读取时,返回解密后的数据;写入时,向缓冲区写入加密后的数据。
调用读写相关方法时,附加参数 ``nocryptlayer=True``
可绕过透明加密层,访问缓冲区内的原始加密数据。
如果你要新建一个 KWM 对象,不要直接调用 ``__init__()``,而是使用构造器方法
``KWM.new()`` 和 ``KWM.from_file()`` 新建或打开已有 KWM 文件。
已有 KWM 对象的 ``self.to_file()`` 方法可用于将对象内数据保存到文件,但目前尚未实现。
尝试调用此方法会触发 ``NotImplementedError``。
"""
super().__init__(cipher, initial_bytes)
if not isinstance(cipher, Mask32):
raise TypeError('unsupported Cipher: '
f'supports {Mask32.__module__}.{Mask32.__name__}, '
f'not {type(cipher).__name__}'
)

@classmethod
def new(cls, core_key: BytesLike) -> KWM:
"""创建并返回一个全新的空 KWM 对象。
第一个参数 ``core_key`` 是必需的,它被用于还原和解密主密钥。
"""
core_key = tobytes(core_key)

master_key = make_salt(8)
cipher = Mask32(core_key, master_key)

return cls(cipher)

@classmethod
def from_file(cls,
kwm_filething: FilePath | IO[bytes], /,
core_key: BytesLike
):
"""打开一个 KWM 文件或文件对象 ``kwm_filething``。
第一个位置参数 ``kwm_filething`` 可以是文件路径(``str``、``bytes``
或任何拥有方法 ``__fspath__()`` 的对象)。``kwm_filething``
也可以是一个文件对象,但必须可读、可跳转(``kwm_filething.seekable() == True``)。
第二个参数 ``core_key`` 是必需的,它被用于还原和解密主密钥。
"""

def operation(fileobj: IO[bytes]) -> cls:
if not fileobj.read(24).startswith(b'yeelion-kuwo-tme'):
raise ValueError(f"{repr(kwm_filething)} is not a KWM file")

master_key = fileobj.read(8)
cipher = Mask32(core_key, master_key)

fileobj.seek(1024, 0)
initial_bytes = fileobj.read()

return cls(cipher, initial_bytes)

core_key = tobytes(core_key)

if is_filepath(kwm_filething):
with open(kwm_filething, mode='rb') as kwm_fileobj:
instance = operation(kwm_fileobj)
else:
kwm_fileobj = verify_fileobj(kwm_filething, 'binary',
verify_readable=True,
verify_seekable=True
)

instance._name = getattr(kwm_fileobj, 'name', None)

return instance

def to_file(self, kwm_filething: FilePath | IO[bytes]) -> None:
"""警告:尚未完全探明 KWM 文件的结构,因此本方法尚未实现,尝试调用会触发
``NotImplementedError``。预计的参数和行为如下:
将当前 KWM 对象的内容保存到文件 ``kwm_filething``。
第一个位置参数 ``kwm_filething`` 可以是文件路径(``str``、``bytes``
或任何拥有方法 ``__fspath__()`` 的对象)。``kwm_filething``
也可以是一个文件对象,但必须可写。
本方法会首先尝试写入 ``kwm_filething`` 指向的文件。
如果未提供 ``kwm_filething``,则会尝试写入 ``self.name``
指向的文件。如果两者都为空或未提供,则会触发 ``CrypterSavingError``。
"""
raise NotImplementedError('coming soon')
90 changes: 90 additions & 0 deletions src/libtakiyasha/kwm/kwmdataciphers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

from typing import Generator

from ..common import StreamCipherSkel
from ..miscutils import bytestrxor
from ..typedefs import BytesLike, IntegerLike
from ..typeutils import tobytes, toint_nofloat

__all__ = ['Mask32']


class Mask32(StreamCipherSkel):
@property
def core_key(self) -> bytes:
return self._core_key

@property
def master_key(self) -> bytes:
return self._master_key

@property
def mask32(self) -> bytes:
return self._mask32

def __init__(self, core_key: BytesLike, master_key=BytesLike, /) -> None:
core_key = tobytes(core_key)
master_key = tobytes(master_key)

for varname, var, expectlen in ('core_key', core_key, 32), ('master_key', master_key, 8):
if len(var) != expectlen:
f"invalid length of argument '{varname}': should be {expectlen}, not {len(var)}"

self._core_key = core_key
self._master_key = master_key

mask_stage1 = str(int.from_bytes(master_key, 'little'))
if len(mask_stage1) >= 32:
mask_stage2 = mask_stage1[:32]
else:
mask_stage2_pad_len = 32 - len(mask_stage1)
mask_stage2_stage1_fullpad_count = (mask_stage2_pad_len // len(mask_stage1))
mask_stage2_stage1_fullpad_len = len(mask_stage1) * mask_stage2_stage1_fullpad_count
mask_stage2_remain_len = mask_stage2_pad_len - mask_stage2_stage1_fullpad_len

mask_stage2_composition = [mask_stage1]
for _ in range(mask_stage2_stage1_fullpad_count):
mask_stage2_composition.append(mask_stage1)
mask_stage2_composition.append(mask_stage1[:mask_stage2_remain_len])
mask_stage2 = ''.join(mask_stage2_composition).encode('utf-8')

mask_final = bytestrxor(mask_stage2, core_key)
self._mask32 = mask_final

@classmethod
def cls_keystream(cls,
offset: IntegerLike,
length: IntegerLike, /,
mask32: BytesLike
) -> Generator[int, None, None]:
offset = toint_nofloat(offset)
length = toint_nofloat(length)
if offset < 0:
raise ValueError("first argument 'offset' must be a non-negative integer")
if length < 0:
raise ValueError("second argument 'length' must be a non-negative integer")
maskblk_data: bytes = tobytes(mask32)
maskblk_len = len(maskblk_data)
if maskblk_len != 32:
raise ValueError(f"invalid mask length: should be 32, not {maskblk_len}")

target_in_maskblk_len = length
target_offset_in_maskblk = offset % maskblk_len
if target_offset_in_maskblk == 0:
target_before_maskblk_area_len = 0
else:
target_before_maskblk_area_len = maskblk_len - target_offset_in_maskblk
yield from maskblk_data[target_offset_in_maskblk:target_offset_in_maskblk + target_before_maskblk_area_len]
target_in_maskblk_len -= target_before_maskblk_area_len

target_overrided_whole_maskblk_count = target_in_maskblk_len // maskblk_len
target_after_maskblk_area_len = target_in_maskblk_len % maskblk_len

for _ in range(target_overrided_whole_maskblk_count):
yield from maskblk_data
yield from maskblk_data[:target_after_maskblk_area_len]

def keystream(self, offset: IntegerLike, length: IntegerLike, /) -> Generator[int, None, None]:
yield from self.cls_keystream(offset, length, self._mask32)

0 comments on commit 2144649

Please sign in to comment.