diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..457a03c --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,15 @@ +## 变更记录 + +### 版本 2.1.0 + +- 减少了一些重复代码的使用,删除了大量不再使用的代码 +- 优化了判断是否为 QMCv1 文件的逻辑、QMCv2 文件的主密钥探测逻辑 +- `libtakiyasha.qmc.QMCv2` 的 `open()` 和 `save()` 现在可接受多个混淆密钥,通过关键字参数 `garble_keys` 在需要时传入。 + - **因此,上述方法中原来的关键字参数 `garble_key1` 和 `garble_key2` 已经被干掉了,请及时修改你的工具链。** + - 如果提供此参数,需要提供一个产生至少一个混淆密钥(类字节对象)的可迭代对象(例如列表),且混淆密钥的顺序必须正确。 + +在[这里](https://github.com/nukemiko/libtakiyasha/compare/2.1.0rc2...2.1.0)查看更详细的变更记录。 + +### 版本 2.0.1 至 2.1.0rc2 + +在[这里](https://github.com/nukemiko/libtakiyasha/compare/2.0.1...2.1.0rc2)查看更详细的变更记录。 diff --git a/README.md b/README.md index d80df11..8ea13c3 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ LibTakiyasha 是一个 Python 音频加密/解密工具库(当然也可用于 **本项目是以学习和技术研究的初衷创建的,修改、再分发时请遵循 [License](LICENSE)。** -本项目的设计灵感,以及部分解密方案,来源于同类项目: +本项目的设计灵感,以及部分解密方案,来源于同类项目: - [Unlock Music Project - CLI Edition](https://git.unlock-music.dev/um/cli) - [parakeet-rs/libparakeet](https://github.com/parakeet-rs/libparakeet) @@ -19,37 +19,39 @@ LibTakiyasha 是一个 Python 音频加密/解密工具库(当然也可用于 --- +## 新变化? + +请参阅[变更记录](CHANGELOG.md)。 + +(如果你是在 PyPI 上浏览本项目,它可能会出现在页面的底部,[点按此处跳转](#变更记录)。) + ## 特性 - 使用纯 Python 代码编写 - - **兼容 Python 3.8 及后续版本**,兼容多种 Python 解释器实现(见下文 [#性能测试](#性能测试)) + - **兼容 Python 3.8 及后续版本**,兼容多种 Python 解释器实现 + - 可在[此处](https://github.com/nukemiko/libtakiyasha/wiki/%E6%80%A7%E8%83%BD%E8%A1%A8%E7%8E%B0)查看具体兼容哪些实现 - 易于阅读,方便 Python 爱好者学习 - (包括依赖库)无任何 C/C++ 扩展模块,跨平台性强 +- 支持四种加密文件: + - 网易云音乐加密文件 `.ncm` + - QQ 音乐加密文件 QMCv1 `.qmc[0-9]`、`.qmcflac`、`.qmcogg`、`.qmcra` 等 + - QQ 音乐加密文件 QMCv2 `.mflac[0-9]`、`.mgg[0-9]` 等 + - 酷狗音乐加密文件 KGM/VPR `.kgm`、`.vpr` + - 不支持创建新加密文件 + - 酷我音乐加密文件 `.kwm` + - 更多信息,请参见[此处](https://github.com/nukemiko/libtakiyasha/wiki/%E6%94%AF%E6%8C%81%E7%9A%84%E6%A0%BC%E5%BC%8F%E4%BB%A5%E5%8F%8A%E6%89%80%E9%9C%80%E7%9A%84%E5%8F%82%E6%95%B0) -### 性能测试 +### 性能表现 -由于 Python 语言自身原因,LibTakiyasha 相较于同类项目,运行速度较慢。因此我们使用不同解释器实现,对常用操作做了一些性能测试: +参见[此处](https://github.com/nukemiko/libtakiyasha/wiki/%E6%80%A7%E8%83%BD%E8%A1%A8%E7%8E%B0)。 -| 操作 | 测试大小 | Python 3.10.9 (CPython) | Python 3.8.12 (Pyston 2.3.5) | Python 3.9.16 (PyPy 7.3.11) | -| :------------: | :------: | :---------------------: | :--------------------------: | :-------------------------: | -| NCM 加密 | 36.8 MiB | 4.159s | 2.159s | 1.366s | -| NCM 解密 | 36.8 MiB | 4.393s | 2.360s | 1.480s | -| QMCv1 加密 | 36.8 MiB | 3.841s | 2.116s | 1.594s | -| QMCv1 解密 | 36.8 MiB | 3.813s | 2.331s | 1.406s | -| QMCv2 掩码加密 | 36.8 MiB | 4.065s | 2.201s | 1.727s | -| QMCv2 掩码解密 | 36.8 MiB | 3.990s | 2.200s | 1.848s | -| QMCv2 RC4 加密 | 36.8 MiB | 12.820s | 5.596s | 2.717s | -| QMCv2 RC4 解密 | 36.8 MiB | 12.588s | 5.913s | 2.552s | -| KGM 解密 | 64.4 MiB | 49.014s | 22.053s | 8.376s | -| VPR 解密 | 87.9 MiB | 70.030s | 32.252s | 11.902s | - -仅在你对速度有要求时,可以考虑在调用 LibTakiyasha 时使用 PyPy/Pyston 解释器。 +## 安装 -一般情况下,建议使用官方解释器实现(CPython)。 +可用的最新版本:2.1.0,[GitHub 发布页面](https://github.com/nukemiko/libtakiyasha/releases/tag/2.1.0),[PyPI](https://pypi.org/project/libtakiyasha/2.1.0/) -## 安装 +### 安装方式 -可用的最新版本:2.1.0rc2,可前往[发布页面](https://github.com/nukemiko/libtakiyasha/releases/tag/2.1.0rc2)或 [PyPI](https://pypi.org/project/libtakiyasha/2.1.0rc2/) 下载。 +- 使用 `pip`,通过 PyPI 安装最新版本:`python -m pip install -U libtakiyasha` 如果你要下载其他版本: @@ -61,8 +63,14 @@ LibTakiyasha 是一个 Python 音频加密/解密工具库(当然也可用于 LibTakiyasha 依赖以下包,均可从 PyPI 获取: -- [pyaes](https://pypi.org/project/pyaes/) -- [mutagen](https://pypi.org/project/mutagen/) +- [pyaes](https://pypi.org/project/pyaes/) - 用于加解密 NCM 文件内嵌的主密钥和元数据 +- [mutagen](https://pypi.org/project/mutagen/) - 用于以 `mutagen` 可接受的形式导出 NCM 文件内嵌的元数据 + +## 如何使用? + +在[这里](https://github.com/nukemiko/libtakiyasha/wiki/%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8%E5%8F%8A%E7%A4%BA%E4%BE%8B)可以找到使用方法和示例。 + +同时,在[本项目的 Wiki 主页](https://github.com/nukemiko/libtakiyasha/wiki)可以找到其他一些可能对你有用的东西。 ## 常见问题 @@ -73,78 +81,3 @@ LibTakiyasha 依赖以下包,均可从 PyPI 获取: - LibTakiyasha 是一个加解密库,当然需要为用户提供自定义密钥的权利 - 为了保护本项目不受美国数字千年版权法Digital Millennium Copyright Act(DMCA)影响,避免仓库被误杀 - 因此,本仓库所有 1.x 及更早版本的提交和发布版本都已删除。 - -> 如何使用? - -当你 `import libtakiyasha` 时,`libtakiyasha` 下有四个子模块 `ncm`、`qmc`、`kgmvpr`、`kwm` 会被自动导入。这些子模块下各有一个加密文件对象类(`qmc` 除外,有两个),和一个 `probe` 开头的探测函数(`qmc` 除外,有三个),用于确认目标文件是否被该模块支持: - -| 模块 | 加密文件对象类 | 探测函数 | -| :-------------------: | :----------------: | :-----------------------------------------------: | -| `libtakiyasha.ncm` | `NCM` | `probe_ncm()` | -| `libtakiyasha.qmc` | `QMCv1` 和 `QMCv2` | `probe_qmc()`、`probe_qmcv1()` 和 `probe_qmcv2()` | -| `libtakiyasha.kgmvpr` | `KGMorVPR` | `probe_kgmvpr()` | -| `libtakiyasha.kwm` | `KWM` | `probe_kwm()` | - -每个探测函数都会返回一个内含两个元素的元组: - -- 第一个元素为文件路径或文件对象,取决于探测函数收到的参数; -- 在探测到受支持的文件时,第二个元素为文件的信息,否则为 `None` - -每一个加密文件对象都可以按照普通文件对象对待(拥有 `read()`、`write()`、`seek()` 等方法),也拥有一个 `save()` 方法,以便将该加密文件对象保存到文件。 - -以 `libtakiyasha.ncm.NCM` 为例,以下是简单的使用示例: - -- 要想打开外部加密文件,或新建空加密文件,使用对应加密文件对象类的构造器方法 `open()` 或 `new()`: - - ```pycon - >>> # 打开外部加密文件 - >>> ncmfile_from_open = libtakiyasha.ncm.NCM.open('/path/to/ncmmfile.ncm', core_key=..., tag_key=...) - >>> ncmfile_from_open - , source '/path/to/ncmfile.ncm'> - >>> # 新建空加密文件对象 - >>> ncmfile_new = libtakiyasha.ncm.NCM.new() - >>> ncmfile_new - - >>> - ``` - -- 从加密文件中读取和写入数据: - - ```pycon - >>> # 读取 16 字节 - >>> ncmfile_from_open.read(16) - b'fLaC\\x00\\x00\\x00"\\x12\\x00\\x12\\x00\\x00\\x07)\\x00' - >>> # 读取一行数据,直到下一个换行符 \n - >>> ncmfile_from_open.readline() - b'\xc4B\xf0\x00\xb6\xe14A\x86nz.\x97\xa8\xe3\xbe\x1d\xb7\xb02?u&\x03\x00\t\x90\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12\x00\x00\x00\x00\x00\x00\x01V\x00\x00\x00\x00\x00\x00\x01\xd6`\x12\x00\x00\x00\x00\x00\x00\x02\xac\x00\x00\x00\x00\x00\x00\x04\x92B\x12\x00\x00\x00\x00\x00\x00\x04\x02\x00\x00\x00\x00\x00\x00\x07\x0f\xb2\x12\x00\x00\x00\x00\x00\x00\x05X\x00\x00\x00\x00\x00\x00\t\xd4\x8c\x12\x00\x00\x00\x00\x00\x00\x06\xae\x00\x00\x00\x00\x00\x00\x0c\xa3\xb6\x12\x00\x00\x00\x00\x00\x00\x08\x04\x00\x00\x00\x00\x00\x00\x0f|\x90\x12\x00\x00\x00\x00\x00\x00\tZ\x00\x00\x00\x00\x00\x00\x12^T\x12\x00\x00\x00\x00\x00\x00\n' - >>> - >>> # 使用 for 循环按照固定大小迭代加密文件对象 - >>> ncmfile_from_open.seek(0, 0) - 0 - >>> for blk in ncmfile_from_open: - ... print(len(blk)) - ... - 8192 - 8192 - 8192 - 8192 - 8192 - 8192 - [...] - >>> # 向加密文件对象写入数据 - >>> ncmfile_from_open.seek(0, 2) - 36137109 - >>> ncmfile_from_open.write(b'Now I writing something...') - 26 - >>> - ``` - -- 保存加密文件对象到文件: - - ```pycon - >>> # 如果该 NCM 对象不是从文件打开的,还需要 filething 参数 - >>> ncmfile_from_open.save(core_key=..., tag_key=...) - >>> - ``` - -有关每个加密文件的操作示例,请使用 `help()` 查看对应加密文件对象类的文档。 diff --git a/pyproject.toml b/pyproject.toml index 082dc21..0246bba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,5 +37,5 @@ where = ["src"] [tool.setuptools.dynamic] dependencies = { file = ["requirements.txt"] } -readme = { file = ["README.md"], content-type = 'text/markdown' } +readme = { file = ["README.md", "CHANGELOG.md"], content-type = 'text/markdown' } version = { file = "src/libtakiyasha/VERSION" } diff --git a/src/libtakiyasha/VERSION b/src/libtakiyasha/VERSION index 51c0fcb..7ec1d6d 100644 --- a/src/libtakiyasha/VERSION +++ b/src/libtakiyasha/VERSION @@ -1 +1 @@ -2.1.0rc2 +2.1.0 diff --git a/src/libtakiyasha/kgmvpr/__init__.py b/src/libtakiyasha/kgmvpr/__init__.py index a8fc589..e581d27 100644 --- a/src/libtakiyasha/kgmvpr/__init__.py +++ b/src/libtakiyasha/kgmvpr/__init__.py @@ -3,12 +3,13 @@ import warnings from pathlib import Path -from typing import IO, NamedTuple +from typing import Callable, IO, NamedTuple, overload from .kgmvprdataciphers import KGMCryptoLegacy from ..exceptions import CrypterCreatingError from ..keyutils import make_salt -from ..prototypes import EncryptedBytesIOSkel +from ..miscutils import proberfuncfactory +from ..prototypes import EncryptedBytesIO from ..typedefs import BytesLike, FilePath, KeyStreamBasedStreamCipherProto, StreamCipherProto from ..typeutils import isfilepath, tobytes, verify_fileobj @@ -19,15 +20,38 @@ class KGMorVPRFileInfo(NamedTuple): cipher_data_offset: int + """加密数据在文件中开始的位置。""" cipher_data_len: int + """加密数据在文件中的长度。""" encryption_version: int + """加密方法的版本。目前仅支持 3。""" core_key_slot: int + """加解密数据所需的密钥槽序号。""" core_key_test_data: bytes + """用于验证文件内主密钥合法性的数据。""" master_key: bytes | None + """主密钥。需要配合 ``core_key_slot`` 对应的密钥使用。""" is_vpr: bool + """如果文件使用了 VPR 加密,则为 ``True``。""" + opener: Callable[[tuple[FilePath | IO[bytes], KGMorVPRFileInfo] | FilePath | IO[bytes], ...], KGMorVPR] + """打开文件的方式,为一个可调对象,其会返回一个加密文件对象。""" + opener_kwargs_required: tuple[str, ...] + """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。""" + opener_kwargs_optional: tuple[str, ...] + """通过 ``opener`` 打开文件时,可选的关键字参数的名称。 + + 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数; + 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。 + """ +@overload def probe_kgmvpr(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], KGMorVPRFileInfo | None]: + pass + + +@proberfuncfactory +def probe_kgmvpr(filething, /): """探测源文件 ``filething`` 是否为一个 KGM 或 VPR 文件。 返回一个 2 个元素长度的元组:第一个元素为 ``filething``;如果 @@ -44,53 +68,43 @@ def probe_kgmvpr(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], 一个 2 个元素长度的元组:第一个元素为 filething;如果 filething 是 KGM 或 VPR 文件,那么第二个元素为一个 KGMorVPRFileInfo 对象;否则为 None。 """ - - def operation(fd: IO[bytes]) -> KGMorVPRFileInfo | None: - total_size = fd.seek(0, 2) - if total_size < 60: - return - fd.seek(0, 0) - - header = fd.read(16) - if header == b'\x05\x28\xbc\x96\xe9\xe4\x5a\x43\x91\xaa\xbd\xd0\x7a\xf5\x36\x31': - is_vpr = True - elif header == b'\x7c\xd5\x32\xeb\x86\x02\x7f\x4b\xa8\xaf\xa6\x8e\x0f\xff\x99\x14': - is_vpr = False - else: - return - - cipher_data_offset = int.from_bytes(fd.read(4), 'little') - encryption_version = int.from_bytes(fd.read(4), 'little') - core_key_slot = int.from_bytes(fd.read(4), 'little') - core_key_test_data = fd.read(16) - master_key = fd.read(16) - - return KGMorVPRFileInfo( - cipher_data_offset=cipher_data_offset, - cipher_data_len=total_size - cipher_data_offset, - encryption_version=encryption_version, - core_key_slot=core_key_slot, - core_key_test_data=core_key_test_data, - master_key=master_key, - is_vpr=is_vpr - ) - - if isfilepath(filething): - with open(filething, mode='rb') as fileobj: - return Path(filething), operation(fileobj) + opener_kwargs_required = ['table1', 'table2', 'tablev2'] + + total_size = filething.seek(0, 2) + if total_size < 60: + return + filething.seek(0, 0) + + header = filething.read(16) + if header == b'\x05\x28\xbc\x96\xe9\xe4\x5a\x43\x91\xaa\xbd\xd0\x7a\xf5\x36\x31': + is_vpr = True + opener_kwargs_required.append('vpr_key') + elif header == b'\x7c\xd5\x32\xeb\x86\x02\x7f\x4b\xa8\xaf\xa6\x8e\x0f\xff\x99\x14': + is_vpr = False else: - fileobj = verify_fileobj(filething, 'binary', - verify_readable=True, - verify_seekable=True - ) - fileobj_origpos = fileobj.tell() - prs = operation(fileobj) - fileobj.seek(fileobj_origpos, 0) - - return fileobj, prs - - -class KGMorVPR(EncryptedBytesIOSkel): + return + + cipher_data_offset = int.from_bytes(filething.read(4), 'little') + encryption_version = int.from_bytes(filething.read(4), 'little') + core_key_slot = int.from_bytes(filething.read(4), 'little') + core_key_test_data = filething.read(16) + master_key = filething.read(16) + + return KGMorVPRFileInfo( + cipher_data_offset=cipher_data_offset, + cipher_data_len=total_size - cipher_data_offset, + encryption_version=encryption_version, + core_key_slot=core_key_slot, + core_key_test_data=core_key_test_data, + master_key=master_key, + is_vpr=is_vpr, + opener=KGMorVPR.open, + opener_kwargs_required=tuple(opener_kwargs_required), + opener_kwargs_optional=() + ) + + +class KGMorVPR(EncryptedBytesIO): """基于 BytesIO 的 KGM/VPR 透明加密二进制流。 所有读写相关方法都会经过透明加密层处理: @@ -201,14 +215,6 @@ def open(cls, tablev2: 解码表 3 vpr_key: 针对 VPR 文件额外所需的密钥 """ - # if table1 is not None: - # table1 = tobytes(table1) - # if table2 is not None: - # table2 = tobytes(table2) - # if tablev2 is not None: - # tablev2 = tobytes(tablev2) - # if vpr_key is not None: - # vpr_key = tobytes(vpr_key) table1 = tobytes(table1) table2 = tobytes(table2) tablev2 = tobytes(tablev2) diff --git a/src/libtakiyasha/kwm/__init__.py b/src/libtakiyasha/kwm/__init__.py index e6a2b70..8331624 100644 --- a/src/libtakiyasha/kwm/__init__.py +++ b/src/libtakiyasha/kwm/__init__.py @@ -4,12 +4,13 @@ import warnings from math import log10 from pathlib import Path -from typing import IO, NamedTuple +from typing import Callable, IO, NamedTuple, overload from .kwmdataciphers import Mask32, Mask32FromRecipe from ..exceptions import CrypterCreatingError, CrypterSavingError from ..keyutils import make_salt -from ..prototypes import EncryptedBytesIOSkel +from ..miscutils import proberfuncfactory +from ..prototypes import EncryptedBytesIO from ..typedefs import BytesLike, FilePath, IntegerLike, KeyStreamBasedStreamCipherProto, StreamCipherProto from ..typeutils import isfilepath, tobytes, toint, verify_fileobj @@ -23,13 +24,34 @@ class KWMFileInfo(NamedTuple): mask_recipe: bytes + """组装主密钥所需的配方。""" cipher_data_offset: int + """加密数据在文件中开始的位置。""" cipher_data_len: int + """加密数据在文件中的长度。""" bitrate: int | None + """加密数据(如果是音频)的比特率。""" suffix: str + """加密数据的格式,可用于未加密形式文件的后缀。""" + opener: Callable[[tuple[FilePath | IO[bytes], KWMFileInfo] | FilePath | IO[bytes], ...], KWM] + """打开文件的方式,为一个可调对象,其会返回一个加密文件对象。""" + opener_kwargs_required: tuple[str, ...] + """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。""" + opener_kwargs_optional: tuple[str, ...] + """通过 ``opener`` 打开文件时,可选的关键字参数的名称。 + + 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数; + 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。 + """ +@overload def probe_kwm(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], KWMFileInfo | None]: + pass + + +@proberfuncfactory +def probe_kwm(filething, /): """探测源文件 ``filething`` 是否为一个 KWM 文件。 返回一个 2 个元素长度的元组:第一个元素为 ``filething``;如果 @@ -43,61 +65,48 @@ def probe_kwm(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], KWM 一个 2 个元素长度的元组:第一个元素为 filething;如果 filething 是 KWM 文件,那么第二个元素为一个 KWMFileInfo 对象;否则为 None。 """ - - def operation(fd: IO[bytes]) -> KWMFileInfo | None: - fd.seek(0, 0) - - header_data = fd.read(1024) - cipher_data_offset = fd.tell() - cipher_data_len = fd.seek(0, 2) - cipher_data_offset - - if not header_data.startswith(b'yeelion-kuwo'): - return - - mask_recipe = header_data[24:32] - - bitrate = None - bitrate_suffix_serialized = header_data[48:56].rstrip(b'\x00') - bitrate_serialized_len = 0 - for byte in bitrate_suffix_serialized: - if byte in DIGIT_CHARS: - bitrate_serialized_len += 1 - if bitrate_serialized_len > 0: - bitrate = int(bitrate_suffix_serialized[:bitrate_serialized_len]) * 1000 - - suffix = None - suffix_serialized = bitrate_suffix_serialized[bitrate_serialized_len:] - suffix_serialized_len = 0 - for byte in suffix_serialized: - if byte in ASCII_LETTER_CHARS: - suffix_serialized_len += 1 - if suffix_serialized_len > 0: - suffix = suffix_serialized[:suffix_serialized_len].decode('ascii') - - return KWMFileInfo( - mask_recipe=mask_recipe, - cipher_data_offset=cipher_data_offset, - cipher_data_len=cipher_data_len, - bitrate=bitrate, - suffix=suffix - ) - - if isfilepath(filething): - with open(filething, mode='rb') as fileobj: - return Path(filething), operation(fileobj) - else: - fileobj = verify_fileobj(filething, 'binary', - verify_readable=True, - verify_seekable=True - ) - fileobj_origpos = fileobj.tell() - prs = operation(fileobj) - fileobj.seek(fileobj_origpos, 0) - - return fileobj, prs - - -class KWM(EncryptedBytesIOSkel): + filething.seek(0, 0) + + header_data = filething.read(1024) + cipher_data_offset = filething.tell() + cipher_data_len = filething.seek(0, 2) - cipher_data_offset + + if not header_data.startswith(b'yeelion-kuwo'): + return + + mask_recipe = header_data[24:32] + + bitrate = None + bitrate_suffix_serialized = header_data[48:56].rstrip(b'\x00') + bitrate_serialized_len = 0 + for byte in bitrate_suffix_serialized: + if byte in DIGIT_CHARS: + bitrate_serialized_len += 1 + if bitrate_serialized_len > 0: + bitrate = int(bitrate_suffix_serialized[:bitrate_serialized_len]) * 1000 + + suffix = None + suffix_serialized = bitrate_suffix_serialized[bitrate_serialized_len:] + suffix_serialized_len = 0 + for byte in suffix_serialized: + if byte in ASCII_LETTER_CHARS: + suffix_serialized_len += 1 + if suffix_serialized_len > 0: + suffix = suffix_serialized[:suffix_serialized_len].decode('ascii') + + return KWMFileInfo( + mask_recipe=mask_recipe, + cipher_data_offset=cipher_data_offset, + cipher_data_len=cipher_data_len, + bitrate=bitrate, + suffix=suffix, + opener=KWM.open, + opener_kwargs_required=('core_key',), + opener_kwargs_optional=('master_key',) + ) + + +class KWM(EncryptedBytesIO): """基于 BytesIO 的 KWM 透明加密二进制流。 所有读写相关方法都会经过透明加密层处理: diff --git a/src/libtakiyasha/miscutils.py b/src/libtakiyasha/miscutils.py index 047da2a..6716536 100644 --- a/src/libtakiyasha/miscutils.py +++ b/src/libtakiyasha/miscutils.py @@ -93,21 +93,36 @@ def bytestrxor(term1: BytesLike, term2: BytesLike, /) -> bytes: def proberfuncfactory(docstring_from=None): + """本装饰器是一个工厂函数,被装饰的函数必须具备以下行为: + + - 至少接受一个参数(位于第一个),并将此参数视为文件对象进行操作 + - 内含探测文件结构的所有逻辑 + - 如果探测到受支持的文件,返回一个 NamedTuple 对象,该对象包含探测到的信息;否则返回 `None` + + 具备以上行为的函数在被本装饰器装饰后,将会变成一个文件结构探测函数,具备以下行为: + + - 第一个纯位置参数为文件路径或文件对象,随后的任何参数都会被传递给被装饰的函数 + - 返回一个 2 元组: + + - 第一个元素是接受的文件路径或文件对象 + - 第二个元素是探测结果,具体内容见上文 + """ + def prober( operation: Callable[[IO[bytes], ...], _FileInfo | None] ) -> Callable[[FilePath, ...], tuple[FilePath | IO[bytes], _FileInfo | None]]: @wraps(operation) - def wrapper(filething: FilePath | IO[bytes], /, **kwargs) -> tuple[FilePath | IO[bytes], _FileInfo | None]: + def wrapper(filething: FilePath | IO[bytes], /, *args, **kwargs) -> tuple[FilePath | IO[bytes], _FileInfo | None]: if isfilepath(filething): with open(filething, mode='rb') as fileobj: - return Path(filething), operation(fileobj, **kwargs) + return Path(filething), operation(fileobj, *args, **kwargs) else: fileobj = verify_fileobj(filething, 'binary', verify_readable=True, verify_seekable=True ) fileobj_origpos = fileobj.tell() - prs = operation(fileobj, **kwargs) + prs = operation(fileobj, *args, **kwargs) fileobj.seek(fileobj_origpos, 0) return fileobj, prs diff --git a/src/libtakiyasha/ncm.py b/src/libtakiyasha/ncm.py index 89ea676..6ea4441 100644 --- a/src/libtakiyasha/ncm.py +++ b/src/libtakiyasha/ncm.py @@ -14,7 +14,7 @@ from .exceptions import CrypterCreatingError from .keyutils import make_random_ascii_string, make_random_number_string from .miscutils import BINARIES_ROOTDIR, bytestrxor, proberfuncfactory -from .prototypes import EncryptedBytesIOSkel +from .prototypes import EncryptedBytesIO from .stdciphers import ARC4, StreamedAESWithModeECB from .typedefs import BytesLike, FilePath from .typeutils import isfilepath, tobytes, verify_fileobj @@ -329,7 +329,7 @@ class NCMFileInfo(NamedTuple): 要想打开文件,需要使用 ``NCM.open()``。不可以直接通过 ``NCMFileInfo`` 打开文件。 """ master_key_encrypted: bytes - """已加密的主密钥。""" + """受加密保护的主密钥。""" ncm_163key: bytes """歌曲在网易云音乐平台上的歌曲信息,为 AES 加密的 JSON 字符串。""" cipher_ctor: Callable[[...], ARC4] @@ -345,10 +345,14 @@ class NCMFileInfo(NamedTuple): """封面数据在文件中的长度。""" opener: Callable[[tuple[FilePath | IO[bytes], NCMFileInfo] | FilePath | IO[bytes], ...], NCM] """打开文件的方式,为一个可调对象,其会返回一个加密文件对象。""" - opener_kwargs_required: list[str] - """通过 `opener` 打开文件时,所必需的关键字参数的名称。""" - opener_kwargs_optional: list[str] - """通过 `opener` 打开文件时,可选的关键字参数的名称。""" + opener_kwargs_required: tuple[str, ...] + """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。""" + opener_kwargs_optional: tuple[str, ...] + """通过 ``opener`` 打开文件时,可选的关键字参数的名称。 + + 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数; + 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。 + """ @overload @@ -413,12 +417,12 @@ def probe_ncm(filething, /): cover_data_offset=cover_data_offset, cover_data_len=cover_data_len, opener=NCM.open, - opener_kwargs_required=['core_key'], - opener_kwargs_optional=['tag_key', 'master_key'] + opener_kwargs_required=('core_key',), + opener_kwargs_optional=('tag_key', 'master_key') ) -class NCM(EncryptedBytesIOSkel): +class NCM(EncryptedBytesIO): """基于 BytesIO 的 NCM 透明加密二进制流。 所有读写相关方法都会经过透明加密层处理: diff --git a/src/libtakiyasha/prototypes.py b/src/libtakiyasha/prototypes.py index 1aadb61..2a51deb 100644 --- a/src/libtakiyasha/prototypes.py +++ b/src/libtakiyasha/prototypes.py @@ -2,7 +2,6 @@ from __future__ import annotations from abc import ABCMeta, abstractmethod -from functools import lru_cache from pathlib import Path from random import randint from typing import Callable, Generator, Iterable, Iterator, Literal, Type @@ -18,8 +17,7 @@ __all__ = [ 'CipherSkel', 'KeyStreamBasedStreamCipherSkel', - 'CryptLayerWrappedIOSkel', - 'EncryptedBytesIOSkel' + 'EncryptedBytesIO' ] @@ -139,451 +137,7 @@ def decrypt(self, cipherdata: BytesLike, offset: IntegerLike = 0, /) -> bytes: return bytes(pd_strm) -class CryptLayerWrappedIOSkel(io.BytesIO): - """基于 BytesIO 的透明加密二进制流。 - - 所有读写相关方法都会经过透明加密层处理: - 读取时,返回解密后的数据;写入时,向缓冲区写入加密后的数据。 - - 调用读写相关方法时,附加参数 ``nocryptlayer=True`` - 可绕过透明加密层,访问缓冲区内的原始加密数据。 - - ``__init__()`` 方法的第一个位置参数 ``cipher`` 必须拥有 - ``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法,且这些方法必须能接受两个位置参数。 - 其中,``encrypt()`` 和 ``decrypt()`` 的第一个位置参数接受字节对象,第二个位置参数接受非负整数; - ``keystream()`` 的两个位置参数均只接受非负整数。 - - 如果 ``cipher`` 未实现这些方法中的任何一个,都需要明确抛出 ``NotImplementedError``。 - 未实现的 ``encrypt()``/``decrypt()`` 方法会导致创建的对象不可通过透明加密层读/写; - 未实现的 ``keystream()`` 方法不会影响对读写的支持,但可能会极大影响读取的速度。 - - ``__init__()`` 方法的第二个参数 ``initial_bytes`` - 会在转换为 ``bytes`` 后作为对象内置缓冲区的初始数据。 - - 基于本类的子类可能拥有自己的构造器方法或函数,而不是直接调用 - ``__init__()``;详情请参考该类的文档字符串。 - - 本类和基于本类的子类,同时兼容 ``IO[bytes]`` - 和 ``typedefs.StreamCipherBasedCryptedIOProto`` 类型。 - """ - - @property - def name(self) -> str | None: - """当前对象来源文件的路径。 - - 在此类的对象中,此属性总是 ``None``。 - - 如果是通过子类的构造器方法或函数创建的对象,此属性可能会为来源文件的路径字符串。 - """ - if hasattr(self, '_name'): - name: str = self._name - return name - - @property - def encryptable(self) -> bool: - """此对象的内置透明加密层是否支持加密(内置 ``Cipher`` 对象的 ``encrypt()`` 方法是否可用)。 - - 这会影响到写入相关方法在参数 ``nocryptlayer=False`` 时是否可用。 - """ - return self._encrypt_available - - @property - def decryptable(self) -> bool: - """此对象的内置透明加密层是否支持解密(内置 ``Cipher`` 对象的 ``decrypt()`` 方法是否可用)。 - - 这会影响到读取相关方法在参数 ``nocryptlayer=False`` 时是否可用,以及此对象是否可迭代。 - """ - return self._decrypt_available - - @property - def iter_nocryptlayer(self) -> bool: - """迭代当前对象时,是否需要绕过透明加密层。默认为 ``False``。""" - return self._iter_nocryptlayer - - @iter_nocryptlayer.setter - def iter_nocryptlayer(self, value: bool): - self._iter_nocryptlayer = bool(value) - - @property - def iter_mode(self) -> Literal['block', 'line']: - """迭代的模式,只能设置为 ``block`` 或 ``line``: - - - ``block``(默认值)- 以块为单位进行迭代:每次迭代时,返回等长的“一块”数据。 - - 每次迭代返回的数据长度由 ``self.iter_block_size`` 决定。 - - ``line`` - 以一行为单位进行迭代:每次迭代时,返回的数据都以 ``b'\\n'`` 结尾。 - - 此模式会极大降低迭代的速度,不推荐使用。 - - 尝试设置为其他值会触发 ``ValueError`` 或 ``TypeError``。 - """ - return self._iter_mode - - @iter_mode.setter - def iter_mode(self, value: Literal['block', 'line']) -> None: - if value in ('block', 'line'): - self._iter_mode = value - elif isinstance(value, str): - raise ValueError(f"attribute 'iter_mode' must be 'block' or 'line', not '{value}'") - else: - raise TypeError(f"attribute 'iter_mode' must be str, not {type(value).__name__}") - - @property - def iter_block_size(self) -> int: - """以块为单位进行迭代时,每次迭代返回的数据长度。 - - 如果尝试设置为负数,会触发 ``ValueError``。 - - 本属性不会影响以一行为单位进行的迭代。 - """ - return self._iter_block_size - - @iter_block_size.setter - def iter_block_size(self, value: IntegerLike) -> None: - size = toint(value) - if size < 0: - raise ValueError("attribute 'iter_block_size' cannot be a negative integer") - self._iter_block_size = size - - def __init__(self, cipher, /, initial_bytes: BytesLike = b'') -> None: - """基于 BytesIO 的透明加密二进制流。 - - 所有读写相关方法都会经过透明加密层处理: - 读取时,返回解密后的数据;写入时,向缓冲区写入加密后的数据。 - - 调用读写相关方法时,附加参数 ``nocryptlayer=True`` - 可绕过透明加密层,访问缓冲区内的原始加密数据。 - - ``__init__()`` 方法的第一个位置参数 ``cipher`` 必须拥有 - ``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法,且这些方法必须能接受两个位置参数。 - 其中,``encrypt()`` 和 ``decrypt()`` 的第一个位置参数接受字节对象,第二个位置参数接受非负整数; - ``keystream()`` 的两个位置参数均只接受非负整数。 - - 如果 ``cipher`` 未实现这些方法中的任何一个,都需要明确抛出 ``NotImplementedError``。 - 未实现的 ``encrypt()``/``decrypt()`` 方法会导致创建的对象不可通过透明加密层读/写; - 未实现的 ``keystream()`` 方法不会影响对读写的支持,但可能会极大影响读取的速度。 - - ``__init__()`` 方法的第二个参数 ``initial_bytes`` - 会在转换为 ``bytes`` 后作为对象内置缓冲区的初始数据。 - - 基于本类的子类可能拥有自己的构造器方法或函数,而不是直接调用 - ``__init__()``;详情请参考该类的文档字符串。 - - 本类和基于本类的子类,同时兼容 ``IO[bytes]`` - 和 ``typedefs.StreamCipherBasedCryptedIOProto`` 类型。 - """ - super().__init__(tobytes(initial_bytes)) - - for method_name in 'keystream', 'encrypt', 'decrypt': - try: - method = getattr(cipher, method_name) - except Exception as exc: - if hasattr(cipher, method_name): - raise exc - else: - raise TypeError(f"{repr(cipher)} is not a StreamCipher object: " - f"method '{method_name}' is missing" - ) - if not callable(method): - raise TypeError(f"{repr(cipher)} is not a StreamCipher object: " - f"method '{method_name}' is not callable" - ) - # 检测 keystream() 是否已实现(可用) - self._keystream_available = True - try: - cipher.keystream(0, 0) - except NotImplementedError: - self._keystream_available = False - # 检测 encrypt() 是否已实现(可用) - self._encrypt_available = True - try: - cipher.encrypt(b'', 0) - except NotImplementedError: - self._encrypt_available = False - # 检测 decrypt() 是否已实现(可用) - self._decrypt_available = True - try: - cipher.decrypt(b'', 0) - except NotImplementedError: - self._decrypt_available = False - - self._cipher = cipher - self._iter_nocryptlayer = False - self._iter_mode: Literal['block', 'line'] = 'block' - self._iter_block_size: int = io.DEFAULT_BUFFER_SIZE - - def __iter__(self): - return self - - def __next__(self) -> bytes: - if self._iter_mode == 'line': - if self._iter_nocryptlayer: - return super().__next__() - elif not self._decrypt_available: - raise io.UnsupportedOperation('iter with crypt layer') - else: - curpos = self.tell() - - target_data = super().getvalue()[curpos:] - if self._keystream_available: - result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n')) - else: - result_data = bytearray() - start = curpos - while 1: - stop = start + self._iter_block_size - target_data_segment = target_data[start:stop] - if target_data_segment == b'': - break - d = self._cipher.decrypt(target_data_segment, start) - if b'\n' in d: - result_data.append(d[:d.index(b'\n')]) - break - else: - result_data.append(d) - start += self._iter_block_size - if result_data == b'': - raise StopIteration - - self.seek(curpos + len(result_data), 0) - - return result_data - elif self._iter_mode == 'block': - if not self._decrypt_available: - raise io.UnsupportedOperation('iter with crypt layer') - - curpos = self.tell() - - target_data = super().getvalue()[curpos:curpos + self._iter_block_size] - if self._iter_nocryptlayer: - result_data = target_data - elif self._keystream_available: - result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=None)) - else: - result_data = bytes(self._cipher.decrypt(target_data, curpos)) - - if result_data == b'': - raise StopIteration - - self.seek(curpos + len(result_data), 0) - - return result_data - elif isinstance(self._iter_mode, str): - raise ValueError(f"attribute 'iter_mode' must be 'block' or 'line', not '{self._iter_mode}'") - else: - raise TypeError(f"attribute 'iter_mode' must be str, not {type(self._iter_mode).__name__}") - - @lru_cache - def __repr__(self) -> str: - repr_strings = [ - f'<{type(self).__module__}.{type(self).__name__} object', - f' at {hex(id(self))}', - f', cipher={repr(self._cipher)}' - ] - if self.name is not None: - repr_strings.append(f", from '{self.name}'") - repr_strings.append('>') - - return ''.join(repr_strings) - - def _xor_data_keystream(self, - offset: int, - data: bytes, - eof: bytes = None - ) -> Generator[int, None, None]: - if eof is None: - eoford = None - else: - eoford = ord(tobytes(eof)) - - if data == b'': - return - - keystream = self._cipher.keystream(len(data), offset) - for databyteord, streambyteord in zip(data, keystream): - resultbyteord = databyteord ^ streambyteord - yield resultbyteord - if resultbyteord == eoford: - return - - def getvalue(self, nocryptlayer: bool = False) -> bytes: - if nocryptlayer: - return super().getvalue() - elif not self._decrypt_available: - raise io.UnsupportedOperation('getvalue with crypt layer') - else: - return self._cipher.decrypt(super().getvalue()) - - def getbuffer(self, nocryptlayer: bool = False) -> memoryview: - if nocryptlayer: - return super().getbuffer() - else: - raise NotImplementedError('memoryview with crypt layer is not supported') - - def read(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes: - if nocryptlayer: - return super().read(size) - elif not self._decrypt_available: - raise io.UnsupportedOperation('read with crypt layer') - else: - curpos = self.tell() - if size is None: - size = -1 - size = toint(size) - if size < 0: - target_data = super().getvalue()[curpos:] - else: - target_data = super().getvalue()[curpos:curpos + size] - - if self._keystream_available: - result_data = bytes(self._xor_data_keystream(curpos, target_data)) - else: - result_data = self._cipher.decrypt(target_data, curpos) - self.seek(curpos + len(result_data), 0) - - return result_data - - def readinto(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int: - if nocryptlayer: - return super().readinto(buffer) - elif not self._decrypt_available: - raise io.UnsupportedOperation('readinto with crypt layer') - else: - if isinstance(buffer, memoryview): - memview = buffer - else: - memview = memoryview(buffer) - memview = memview.cast('B') - - data = self.read(len(memview)) - data_len = len(data) - - memview[:data_len] = data - - return data_len - - def read1(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes: - if nocryptlayer or self._decrypt_available: - return self.read(size, nocryptlayer) - - raise io.UnsupportedOperation('read1 with crypt layer') - - def readinto1(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int: - if nocryptlayer or self._decrypt_available: - return self.readinto(buffer, nocryptlayer) - - raise io.UnsupportedOperation('readinto1 with crypt layer') - - def readblock(self, - size: IntegerLike | None = -1, /, - nocryptlayer: bool = False, *, - block_size: IntegerLike | None = io.DEFAULT_BUFFER_SIZE - ) -> bytes: - if not self._decrypt_available: - raise io.UnsupportedOperation('readblock with crypt layer') - - curpos = self.tell() - if size is None: - size = -1 - size = toint(size) - if block_size is None: - block_size = io.DEFAULT_BUFFER_SIZE - block_size = toint(block_size) - if block_size < 0: - block_size = io.DEFAULT_BUFFER_SIZE - if size < 0: - target_data = super().getvalue()[curpos:curpos + block_size] - else: - target_data = super().getvalue()[curpos:curpos + min([size, block_size])] - - if nocryptlayer: - result_data = target_data - elif self._keystream_available: - result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=None)) - else: - result_data = self._cipher.decrypt(target_data, curpos) - - self.seek(curpos + len(result_data), 0) - - return result_data - - def readline(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes: - if nocryptlayer: - return super().readline(size) - else: - if not self._decrypt_available: - raise io.UnsupportedOperation('readline with crypt layer') - curpos = self.tell() - if size is None: - size = -1 - size = toint(size) - if size < 0: - target_data = super().getvalue()[curpos:] - else: - target_data = super().getvalue()[curpos:curpos + size] - - if self._keystream_available: - result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n')) - else: - result_data = bytearray() - start = curpos - while 1: - stop = start + self._iter_block_size - target_data_segment = target_data[start:stop] - if target_data_segment == b'': - break - d = self._cipher.decrypt(target_data_segment, start) - if b'\n' in d: - result_data.append(d[:d.index(b'\n')]) - break - else: - result_data.append(d) - start += self._iter_block_size - self.seek(curpos + len(result_data), 0) - - return bytes(result_data) - - def readlines(self, hint: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> list[bytes]: - if nocryptlayer: - return super().readlines(hint) - elif not self._decrypt_available: - raise io.UnsupportedOperation('readlines with crypt layer') - else: - results_lines = [] - if hint is None: - hint = -1 - hint = toint(hint) - if hint < 0: - while 1: - line = self.readline() - if line == b'': - return results_lines - results_lines.append(line) - else: - for _ in range(hint): - line = self.readline() - if line == b'': - return results_lines - results_lines.append(line) - - def write(self, data: BytesLike, /, nocryptlayer: bool = False) -> int: - if nocryptlayer: - return super().write(data) - elif not self._encrypt_available: - raise io.UnsupportedOperation('write with crypt layer') - else: - curpos = self.tell() - return super().write(self._cipher.encrypt(data, curpos)) - - def writelines(self, lines: Iterable[BytesLike], /, nocryptlayer: bool = False) -> None: - if nocryptlayer: - return super().writelines(lines) - elif not self._encrypt_available: - raise io.UnsupportedOperation('writelines with crypt layer') - else: - for line in lines: - super().write(line) - - -class EncryptedBytesIOSkel(io.BytesIO): +class EncryptedBytesIO(io.BytesIO): def __repr__(self) -> str: reprstr_seq = [ f'<{self.__module__}.{self.__class__.__name__} at {hex(id(self))}, ' diff --git a/src/libtakiyasha/qmc/__init__.py b/src/libtakiyasha/qmc/__init__.py index c7c4597..bb572c7 100644 --- a/src/libtakiyasha/qmc/__init__.py +++ b/src/libtakiyasha/qmc/__init__.py @@ -6,13 +6,14 @@ from base64 import b64decode, b64encode from dataclasses import dataclass from pathlib import Path -from typing import Callable, IO, Iterable, Literal, NamedTuple +from typing import Callable, IO, Iterable, Literal, NamedTuple, overload from .qmcdataciphers import HardenedRC4, Mask128 from .qmckeyciphers import QMCv2KeyEncryptV1, QMCv2KeyEncryptV2 from ..exceptions import CrypterCreatingError from ..keyutils import make_random_ascii_string, make_salt -from ..prototypes import EncryptedBytesIOSkel +from ..miscutils import proberfuncfactory +from ..prototypes import EncryptedBytesIO from ..typedefs import BytesLike, FilePath, IntegerLike from ..typeutils import isfilepath, tobytes, verify_fileobj from ..warns import CrypterSavingWarning @@ -43,9 +44,9 @@ class QMCv2QTag: 可以按照操作数据类(``dataclass``)实例的方式操作本类的实例。 """ song_id: int = 0 - """此歌曲在 QQ 音乐的 ID。""" + """QTag 数据的第二部分,为 QTag 数据所在文件中被加密的歌曲在 QQ 音乐的 ID。""" unknown: int = 2 - """含义未知,在已知所有样本中都为 2。""" + """QTag 数据的第三部分,含义未知,在已知所有样本中都为 2。""" @classmethod def load(cls, qtag_serialized: BytesLike, /): @@ -75,13 +76,16 @@ def dump(self, master_key_encrypted_b64encoded: BytesLike, /) -> bytes: @dataclass class QMCv2STag: - """解析、存储和重建 QMCv2 文件末尾的 STag 数据。""" + """解析、存储和重建 QMCv2 文件末尾的 STag 数据。 + + 可以按照操作数据类(``dataclass``)实例的方式操作本类的实例。 + """ song_id: int = 0 - """此歌曲在 QQ 音乐的 ID。""" + """STag 数据的第一部分,为 QTag 数据所在文件中被加密的歌曲在 QQ 音乐的 ID。""" unknown: int = 2 - """含义未知,在已知所有样本中都为 2。""" + """STag 数据的第二部分,含义未知,在已知所有样本中都为 2。""" song_mid: str = '0' * 14 - """此歌曲在 QQ 音乐的媒体 ID(MId)。""" + """STag 数据的第三部分,为 QTag 数据所在文件中被加密的歌曲在 QQ 音乐的媒体 ID(MId)。""" @classmethod def load(cls, stag_serialized: BytesLike, /): @@ -99,10 +103,7 @@ def load(cls, stag_serialized: BytesLike, /): return cls(song_id=song_id, unknown=unknown, song_mid=song_mid) def dump(self) -> bytes: - """根据当前 QMCv2STag 对象生成并返回一段 STag 数据。 - - 可以按照操作数据类(``dataclass``)实例的方式操作本类的实例。 - """ + """根据当前 QMCv2STag 对象生成并返回一段 STag 数据。""" return b','.join( [ str(self.song_id).encode('ascii'), @@ -115,17 +116,57 @@ def dump(self) -> bytes: class QMCv1FileInfo(NamedTuple): """用于存储 QMCv1 文件的信息。""" cipher_data_offset: int + """加密数据在文件中开始的位置。""" cipher_data_len: int + """加密数据在文件中的长度。""" + opener: Callable[[tuple[FilePath | IO[bytes], QMCv1FileInfo] | FilePath | IO[bytes], ...], QMCv1] + """打开文件的方式,为一个可调对象,其会返回一个加密文件对象。""" + opener_kwargs_required: tuple[str, ...] + """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。""" + opener_kwargs_optional: tuple[str, ...] + """通过 ``opener`` 打开文件时,可选的关键字参数的名称。 + + 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数; + 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。 + """ class QMCv2FileInfo(NamedTuple): """用于存储 QMCv2 文件的信息。""" cipher_ctor: Callable[[...], HardenedRC4] | Callable[[...], Mask128] | None + """Cipher 的构造函数,接受一个必需参数(密钥),返回一个用于解密数据的 + Cipher 对象。通常就是 Cipher 类自身。 + + 对于文件尾部嵌有 STag 数据,或者无任何尾部数据的 QMCv2 文件,由于其没有内嵌密钥,此属性为 ``None``。 + """ cipher_data_offset: int + """加密数据在文件中开始的位置。""" cipher_data_len: int - master_key_encrypted: bytes | None + """加密数据在文件中的长度。""" + master_key_encrypted: bytes + """受加密保护的主密钥。 + + 对于文件尾部嵌有 STag 数据,或者无任何尾部数据的 QMCv2 文件,由于其没有内嵌密钥,此属性为 ``None``。 + """ master_key_encryption_ver: int | None + """主密钥的加密保护版本,通常为 1 或 2,也仅支持这两个版本。 + + 对于文件尾部嵌有 STag 数据,或者无任何尾部数据的 QMCv2 文件,由于其没有内嵌密钥,此属性为 ``None``。 + """ extra_info: QMCv2QTag | QMCv2STag | None + """文件尾部嵌有的除主密钥之外的额外数据,经过解析后的结果。 + 如果有,为 ``QMCv2QTag`` 或 ``QMCv2STag`` 对象;如果没有,则为 ``None``。 + """ + opener: Callable[[tuple[FilePath | IO[bytes], QMCv2FileInfo] | FilePath | IO[bytes], ...], QMCv2] + """打开文件的方式,为一个可调对象,其会返回一个加密文件对象。""" + opener_kwargs_required: tuple[str, ...] + """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。""" + opener_kwargs_optional: tuple[str, ...] + """通过 ``opener`` 打开文件时,可选的关键字参数的名称。 + + 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数; + 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。 + """ def _guess_cipher_ctor(master_key: BytesLike, /, @@ -147,8 +188,16 @@ def _guess_cipher_ctor(master_key: BytesLike, /, return Mask128 -def probe_qmcv1(filething: FilePath | IO[bytes], /, is_qmcv1: bool = False) -> tuple[Path | IO[bytes], QMCv1FileInfo | None]: - """探测源文件 ``filething`` 是否为一个 QMCv1 文件。 +@overload +def probe_qmcv1(filething: FilePath | IO[bytes], /, + is_qmcv1: bool = False + ) -> tuple[Path | IO[bytes], QMCv1FileInfo | None]: + pass + + +@proberfuncfactory +def probe_qmcv1(filething, /, is_qmcv1=False): + r"""探测源文件 ``filething`` 是否为一个 QMCv1 文件。 返回一个 2 个元素长度的元组: @@ -156,13 +205,14 @@ def probe_qmcv1(filething: FilePath | IO[bytes], /, is_qmcv1: bool = False) -> t - 如果 ``filething`` 是 QMCv1 文件,那么第二个元素为一个 ``QMCv1FileInfo`` 对象; - 否则为 ``None``。 - 目前难以通过文件结构识别 QMCv1 文件,因此本方法通过文件扩展名判断是否为 QMCv1 文件。 - 只要文件扩展名匹配下列正则表达式模式(不区分大小写),本方法就会将此文件视为一个 QMCv1 文件: + 目前无法在没有密钥的情况下通过文件结构识别 QMCv1 文件, + 因此本方法通过文件名模式判断是否为 QMCv1 文件。 + 只要文件名匹配下列正则表达式模式(不区分大小写),本方法就会将此文件视为一个 QMCv1 文件: - ``^\\.qmc[a-zA-Z0-9]{1,4}$`` + ``^.*\.qmc([0-9]|flac|ogg|ra)$`` - 对于不匹配以上正则表达式的文件扩展名(或者无法获取到文件扩展名),如果参数 - ``is_qmcv1=True``,本方法会跳过探测过程,认为此文件是一个 QMCv1 文件,并直接返回结果。 + 对于不匹配以上正则表达式的文件名,如果参数 ``is_qmcv1=True``, + 本方法会跳过探测过程,认为此文件是一个 QMCv1 文件,并直接返回结果。 本方法的返回值可以用于 ``QMCv1.open()`` 的第一个位置参数。 @@ -176,33 +226,27 @@ def probe_qmcv1(filething: FilePath | IO[bytes], /, is_qmcv1: bool = False) -> t filething 是 QMCv1 文件,那么第二个元素为一个 QMCv1FileInfo 对象;否则为 None。 """ - def operation(fd: IO[bytes]) -> QMCv1FileInfo | None: - filename = getattr(fd, 'name', None) - if filename is None and not is_qmcv1: - return - filepath = Path(filename) - if QMCV1_FILENAME_PATTERN.fullmatch(filepath.name) or is_qmcv1: - return QMCv1FileInfo( - cipher_data_offset=0, - cipher_data_len=fd.seek(0, 2) - ) + filename = getattr(filething, 'name', None) + if filename is None and not is_qmcv1: + return + filepath = Path(filename) + if QMCV1_FILENAME_PATTERN.fullmatch(filepath.name) or is_qmcv1: + return QMCv1FileInfo( + cipher_data_offset=0, + cipher_data_len=filething.seek(0, 2), + opener=QMCv1.open, + opener_kwargs_required=('mask',), + opener_kwargs_optional=() + ) - if isfilepath(filething): - with open(filething, mode='rb') as fileobj: - return Path(filething), operation(fileobj) - else: - fileobj = verify_fileobj(filething, 'binary', - verify_readable=True, - verify_seekable=True - ) - fileobj_origpos = fileobj.tell() - prs = operation(fileobj) - fileobj.seek(fileobj_origpos, 0) - return fileobj, prs +@overload +def probe_qmcv2(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], QMCv2FileInfo | None]: + pass -def probe_qmcv2(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], QMCv2FileInfo | None]: +@proberfuncfactory +def probe_qmcv2(filething, /): """探测源文件 ``filething`` 是否为一个 QMCv2 文件。 返回一个 2 个元素长度的元组: @@ -221,78 +265,89 @@ def probe_qmcv2(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], Q 一个 2 个元素长度的元组:第一个元素为 filething;如果 filething 是 QMCv2 文件,那么第二个元素为一个 QMCv2FileInfo 对象;否则为 None。 """ + opener_kwargs_required = [] + opener_kwargs_optional = [] - def operation(fd: IO[bytes]) -> QMCv2FileInfo | None: - total_size = fd.seek(-4, 2) + 4 - tail_data = fd.read(4) - - if tail_data == b'STag': - fd.seek(-8, 2) - tag_serialized_len = int.from_bytes(fd.read(4), 'big') - if tag_serialized_len > (total_size - 8): - return - cipher_data_len = fd.seek(-(tag_serialized_len + 8), 2) - extra_info = QMCv2STag.load(fd.read(tag_serialized_len)) - - cipher_ctor = None - master_key_encrypted = None - master_key_encryption_ver = None - elif tail_data == b'QTag': - fd.seek(-8, 2) - tag_serialized_len = int.from_bytes(fd.read(4), 'big') - if tag_serialized_len > (total_size - 8): - return - cipher_data_len = fd.seek(-(tag_serialized_len + 8), 2) - master_key_encrypted_b64encoded, extra_info = QMCv2QTag.load(fd.read(tag_serialized_len)) - master_key_encrypted = b64decode(master_key_encrypted_b64encoded) - - cipher_ctor = _guess_cipher_ctor(master_key_encrypted) - master_key_encryption_ver = 1 - else: - extra_info = None - master_key_encrypted_b64encoded_len = int.from_bytes(tail_data, 'little') - if master_key_encrypted_b64encoded_len > total_size - 4: - return - cipher_data_len = fd.seek(-(master_key_encrypted_b64encoded_len + 4), 2) - master_key_encrypted_b64encoded = fd.read(master_key_encrypted_b64encoded_len) - try: - master_key_encrypted_b64encoded.decode('ascii') - except UnicodeDecodeError: - return - master_key_encrypted_b64decoded = b64decode(master_key_encrypted_b64encoded) - if master_key_encrypted_b64decoded.startswith(b'QQMusic EncV2,Key:'): - master_key_encrypted = master_key_encrypted_b64decoded[18:] - master_key_encryption_ver = 2 - else: - master_key_encrypted = master_key_encrypted_b64decoded - master_key_encryption_ver = 1 - cipher_ctor = _guess_cipher_ctor(master_key_encrypted) - - return QMCv2FileInfo(cipher_ctor=cipher_ctor, - cipher_data_offset=0, - cipher_data_len=cipher_data_len, - master_key_encrypted=master_key_encrypted, - master_key_encryption_ver=master_key_encryption_ver, - extra_info=extra_info - ) + total_size = filething.seek(-4, 2) + 4 + tail_data = filething.read(4) + + if tail_data == b'STag': + filething.seek(-8, 2) + tag_serialized_len = int.from_bytes(filething.read(4), 'big') + if tag_serialized_len > (total_size - 8): + return + cipher_data_len = filething.seek(-(tag_serialized_len + 8), 2) + extra_info = QMCv2STag.load(filething.read(tag_serialized_len)) + + cipher_ctor = None + master_key_encrypted = None + master_key_encryption_ver = None + + opener_kwargs_required.append('master_key') + elif tail_data == b'QTag': + opener_kwargs_required.append('core_key') + opener_kwargs_optional.append('master_key') - if isfilepath(filething): - with open(filething, mode='rb') as fileobj: - return Path(filething), operation(fileobj) + filething.seek(-8, 2) + tag_serialized_len = int.from_bytes(filething.read(4), 'big') + if tag_serialized_len > (total_size - 8): + return + cipher_data_len = filething.seek(-(tag_serialized_len + 8), 2) + master_key_encrypted_b64encoded, extra_info = QMCv2QTag.load(filething.read(tag_serialized_len)) + try: + master_key_encrypted_b64encoded.decode('ascii') + except UnicodeDecodeError: + return + master_key_encrypted_b64decoded = b64decode(master_key_encrypted_b64encoded) + if master_key_encrypted_b64decoded.startswith(b'QQMusic EncV2,Key:'): + master_key_encrypted = master_key_encrypted_b64decoded[18:] + master_key_encryption_ver = 2 + + opener_kwargs_required.append('garble_keys') + else: + master_key_encrypted = master_key_encrypted_b64decoded + master_key_encryption_ver = 1 + cipher_ctor = _guess_cipher_ctor(master_key_encrypted) else: - fileobj = verify_fileobj(filething, 'binary', - verify_readable=True, - verify_seekable=True - ) - fileobj_origpos = fileobj.tell() - prs = operation(fileobj) - fileobj.seek(fileobj_origpos, 0) + opener_kwargs_required.append('core_key') + opener_kwargs_optional.append('master_key') - return fileobj, prs + extra_info = None + master_key_encrypted_b64encoded_len = int.from_bytes(tail_data, 'little') + if master_key_encrypted_b64encoded_len > total_size - 4: + return + cipher_data_len = filething.seek(-(master_key_encrypted_b64encoded_len + 4), 2) + master_key_encrypted_b64encoded = filething.read(master_key_encrypted_b64encoded_len) + try: + master_key_encrypted_b64encoded.decode('ascii') + except UnicodeDecodeError: + return + master_key_encrypted_b64decoded = b64decode(master_key_encrypted_b64encoded) + if master_key_encrypted_b64decoded.startswith(b'QQMusic EncV2,Key:'): + master_key_encrypted = master_key_encrypted_b64decoded[18:] + master_key_encryption_ver = 2 + + opener_kwargs_required.append('garble_keys') + else: + master_key_encrypted = master_key_encrypted_b64decoded + master_key_encryption_ver = 1 + cipher_ctor = _guess_cipher_ctor(master_key_encrypted) + + return QMCv2FileInfo(cipher_ctor=cipher_ctor, + cipher_data_offset=0, + cipher_data_len=cipher_data_len, + master_key_encrypted=master_key_encrypted, + master_key_encryption_ver=master_key_encryption_ver, + extra_info=extra_info, + opener=QMCv2.open, + opener_kwargs_required=tuple(opener_kwargs_required), + opener_kwargs_optional=tuple(opener_kwargs_optional) + ) def probe_qmc( - filething: FilePath | IO[bytes], / + filething: FilePath | IO[bytes], /, + is_qmcv1: bool = False ) -> tuple[Path | IO[bytes], QMCv1FileInfo | None] | tuple[Path | IO[bytes], QMCv2FileInfo | None]: """探测源文件 ``filething`` 是否为一个 QMCv1 或 QMCv2 文件。 @@ -307,6 +362,7 @@ def probe_qmc( Args: filething: 源文件的路径或文件对象 + is_qmcv1: (针对 QMCv1 文件)跳过探测过程,认为源文件是一个 QMCv1 文件;QMCv2 文件不受此参数影响 Returns: 一个 2 个元素长度的元组:第一个元素为 filething;如果 filething 是 QMCv1 文件,那么第二个元素为一个 QMCv1FileInfo 对象;如果 @@ -315,10 +371,10 @@ def probe_qmc( fthing, fileinfo = probe_qmcv2(filething) if fileinfo: return fthing, fileinfo - return probe_qmcv1(filething) + return probe_qmcv1(filething, is_qmcv1=is_qmcv1) -class QMCv1(EncryptedBytesIOSkel): +class QMCv1(EncryptedBytesIO): """基于 BytesIO 的 QMCv1 透明加密二进制流。 所有读写相关方法都会经过透明加密层处理: @@ -542,7 +598,7 @@ def new(cls, mask: BytesLike = None): return cls(Mask128(mask)) -class QMCv2(EncryptedBytesIOSkel): +class QMCv2(EncryptedBytesIO): """基于 BytesIO 的 QMCv2 透明加密二进制流。 所有读写相关方法都会经过透明加密层处理: