diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..457a03c
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,15 @@
+## 变更记录
+
+### 版本 2.1.0
+
+- 减少了一些重复代码的使用,删除了大量不再使用的代码
+- 优化了判断是否为 QMCv1 文件的逻辑、QMCv2 文件的主密钥探测逻辑
+- `libtakiyasha.qmc.QMCv2` 的 `open()` 和 `save()` 现在可接受多个混淆密钥,通过关键字参数 `garble_keys` 在需要时传入。
+ - **因此,上述方法中原来的关键字参数 `garble_key1` 和 `garble_key2` 已经被干掉了,请及时修改你的工具链。**
+ - 如果提供此参数,需要提供一个产生至少一个混淆密钥(类字节对象)的可迭代对象(例如列表),且混淆密钥的顺序必须正确。
+
+在[这里](https://github.com/nukemiko/libtakiyasha/compare/2.1.0rc2...2.1.0)查看更详细的变更记录。
+
+### 版本 2.0.1 至 2.1.0rc2
+
+在[这里](https://github.com/nukemiko/libtakiyasha/compare/2.0.1...2.1.0rc2)查看更详细的变更记录。
diff --git a/README.md b/README.md
index d80df11..8ea13c3 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@ LibTakiyasha 是一个 Python 音频加密/解密工具库(当然也可用于
**本项目是以学习和技术研究的初衷创建的,修改、再分发时请遵循 [License](LICENSE)。**
-本项目的设计灵感,以及部分解密方案,来源于同类项目:
+本项目的设计灵感,以及部分解密方案,来源于同类项目:
- [Unlock Music Project - CLI Edition](https://git.unlock-music.dev/um/cli)
- [parakeet-rs/libparakeet](https://github.com/parakeet-rs/libparakeet)
@@ -19,37 +19,39 @@ LibTakiyasha 是一个 Python 音频加密/解密工具库(当然也可用于
---
+## 新变化?
+
+请参阅[变更记录](CHANGELOG.md)。
+
+(如果你是在 PyPI 上浏览本项目,它可能会出现在页面的底部,[点按此处跳转](#变更记录)。)
+
## 特性
- 使用纯 Python 代码编写
- - **兼容 Python 3.8 及后续版本**,兼容多种 Python 解释器实现(见下文 [#性能测试](#性能测试))
+ - **兼容 Python 3.8 及后续版本**,兼容多种 Python 解释器实现
+ - 可在[此处](https://github.com/nukemiko/libtakiyasha/wiki/%E6%80%A7%E8%83%BD%E8%A1%A8%E7%8E%B0)查看具体兼容哪些实现
- 易于阅读,方便 Python 爱好者学习
- (包括依赖库)无任何 C/C++ 扩展模块,跨平台性强
+- 支持四种加密文件:
+ - 网易云音乐加密文件 `.ncm`
+ - QQ 音乐加密文件 QMCv1 `.qmc[0-9]`、`.qmcflac`、`.qmcogg`、`.qmcra` 等
+ - QQ 音乐加密文件 QMCv2 `.mflac[0-9]`、`.mgg[0-9]` 等
+ - 酷狗音乐加密文件 KGM/VPR `.kgm`、`.vpr`
+ - 不支持创建新加密文件
+ - 酷我音乐加密文件 `.kwm`
+ - 更多信息,请参见[此处](https://github.com/nukemiko/libtakiyasha/wiki/%E6%94%AF%E6%8C%81%E7%9A%84%E6%A0%BC%E5%BC%8F%E4%BB%A5%E5%8F%8A%E6%89%80%E9%9C%80%E7%9A%84%E5%8F%82%E6%95%B0)
-### 性能测试
+### 性能表现
-由于 Python 语言自身原因,LibTakiyasha 相较于同类项目,运行速度较慢。因此我们使用不同解释器实现,对常用操作做了一些性能测试:
+参见[此处](https://github.com/nukemiko/libtakiyasha/wiki/%E6%80%A7%E8%83%BD%E8%A1%A8%E7%8E%B0)。
-| 操作 | 测试大小 | Python 3.10.9 (CPython) | Python 3.8.12 (Pyston 2.3.5) | Python 3.9.16 (PyPy 7.3.11) |
-| :------------: | :------: | :---------------------: | :--------------------------: | :-------------------------: |
-| NCM 加密 | 36.8 MiB | 4.159s | 2.159s | 1.366s |
-| NCM 解密 | 36.8 MiB | 4.393s | 2.360s | 1.480s |
-| QMCv1 加密 | 36.8 MiB | 3.841s | 2.116s | 1.594s |
-| QMCv1 解密 | 36.8 MiB | 3.813s | 2.331s | 1.406s |
-| QMCv2 掩码加密 | 36.8 MiB | 4.065s | 2.201s | 1.727s |
-| QMCv2 掩码解密 | 36.8 MiB | 3.990s | 2.200s | 1.848s |
-| QMCv2 RC4 加密 | 36.8 MiB | 12.820s | 5.596s | 2.717s |
-| QMCv2 RC4 解密 | 36.8 MiB | 12.588s | 5.913s | 2.552s |
-| KGM 解密 | 64.4 MiB | 49.014s | 22.053s | 8.376s |
-| VPR 解密 | 87.9 MiB | 70.030s | 32.252s | 11.902s |
-
-仅在你对速度有要求时,可以考虑在调用 LibTakiyasha 时使用 PyPy/Pyston 解释器。
+## 安装
-一般情况下,建议使用官方解释器实现(CPython)。
+可用的最新版本:2.1.0,[GitHub 发布页面](https://github.com/nukemiko/libtakiyasha/releases/tag/2.1.0),[PyPI](https://pypi.org/project/libtakiyasha/2.1.0/)
-## 安装
+### 安装方式
-可用的最新版本:2.1.0rc2,可前往[发布页面](https://github.com/nukemiko/libtakiyasha/releases/tag/2.1.0rc2)或 [PyPI](https://pypi.org/project/libtakiyasha/2.1.0rc2/) 下载。
+- 使用 `pip`,通过 PyPI 安装最新版本:`python -m pip install -U libtakiyasha`
如果你要下载其他版本:
@@ -61,8 +63,14 @@ LibTakiyasha 是一个 Python 音频加密/解密工具库(当然也可用于
LibTakiyasha 依赖以下包,均可从 PyPI 获取:
-- [pyaes](https://pypi.org/project/pyaes/)
-- [mutagen](https://pypi.org/project/mutagen/)
+- [pyaes](https://pypi.org/project/pyaes/) - 用于加解密 NCM 文件内嵌的主密钥和元数据
+- [mutagen](https://pypi.org/project/mutagen/) - 用于以 `mutagen` 可接受的形式导出 NCM 文件内嵌的元数据
+
+## 如何使用?
+
+在[这里](https://github.com/nukemiko/libtakiyasha/wiki/%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8%E5%8F%8A%E7%A4%BA%E4%BE%8B)可以找到使用方法和示例。
+
+同时,在[本项目的 Wiki 主页](https://github.com/nukemiko/libtakiyasha/wiki)可以找到其他一些可能对你有用的东西。
## 常见问题
@@ -73,78 +81,3 @@ LibTakiyasha 依赖以下包,均可从 PyPI 获取:
- LibTakiyasha 是一个加解密库,当然需要为用户提供自定义密钥的权利
- 为了保护本项目不受美国数字千年版权法(DMCA)影响,避免仓库被误杀
- 因此,本仓库所有 1.x 及更早版本的提交和发布版本都已删除。
-
-> 如何使用?
-
-当你 `import libtakiyasha` 时,`libtakiyasha` 下有四个子模块 `ncm`、`qmc`、`kgmvpr`、`kwm` 会被自动导入。这些子模块下各有一个加密文件对象类(`qmc` 除外,有两个),和一个 `probe` 开头的探测函数(`qmc` 除外,有三个),用于确认目标文件是否被该模块支持:
-
-| 模块 | 加密文件对象类 | 探测函数 |
-| :-------------------: | :----------------: | :-----------------------------------------------: |
-| `libtakiyasha.ncm` | `NCM` | `probe_ncm()` |
-| `libtakiyasha.qmc` | `QMCv1` 和 `QMCv2` | `probe_qmc()`、`probe_qmcv1()` 和 `probe_qmcv2()` |
-| `libtakiyasha.kgmvpr` | `KGMorVPR` | `probe_kgmvpr()` |
-| `libtakiyasha.kwm` | `KWM` | `probe_kwm()` |
-
-每个探测函数都会返回一个内含两个元素的元组:
-
-- 第一个元素为文件路径或文件对象,取决于探测函数收到的参数;
-- 在探测到受支持的文件时,第二个元素为文件的信息,否则为 `None`
-
-每一个加密文件对象都可以按照普通文件对象对待(拥有 `read()`、`write()`、`seek()` 等方法),也拥有一个 `save()` 方法,以便将该加密文件对象保存到文件。
-
-以 `libtakiyasha.ncm.NCM` 为例,以下是简单的使用示例:
-
-- 要想打开外部加密文件,或新建空加密文件,使用对应加密文件对象类的构造器方法 `open()` 或 `new()`:
-
- ```pycon
- >>> # 打开外部加密文件
- >>> ncmfile_from_open = libtakiyasha.ncm.NCM.open('/path/to/ncmmfile.ncm', core_key=..., tag_key=...)
- >>> ncmfile_from_open
- , source '/path/to/ncmfile.ncm'>
- >>> # 新建空加密文件对象
- >>> ncmfile_new = libtakiyasha.ncm.NCM.new()
- >>> ncmfile_new
-
- >>>
- ```
-
-- 从加密文件中读取和写入数据:
-
- ```pycon
- >>> # 读取 16 字节
- >>> ncmfile_from_open.read(16)
- b'fLaC\\x00\\x00\\x00"\\x12\\x00\\x12\\x00\\x00\\x07)\\x00'
- >>> # 读取一行数据,直到下一个换行符 \n
- >>> ncmfile_from_open.readline()
- b'\xc4B\xf0\x00\xb6\xe14A\x86nz.\x97\xa8\xe3\xbe\x1d\xb7\xb02?u&\x03\x00\t\x90\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x12\x00\x00\x00\x00\x00\x00\x01V\x00\x00\x00\x00\x00\x00\x01\xd6`\x12\x00\x00\x00\x00\x00\x00\x02\xac\x00\x00\x00\x00\x00\x00\x04\x92B\x12\x00\x00\x00\x00\x00\x00\x04\x02\x00\x00\x00\x00\x00\x00\x07\x0f\xb2\x12\x00\x00\x00\x00\x00\x00\x05X\x00\x00\x00\x00\x00\x00\t\xd4\x8c\x12\x00\x00\x00\x00\x00\x00\x06\xae\x00\x00\x00\x00\x00\x00\x0c\xa3\xb6\x12\x00\x00\x00\x00\x00\x00\x08\x04\x00\x00\x00\x00\x00\x00\x0f|\x90\x12\x00\x00\x00\x00\x00\x00\tZ\x00\x00\x00\x00\x00\x00\x12^T\x12\x00\x00\x00\x00\x00\x00\n'
- >>>
- >>> # 使用 for 循环按照固定大小迭代加密文件对象
- >>> ncmfile_from_open.seek(0, 0)
- 0
- >>> for blk in ncmfile_from_open:
- ... print(len(blk))
- ...
- 8192
- 8192
- 8192
- 8192
- 8192
- 8192
- [...]
- >>> # 向加密文件对象写入数据
- >>> ncmfile_from_open.seek(0, 2)
- 36137109
- >>> ncmfile_from_open.write(b'Now I writing something...')
- 26
- >>>
- ```
-
-- 保存加密文件对象到文件:
-
- ```pycon
- >>> # 如果该 NCM 对象不是从文件打开的,还需要 filething 参数
- >>> ncmfile_from_open.save(core_key=..., tag_key=...)
- >>>
- ```
-
-有关每个加密文件的操作示例,请使用 `help()` 查看对应加密文件对象类的文档。
diff --git a/pyproject.toml b/pyproject.toml
index 082dc21..0246bba 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,5 +37,5 @@ where = ["src"]
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
-readme = { file = ["README.md"], content-type = 'text/markdown' }
+readme = { file = ["README.md", "CHANGELOG.md"], content-type = 'text/markdown' }
version = { file = "src/libtakiyasha/VERSION" }
diff --git a/src/libtakiyasha/VERSION b/src/libtakiyasha/VERSION
index 51c0fcb..7ec1d6d 100644
--- a/src/libtakiyasha/VERSION
+++ b/src/libtakiyasha/VERSION
@@ -1 +1 @@
-2.1.0rc2
+2.1.0
diff --git a/src/libtakiyasha/kgmvpr/__init__.py b/src/libtakiyasha/kgmvpr/__init__.py
index a8fc589..e581d27 100644
--- a/src/libtakiyasha/kgmvpr/__init__.py
+++ b/src/libtakiyasha/kgmvpr/__init__.py
@@ -3,12 +3,13 @@
import warnings
from pathlib import Path
-from typing import IO, NamedTuple
+from typing import Callable, IO, NamedTuple, overload
from .kgmvprdataciphers import KGMCryptoLegacy
from ..exceptions import CrypterCreatingError
from ..keyutils import make_salt
-from ..prototypes import EncryptedBytesIOSkel
+from ..miscutils import proberfuncfactory
+from ..prototypes import EncryptedBytesIO
from ..typedefs import BytesLike, FilePath, KeyStreamBasedStreamCipherProto, StreamCipherProto
from ..typeutils import isfilepath, tobytes, verify_fileobj
@@ -19,15 +20,38 @@
class KGMorVPRFileInfo(NamedTuple):
cipher_data_offset: int
+ """加密数据在文件中开始的位置。"""
cipher_data_len: int
+ """加密数据在文件中的长度。"""
encryption_version: int
+ """加密方法的版本。目前仅支持 3。"""
core_key_slot: int
+ """加解密数据所需的密钥槽序号。"""
core_key_test_data: bytes
+ """用于验证文件内主密钥合法性的数据。"""
master_key: bytes | None
+ """主密钥。需要配合 ``core_key_slot`` 对应的密钥使用。"""
is_vpr: bool
+ """如果文件使用了 VPR 加密,则为 ``True``。"""
+ opener: Callable[[tuple[FilePath | IO[bytes], KGMorVPRFileInfo] | FilePath | IO[bytes], ...], KGMorVPR]
+ """打开文件的方式,为一个可调对象,其会返回一个加密文件对象。"""
+ opener_kwargs_required: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。"""
+ opener_kwargs_optional: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,可选的关键字参数的名称。
+
+ 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数;
+ 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。
+ """
+@overload
def probe_kgmvpr(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], KGMorVPRFileInfo | None]:
+ pass
+
+
+@proberfuncfactory
+def probe_kgmvpr(filething, /):
"""探测源文件 ``filething`` 是否为一个 KGM 或 VPR 文件。
返回一个 2 个元素长度的元组:第一个元素为 ``filething``;如果
@@ -44,53 +68,43 @@ def probe_kgmvpr(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes],
一个 2 个元素长度的元组:第一个元素为 filething;如果
filething 是 KGM 或 VPR 文件,那么第二个元素为一个 KGMorVPRFileInfo 对象;否则为 None。
"""
-
- def operation(fd: IO[bytes]) -> KGMorVPRFileInfo | None:
- total_size = fd.seek(0, 2)
- if total_size < 60:
- return
- fd.seek(0, 0)
-
- header = fd.read(16)
- if header == b'\x05\x28\xbc\x96\xe9\xe4\x5a\x43\x91\xaa\xbd\xd0\x7a\xf5\x36\x31':
- is_vpr = True
- elif header == b'\x7c\xd5\x32\xeb\x86\x02\x7f\x4b\xa8\xaf\xa6\x8e\x0f\xff\x99\x14':
- is_vpr = False
- else:
- return
-
- cipher_data_offset = int.from_bytes(fd.read(4), 'little')
- encryption_version = int.from_bytes(fd.read(4), 'little')
- core_key_slot = int.from_bytes(fd.read(4), 'little')
- core_key_test_data = fd.read(16)
- master_key = fd.read(16)
-
- return KGMorVPRFileInfo(
- cipher_data_offset=cipher_data_offset,
- cipher_data_len=total_size - cipher_data_offset,
- encryption_version=encryption_version,
- core_key_slot=core_key_slot,
- core_key_test_data=core_key_test_data,
- master_key=master_key,
- is_vpr=is_vpr
- )
-
- if isfilepath(filething):
- with open(filething, mode='rb') as fileobj:
- return Path(filething), operation(fileobj)
+ opener_kwargs_required = ['table1', 'table2', 'tablev2']
+
+ total_size = filething.seek(0, 2)
+ if total_size < 60:
+ return
+ filething.seek(0, 0)
+
+ header = filething.read(16)
+ if header == b'\x05\x28\xbc\x96\xe9\xe4\x5a\x43\x91\xaa\xbd\xd0\x7a\xf5\x36\x31':
+ is_vpr = True
+ opener_kwargs_required.append('vpr_key')
+ elif header == b'\x7c\xd5\x32\xeb\x86\x02\x7f\x4b\xa8\xaf\xa6\x8e\x0f\xff\x99\x14':
+ is_vpr = False
else:
- fileobj = verify_fileobj(filething, 'binary',
- verify_readable=True,
- verify_seekable=True
- )
- fileobj_origpos = fileobj.tell()
- prs = operation(fileobj)
- fileobj.seek(fileobj_origpos, 0)
-
- return fileobj, prs
-
-
-class KGMorVPR(EncryptedBytesIOSkel):
+ return
+
+ cipher_data_offset = int.from_bytes(filething.read(4), 'little')
+ encryption_version = int.from_bytes(filething.read(4), 'little')
+ core_key_slot = int.from_bytes(filething.read(4), 'little')
+ core_key_test_data = filething.read(16)
+ master_key = filething.read(16)
+
+ return KGMorVPRFileInfo(
+ cipher_data_offset=cipher_data_offset,
+ cipher_data_len=total_size - cipher_data_offset,
+ encryption_version=encryption_version,
+ core_key_slot=core_key_slot,
+ core_key_test_data=core_key_test_data,
+ master_key=master_key,
+ is_vpr=is_vpr,
+ opener=KGMorVPR.open,
+ opener_kwargs_required=tuple(opener_kwargs_required),
+ opener_kwargs_optional=()
+ )
+
+
+class KGMorVPR(EncryptedBytesIO):
"""基于 BytesIO 的 KGM/VPR 透明加密二进制流。
所有读写相关方法都会经过透明加密层处理:
@@ -201,14 +215,6 @@ def open(cls,
tablev2: 解码表 3
vpr_key: 针对 VPR 文件额外所需的密钥
"""
- # if table1 is not None:
- # table1 = tobytes(table1)
- # if table2 is not None:
- # table2 = tobytes(table2)
- # if tablev2 is not None:
- # tablev2 = tobytes(tablev2)
- # if vpr_key is not None:
- # vpr_key = tobytes(vpr_key)
table1 = tobytes(table1)
table2 = tobytes(table2)
tablev2 = tobytes(tablev2)
diff --git a/src/libtakiyasha/kwm/__init__.py b/src/libtakiyasha/kwm/__init__.py
index e6a2b70..8331624 100644
--- a/src/libtakiyasha/kwm/__init__.py
+++ b/src/libtakiyasha/kwm/__init__.py
@@ -4,12 +4,13 @@
import warnings
from math import log10
from pathlib import Path
-from typing import IO, NamedTuple
+from typing import Callable, IO, NamedTuple, overload
from .kwmdataciphers import Mask32, Mask32FromRecipe
from ..exceptions import CrypterCreatingError, CrypterSavingError
from ..keyutils import make_salt
-from ..prototypes import EncryptedBytesIOSkel
+from ..miscutils import proberfuncfactory
+from ..prototypes import EncryptedBytesIO
from ..typedefs import BytesLike, FilePath, IntegerLike, KeyStreamBasedStreamCipherProto, StreamCipherProto
from ..typeutils import isfilepath, tobytes, toint, verify_fileobj
@@ -23,13 +24,34 @@
class KWMFileInfo(NamedTuple):
mask_recipe: bytes
+ """组装主密钥所需的配方。"""
cipher_data_offset: int
+ """加密数据在文件中开始的位置。"""
cipher_data_len: int
+ """加密数据在文件中的长度。"""
bitrate: int | None
+ """加密数据(如果是音频)的比特率。"""
suffix: str
+ """加密数据的格式,可用于未加密形式文件的后缀。"""
+ opener: Callable[[tuple[FilePath | IO[bytes], KWMFileInfo] | FilePath | IO[bytes], ...], KWM]
+ """打开文件的方式,为一个可调对象,其会返回一个加密文件对象。"""
+ opener_kwargs_required: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。"""
+ opener_kwargs_optional: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,可选的关键字参数的名称。
+
+ 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数;
+ 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。
+ """
+@overload
def probe_kwm(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], KWMFileInfo | None]:
+ pass
+
+
+@proberfuncfactory
+def probe_kwm(filething, /):
"""探测源文件 ``filething`` 是否为一个 KWM 文件。
返回一个 2 个元素长度的元组:第一个元素为 ``filething``;如果
@@ -43,61 +65,48 @@ def probe_kwm(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], KWM
一个 2 个元素长度的元组:第一个元素为 filething;如果
filething 是 KWM 文件,那么第二个元素为一个 KWMFileInfo 对象;否则为 None。
"""
-
- def operation(fd: IO[bytes]) -> KWMFileInfo | None:
- fd.seek(0, 0)
-
- header_data = fd.read(1024)
- cipher_data_offset = fd.tell()
- cipher_data_len = fd.seek(0, 2) - cipher_data_offset
-
- if not header_data.startswith(b'yeelion-kuwo'):
- return
-
- mask_recipe = header_data[24:32]
-
- bitrate = None
- bitrate_suffix_serialized = header_data[48:56].rstrip(b'\x00')
- bitrate_serialized_len = 0
- for byte in bitrate_suffix_serialized:
- if byte in DIGIT_CHARS:
- bitrate_serialized_len += 1
- if bitrate_serialized_len > 0:
- bitrate = int(bitrate_suffix_serialized[:bitrate_serialized_len]) * 1000
-
- suffix = None
- suffix_serialized = bitrate_suffix_serialized[bitrate_serialized_len:]
- suffix_serialized_len = 0
- for byte in suffix_serialized:
- if byte in ASCII_LETTER_CHARS:
- suffix_serialized_len += 1
- if suffix_serialized_len > 0:
- suffix = suffix_serialized[:suffix_serialized_len].decode('ascii')
-
- return KWMFileInfo(
- mask_recipe=mask_recipe,
- cipher_data_offset=cipher_data_offset,
- cipher_data_len=cipher_data_len,
- bitrate=bitrate,
- suffix=suffix
- )
-
- if isfilepath(filething):
- with open(filething, mode='rb') as fileobj:
- return Path(filething), operation(fileobj)
- else:
- fileobj = verify_fileobj(filething, 'binary',
- verify_readable=True,
- verify_seekable=True
- )
- fileobj_origpos = fileobj.tell()
- prs = operation(fileobj)
- fileobj.seek(fileobj_origpos, 0)
-
- return fileobj, prs
-
-
-class KWM(EncryptedBytesIOSkel):
+ filething.seek(0, 0)
+
+ header_data = filething.read(1024)
+ cipher_data_offset = filething.tell()
+ cipher_data_len = filething.seek(0, 2) - cipher_data_offset
+
+ if not header_data.startswith(b'yeelion-kuwo'):
+ return
+
+ mask_recipe = header_data[24:32]
+
+ bitrate = None
+ bitrate_suffix_serialized = header_data[48:56].rstrip(b'\x00')
+ bitrate_serialized_len = 0
+ for byte in bitrate_suffix_serialized:
+ if byte in DIGIT_CHARS:
+ bitrate_serialized_len += 1
+ if bitrate_serialized_len > 0:
+ bitrate = int(bitrate_suffix_serialized[:bitrate_serialized_len]) * 1000
+
+ suffix = None
+ suffix_serialized = bitrate_suffix_serialized[bitrate_serialized_len:]
+ suffix_serialized_len = 0
+ for byte in suffix_serialized:
+ if byte in ASCII_LETTER_CHARS:
+ suffix_serialized_len += 1
+ if suffix_serialized_len > 0:
+ suffix = suffix_serialized[:suffix_serialized_len].decode('ascii')
+
+ return KWMFileInfo(
+ mask_recipe=mask_recipe,
+ cipher_data_offset=cipher_data_offset,
+ cipher_data_len=cipher_data_len,
+ bitrate=bitrate,
+ suffix=suffix,
+ opener=KWM.open,
+ opener_kwargs_required=('core_key',),
+ opener_kwargs_optional=('master_key',)
+ )
+
+
+class KWM(EncryptedBytesIO):
"""基于 BytesIO 的 KWM 透明加密二进制流。
所有读写相关方法都会经过透明加密层处理:
diff --git a/src/libtakiyasha/miscutils.py b/src/libtakiyasha/miscutils.py
index 047da2a..6716536 100644
--- a/src/libtakiyasha/miscutils.py
+++ b/src/libtakiyasha/miscutils.py
@@ -93,21 +93,36 @@ def bytestrxor(term1: BytesLike, term2: BytesLike, /) -> bytes:
def proberfuncfactory(docstring_from=None):
+ """本装饰器是一个工厂函数,被装饰的函数必须具备以下行为:
+
+ - 至少接受一个参数(位于第一个),并将此参数视为文件对象进行操作
+ - 内含探测文件结构的所有逻辑
+ - 如果探测到受支持的文件,返回一个 NamedTuple 对象,该对象包含探测到的信息;否则返回 `None`
+
+ 具备以上行为的函数在被本装饰器装饰后,将会变成一个文件结构探测函数,具备以下行为:
+
+ - 第一个纯位置参数为文件路径或文件对象,随后的任何参数都会被传递给被装饰的函数
+ - 返回一个 2 元组:
+
+ - 第一个元素是接受的文件路径或文件对象
+ - 第二个元素是探测结果,具体内容见上文
+ """
+
def prober(
operation: Callable[[IO[bytes], ...], _FileInfo | None]
) -> Callable[[FilePath, ...], tuple[FilePath | IO[bytes], _FileInfo | None]]:
@wraps(operation)
- def wrapper(filething: FilePath | IO[bytes], /, **kwargs) -> tuple[FilePath | IO[bytes], _FileInfo | None]:
+ def wrapper(filething: FilePath | IO[bytes], /, *args, **kwargs) -> tuple[FilePath | IO[bytes], _FileInfo | None]:
if isfilepath(filething):
with open(filething, mode='rb') as fileobj:
- return Path(filething), operation(fileobj, **kwargs)
+ return Path(filething), operation(fileobj, *args, **kwargs)
else:
fileobj = verify_fileobj(filething, 'binary',
verify_readable=True,
verify_seekable=True
)
fileobj_origpos = fileobj.tell()
- prs = operation(fileobj, **kwargs)
+ prs = operation(fileobj, *args, **kwargs)
fileobj.seek(fileobj_origpos, 0)
return fileobj, prs
diff --git a/src/libtakiyasha/ncm.py b/src/libtakiyasha/ncm.py
index 89ea676..6ea4441 100644
--- a/src/libtakiyasha/ncm.py
+++ b/src/libtakiyasha/ncm.py
@@ -14,7 +14,7 @@
from .exceptions import CrypterCreatingError
from .keyutils import make_random_ascii_string, make_random_number_string
from .miscutils import BINARIES_ROOTDIR, bytestrxor, proberfuncfactory
-from .prototypes import EncryptedBytesIOSkel
+from .prototypes import EncryptedBytesIO
from .stdciphers import ARC4, StreamedAESWithModeECB
from .typedefs import BytesLike, FilePath
from .typeutils import isfilepath, tobytes, verify_fileobj
@@ -329,7 +329,7 @@ class NCMFileInfo(NamedTuple):
要想打开文件,需要使用 ``NCM.open()``。不可以直接通过 ``NCMFileInfo`` 打开文件。
"""
master_key_encrypted: bytes
- """已加密的主密钥。"""
+ """受加密保护的主密钥。"""
ncm_163key: bytes
"""歌曲在网易云音乐平台上的歌曲信息,为 AES 加密的 JSON 字符串。"""
cipher_ctor: Callable[[...], ARC4]
@@ -345,10 +345,14 @@ class NCMFileInfo(NamedTuple):
"""封面数据在文件中的长度。"""
opener: Callable[[tuple[FilePath | IO[bytes], NCMFileInfo] | FilePath | IO[bytes], ...], NCM]
"""打开文件的方式,为一个可调对象,其会返回一个加密文件对象。"""
- opener_kwargs_required: list[str]
- """通过 `opener` 打开文件时,所必需的关键字参数的名称。"""
- opener_kwargs_optional: list[str]
- """通过 `opener` 打开文件时,可选的关键字参数的名称。"""
+ opener_kwargs_required: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。"""
+ opener_kwargs_optional: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,可选的关键字参数的名称。
+
+ 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数;
+ 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。
+ """
@overload
@@ -413,12 +417,12 @@ def probe_ncm(filething, /):
cover_data_offset=cover_data_offset,
cover_data_len=cover_data_len,
opener=NCM.open,
- opener_kwargs_required=['core_key'],
- opener_kwargs_optional=['tag_key', 'master_key']
+ opener_kwargs_required=('core_key',),
+ opener_kwargs_optional=('tag_key', 'master_key')
)
-class NCM(EncryptedBytesIOSkel):
+class NCM(EncryptedBytesIO):
"""基于 BytesIO 的 NCM 透明加密二进制流。
所有读写相关方法都会经过透明加密层处理:
diff --git a/src/libtakiyasha/prototypes.py b/src/libtakiyasha/prototypes.py
index 1aadb61..2a51deb 100644
--- a/src/libtakiyasha/prototypes.py
+++ b/src/libtakiyasha/prototypes.py
@@ -2,7 +2,6 @@
from __future__ import annotations
from abc import ABCMeta, abstractmethod
-from functools import lru_cache
from pathlib import Path
from random import randint
from typing import Callable, Generator, Iterable, Iterator, Literal, Type
@@ -18,8 +17,7 @@
__all__ = [
'CipherSkel',
'KeyStreamBasedStreamCipherSkel',
- 'CryptLayerWrappedIOSkel',
- 'EncryptedBytesIOSkel'
+ 'EncryptedBytesIO'
]
@@ -139,451 +137,7 @@ def decrypt(self, cipherdata: BytesLike, offset: IntegerLike = 0, /) -> bytes:
return bytes(pd_strm)
-class CryptLayerWrappedIOSkel(io.BytesIO):
- """基于 BytesIO 的透明加密二进制流。
-
- 所有读写相关方法都会经过透明加密层处理:
- 读取时,返回解密后的数据;写入时,向缓冲区写入加密后的数据。
-
- 调用读写相关方法时,附加参数 ``nocryptlayer=True``
- 可绕过透明加密层,访问缓冲区内的原始加密数据。
-
- ``__init__()`` 方法的第一个位置参数 ``cipher`` 必须拥有
- ``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法,且这些方法必须能接受两个位置参数。
- 其中,``encrypt()`` 和 ``decrypt()`` 的第一个位置参数接受字节对象,第二个位置参数接受非负整数;
- ``keystream()`` 的两个位置参数均只接受非负整数。
-
- 如果 ``cipher`` 未实现这些方法中的任何一个,都需要明确抛出 ``NotImplementedError``。
- 未实现的 ``encrypt()``/``decrypt()`` 方法会导致创建的对象不可通过透明加密层读/写;
- 未实现的 ``keystream()`` 方法不会影响对读写的支持,但可能会极大影响读取的速度。
-
- ``__init__()`` 方法的第二个参数 ``initial_bytes``
- 会在转换为 ``bytes`` 后作为对象内置缓冲区的初始数据。
-
- 基于本类的子类可能拥有自己的构造器方法或函数,而不是直接调用
- ``__init__()``;详情请参考该类的文档字符串。
-
- 本类和基于本类的子类,同时兼容 ``IO[bytes]``
- 和 ``typedefs.StreamCipherBasedCryptedIOProto`` 类型。
- """
-
- @property
- def name(self) -> str | None:
- """当前对象来源文件的路径。
-
- 在此类的对象中,此属性总是 ``None``。
-
- 如果是通过子类的构造器方法或函数创建的对象,此属性可能会为来源文件的路径字符串。
- """
- if hasattr(self, '_name'):
- name: str = self._name
- return name
-
- @property
- def encryptable(self) -> bool:
- """此对象的内置透明加密层是否支持加密(内置 ``Cipher`` 对象的 ``encrypt()`` 方法是否可用)。
-
- 这会影响到写入相关方法在参数 ``nocryptlayer=False`` 时是否可用。
- """
- return self._encrypt_available
-
- @property
- def decryptable(self) -> bool:
- """此对象的内置透明加密层是否支持解密(内置 ``Cipher`` 对象的 ``decrypt()`` 方法是否可用)。
-
- 这会影响到读取相关方法在参数 ``nocryptlayer=False`` 时是否可用,以及此对象是否可迭代。
- """
- return self._decrypt_available
-
- @property
- def iter_nocryptlayer(self) -> bool:
- """迭代当前对象时,是否需要绕过透明加密层。默认为 ``False``。"""
- return self._iter_nocryptlayer
-
- @iter_nocryptlayer.setter
- def iter_nocryptlayer(self, value: bool):
- self._iter_nocryptlayer = bool(value)
-
- @property
- def iter_mode(self) -> Literal['block', 'line']:
- """迭代的模式,只能设置为 ``block`` 或 ``line``:
-
- - ``block``(默认值)- 以块为单位进行迭代:每次迭代时,返回等长的“一块”数据。
- - 每次迭代返回的数据长度由 ``self.iter_block_size`` 决定。
- - ``line`` - 以一行为单位进行迭代:每次迭代时,返回的数据都以 ``b'\\n'`` 结尾。
- - 此模式会极大降低迭代的速度,不推荐使用。
-
- 尝试设置为其他值会触发 ``ValueError`` 或 ``TypeError``。
- """
- return self._iter_mode
-
- @iter_mode.setter
- def iter_mode(self, value: Literal['block', 'line']) -> None:
- if value in ('block', 'line'):
- self._iter_mode = value
- elif isinstance(value, str):
- raise ValueError(f"attribute 'iter_mode' must be 'block' or 'line', not '{value}'")
- else:
- raise TypeError(f"attribute 'iter_mode' must be str, not {type(value).__name__}")
-
- @property
- def iter_block_size(self) -> int:
- """以块为单位进行迭代时,每次迭代返回的数据长度。
-
- 如果尝试设置为负数,会触发 ``ValueError``。
-
- 本属性不会影响以一行为单位进行的迭代。
- """
- return self._iter_block_size
-
- @iter_block_size.setter
- def iter_block_size(self, value: IntegerLike) -> None:
- size = toint(value)
- if size < 0:
- raise ValueError("attribute 'iter_block_size' cannot be a negative integer")
- self._iter_block_size = size
-
- def __init__(self, cipher, /, initial_bytes: BytesLike = b'') -> None:
- """基于 BytesIO 的透明加密二进制流。
-
- 所有读写相关方法都会经过透明加密层处理:
- 读取时,返回解密后的数据;写入时,向缓冲区写入加密后的数据。
-
- 调用读写相关方法时,附加参数 ``nocryptlayer=True``
- 可绕过透明加密层,访问缓冲区内的原始加密数据。
-
- ``__init__()`` 方法的第一个位置参数 ``cipher`` 必须拥有
- ``encrypt()``、``decrypt()`` 和 ``keystream()`` 方法,且这些方法必须能接受两个位置参数。
- 其中,``encrypt()`` 和 ``decrypt()`` 的第一个位置参数接受字节对象,第二个位置参数接受非负整数;
- ``keystream()`` 的两个位置参数均只接受非负整数。
-
- 如果 ``cipher`` 未实现这些方法中的任何一个,都需要明确抛出 ``NotImplementedError``。
- 未实现的 ``encrypt()``/``decrypt()`` 方法会导致创建的对象不可通过透明加密层读/写;
- 未实现的 ``keystream()`` 方法不会影响对读写的支持,但可能会极大影响读取的速度。
-
- ``__init__()`` 方法的第二个参数 ``initial_bytes``
- 会在转换为 ``bytes`` 后作为对象内置缓冲区的初始数据。
-
- 基于本类的子类可能拥有自己的构造器方法或函数,而不是直接调用
- ``__init__()``;详情请参考该类的文档字符串。
-
- 本类和基于本类的子类,同时兼容 ``IO[bytes]``
- 和 ``typedefs.StreamCipherBasedCryptedIOProto`` 类型。
- """
- super().__init__(tobytes(initial_bytes))
-
- for method_name in 'keystream', 'encrypt', 'decrypt':
- try:
- method = getattr(cipher, method_name)
- except Exception as exc:
- if hasattr(cipher, method_name):
- raise exc
- else:
- raise TypeError(f"{repr(cipher)} is not a StreamCipher object: "
- f"method '{method_name}' is missing"
- )
- if not callable(method):
- raise TypeError(f"{repr(cipher)} is not a StreamCipher object: "
- f"method '{method_name}' is not callable"
- )
- # 检测 keystream() 是否已实现(可用)
- self._keystream_available = True
- try:
- cipher.keystream(0, 0)
- except NotImplementedError:
- self._keystream_available = False
- # 检测 encrypt() 是否已实现(可用)
- self._encrypt_available = True
- try:
- cipher.encrypt(b'', 0)
- except NotImplementedError:
- self._encrypt_available = False
- # 检测 decrypt() 是否已实现(可用)
- self._decrypt_available = True
- try:
- cipher.decrypt(b'', 0)
- except NotImplementedError:
- self._decrypt_available = False
-
- self._cipher = cipher
- self._iter_nocryptlayer = False
- self._iter_mode: Literal['block', 'line'] = 'block'
- self._iter_block_size: int = io.DEFAULT_BUFFER_SIZE
-
- def __iter__(self):
- return self
-
- def __next__(self) -> bytes:
- if self._iter_mode == 'line':
- if self._iter_nocryptlayer:
- return super().__next__()
- elif not self._decrypt_available:
- raise io.UnsupportedOperation('iter with crypt layer')
- else:
- curpos = self.tell()
-
- target_data = super().getvalue()[curpos:]
- if self._keystream_available:
- result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n'))
- else:
- result_data = bytearray()
- start = curpos
- while 1:
- stop = start + self._iter_block_size
- target_data_segment = target_data[start:stop]
- if target_data_segment == b'':
- break
- d = self._cipher.decrypt(target_data_segment, start)
- if b'\n' in d:
- result_data.append(d[:d.index(b'\n')])
- break
- else:
- result_data.append(d)
- start += self._iter_block_size
- if result_data == b'':
- raise StopIteration
-
- self.seek(curpos + len(result_data), 0)
-
- return result_data
- elif self._iter_mode == 'block':
- if not self._decrypt_available:
- raise io.UnsupportedOperation('iter with crypt layer')
-
- curpos = self.tell()
-
- target_data = super().getvalue()[curpos:curpos + self._iter_block_size]
- if self._iter_nocryptlayer:
- result_data = target_data
- elif self._keystream_available:
- result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=None))
- else:
- result_data = bytes(self._cipher.decrypt(target_data, curpos))
-
- if result_data == b'':
- raise StopIteration
-
- self.seek(curpos + len(result_data), 0)
-
- return result_data
- elif isinstance(self._iter_mode, str):
- raise ValueError(f"attribute 'iter_mode' must be 'block' or 'line', not '{self._iter_mode}'")
- else:
- raise TypeError(f"attribute 'iter_mode' must be str, not {type(self._iter_mode).__name__}")
-
- @lru_cache
- def __repr__(self) -> str:
- repr_strings = [
- f'<{type(self).__module__}.{type(self).__name__} object',
- f' at {hex(id(self))}',
- f', cipher={repr(self._cipher)}'
- ]
- if self.name is not None:
- repr_strings.append(f", from '{self.name}'")
- repr_strings.append('>')
-
- return ''.join(repr_strings)
-
- def _xor_data_keystream(self,
- offset: int,
- data: bytes,
- eof: bytes = None
- ) -> Generator[int, None, None]:
- if eof is None:
- eoford = None
- else:
- eoford = ord(tobytes(eof))
-
- if data == b'':
- return
-
- keystream = self._cipher.keystream(len(data), offset)
- for databyteord, streambyteord in zip(data, keystream):
- resultbyteord = databyteord ^ streambyteord
- yield resultbyteord
- if resultbyteord == eoford:
- return
-
- def getvalue(self, nocryptlayer: bool = False) -> bytes:
- if nocryptlayer:
- return super().getvalue()
- elif not self._decrypt_available:
- raise io.UnsupportedOperation('getvalue with crypt layer')
- else:
- return self._cipher.decrypt(super().getvalue())
-
- def getbuffer(self, nocryptlayer: bool = False) -> memoryview:
- if nocryptlayer:
- return super().getbuffer()
- else:
- raise NotImplementedError('memoryview with crypt layer is not supported')
-
- def read(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes:
- if nocryptlayer:
- return super().read(size)
- elif not self._decrypt_available:
- raise io.UnsupportedOperation('read with crypt layer')
- else:
- curpos = self.tell()
- if size is None:
- size = -1
- size = toint(size)
- if size < 0:
- target_data = super().getvalue()[curpos:]
- else:
- target_data = super().getvalue()[curpos:curpos + size]
-
- if self._keystream_available:
- result_data = bytes(self._xor_data_keystream(curpos, target_data))
- else:
- result_data = self._cipher.decrypt(target_data, curpos)
- self.seek(curpos + len(result_data), 0)
-
- return result_data
-
- def readinto(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int:
- if nocryptlayer:
- return super().readinto(buffer)
- elif not self._decrypt_available:
- raise io.UnsupportedOperation('readinto with crypt layer')
- else:
- if isinstance(buffer, memoryview):
- memview = buffer
- else:
- memview = memoryview(buffer)
- memview = memview.cast('B')
-
- data = self.read(len(memview))
- data_len = len(data)
-
- memview[:data_len] = data
-
- return data_len
-
- def read1(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes:
- if nocryptlayer or self._decrypt_available:
- return self.read(size, nocryptlayer)
-
- raise io.UnsupportedOperation('read1 with crypt layer')
-
- def readinto1(self, buffer: WritableBuffer, /, nocryptlayer: bool = False) -> int:
- if nocryptlayer or self._decrypt_available:
- return self.readinto(buffer, nocryptlayer)
-
- raise io.UnsupportedOperation('readinto1 with crypt layer')
-
- def readblock(self,
- size: IntegerLike | None = -1, /,
- nocryptlayer: bool = False, *,
- block_size: IntegerLike | None = io.DEFAULT_BUFFER_SIZE
- ) -> bytes:
- if not self._decrypt_available:
- raise io.UnsupportedOperation('readblock with crypt layer')
-
- curpos = self.tell()
- if size is None:
- size = -1
- size = toint(size)
- if block_size is None:
- block_size = io.DEFAULT_BUFFER_SIZE
- block_size = toint(block_size)
- if block_size < 0:
- block_size = io.DEFAULT_BUFFER_SIZE
- if size < 0:
- target_data = super().getvalue()[curpos:curpos + block_size]
- else:
- target_data = super().getvalue()[curpos:curpos + min([size, block_size])]
-
- if nocryptlayer:
- result_data = target_data
- elif self._keystream_available:
- result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=None))
- else:
- result_data = self._cipher.decrypt(target_data, curpos)
-
- self.seek(curpos + len(result_data), 0)
-
- return result_data
-
- def readline(self, size: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> bytes:
- if nocryptlayer:
- return super().readline(size)
- else:
- if not self._decrypt_available:
- raise io.UnsupportedOperation('readline with crypt layer')
- curpos = self.tell()
- if size is None:
- size = -1
- size = toint(size)
- if size < 0:
- target_data = super().getvalue()[curpos:]
- else:
- target_data = super().getvalue()[curpos:curpos + size]
-
- if self._keystream_available:
- result_data = bytes(self._xor_data_keystream(curpos, target_data, eof=b'\n'))
- else:
- result_data = bytearray()
- start = curpos
- while 1:
- stop = start + self._iter_block_size
- target_data_segment = target_data[start:stop]
- if target_data_segment == b'':
- break
- d = self._cipher.decrypt(target_data_segment, start)
- if b'\n' in d:
- result_data.append(d[:d.index(b'\n')])
- break
- else:
- result_data.append(d)
- start += self._iter_block_size
- self.seek(curpos + len(result_data), 0)
-
- return bytes(result_data)
-
- def readlines(self, hint: IntegerLike | None = -1, /, nocryptlayer: bool = False) -> list[bytes]:
- if nocryptlayer:
- return super().readlines(hint)
- elif not self._decrypt_available:
- raise io.UnsupportedOperation('readlines with crypt layer')
- else:
- results_lines = []
- if hint is None:
- hint = -1
- hint = toint(hint)
- if hint < 0:
- while 1:
- line = self.readline()
- if line == b'':
- return results_lines
- results_lines.append(line)
- else:
- for _ in range(hint):
- line = self.readline()
- if line == b'':
- return results_lines
- results_lines.append(line)
-
- def write(self, data: BytesLike, /, nocryptlayer: bool = False) -> int:
- if nocryptlayer:
- return super().write(data)
- elif not self._encrypt_available:
- raise io.UnsupportedOperation('write with crypt layer')
- else:
- curpos = self.tell()
- return super().write(self._cipher.encrypt(data, curpos))
-
- def writelines(self, lines: Iterable[BytesLike], /, nocryptlayer: bool = False) -> None:
- if nocryptlayer:
- return super().writelines(lines)
- elif not self._encrypt_available:
- raise io.UnsupportedOperation('writelines with crypt layer')
- else:
- for line in lines:
- super().write(line)
-
-
-class EncryptedBytesIOSkel(io.BytesIO):
+class EncryptedBytesIO(io.BytesIO):
def __repr__(self) -> str:
reprstr_seq = [
f'<{self.__module__}.{self.__class__.__name__} at {hex(id(self))}, '
diff --git a/src/libtakiyasha/qmc/__init__.py b/src/libtakiyasha/qmc/__init__.py
index c7c4597..bb572c7 100644
--- a/src/libtakiyasha/qmc/__init__.py
+++ b/src/libtakiyasha/qmc/__init__.py
@@ -6,13 +6,14 @@
from base64 import b64decode, b64encode
from dataclasses import dataclass
from pathlib import Path
-from typing import Callable, IO, Iterable, Literal, NamedTuple
+from typing import Callable, IO, Iterable, Literal, NamedTuple, overload
from .qmcdataciphers import HardenedRC4, Mask128
from .qmckeyciphers import QMCv2KeyEncryptV1, QMCv2KeyEncryptV2
from ..exceptions import CrypterCreatingError
from ..keyutils import make_random_ascii_string, make_salt
-from ..prototypes import EncryptedBytesIOSkel
+from ..miscutils import proberfuncfactory
+from ..prototypes import EncryptedBytesIO
from ..typedefs import BytesLike, FilePath, IntegerLike
from ..typeutils import isfilepath, tobytes, verify_fileobj
from ..warns import CrypterSavingWarning
@@ -43,9 +44,9 @@ class QMCv2QTag:
可以按照操作数据类(``dataclass``)实例的方式操作本类的实例。
"""
song_id: int = 0
- """此歌曲在 QQ 音乐的 ID。"""
+ """QTag 数据的第二部分,为 QTag 数据所在文件中被加密的歌曲在 QQ 音乐的 ID。"""
unknown: int = 2
- """含义未知,在已知所有样本中都为 2。"""
+ """QTag 数据的第三部分,含义未知,在已知所有样本中都为 2。"""
@classmethod
def load(cls, qtag_serialized: BytesLike, /):
@@ -75,13 +76,16 @@ def dump(self, master_key_encrypted_b64encoded: BytesLike, /) -> bytes:
@dataclass
class QMCv2STag:
- """解析、存储和重建 QMCv2 文件末尾的 STag 数据。"""
+ """解析、存储和重建 QMCv2 文件末尾的 STag 数据。
+
+ 可以按照操作数据类(``dataclass``)实例的方式操作本类的实例。
+ """
song_id: int = 0
- """此歌曲在 QQ 音乐的 ID。"""
+ """STag 数据的第一部分,为 QTag 数据所在文件中被加密的歌曲在 QQ 音乐的 ID。"""
unknown: int = 2
- """含义未知,在已知所有样本中都为 2。"""
+ """STag 数据的第二部分,含义未知,在已知所有样本中都为 2。"""
song_mid: str = '0' * 14
- """此歌曲在 QQ 音乐的媒体 ID(MId)。"""
+ """STag 数据的第三部分,为 QTag 数据所在文件中被加密的歌曲在 QQ 音乐的媒体 ID(MId)。"""
@classmethod
def load(cls, stag_serialized: BytesLike, /):
@@ -99,10 +103,7 @@ def load(cls, stag_serialized: BytesLike, /):
return cls(song_id=song_id, unknown=unknown, song_mid=song_mid)
def dump(self) -> bytes:
- """根据当前 QMCv2STag 对象生成并返回一段 STag 数据。
-
- 可以按照操作数据类(``dataclass``)实例的方式操作本类的实例。
- """
+ """根据当前 QMCv2STag 对象生成并返回一段 STag 数据。"""
return b','.join(
[
str(self.song_id).encode('ascii'),
@@ -115,17 +116,57 @@ def dump(self) -> bytes:
class QMCv1FileInfo(NamedTuple):
"""用于存储 QMCv1 文件的信息。"""
cipher_data_offset: int
+ """加密数据在文件中开始的位置。"""
cipher_data_len: int
+ """加密数据在文件中的长度。"""
+ opener: Callable[[tuple[FilePath | IO[bytes], QMCv1FileInfo] | FilePath | IO[bytes], ...], QMCv1]
+ """打开文件的方式,为一个可调对象,其会返回一个加密文件对象。"""
+ opener_kwargs_required: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。"""
+ opener_kwargs_optional: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,可选的关键字参数的名称。
+
+ 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数;
+ 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。
+ """
class QMCv2FileInfo(NamedTuple):
"""用于存储 QMCv2 文件的信息。"""
cipher_ctor: Callable[[...], HardenedRC4] | Callable[[...], Mask128] | None
+ """Cipher 的构造函数,接受一个必需参数(密钥),返回一个用于解密数据的
+ Cipher 对象。通常就是 Cipher 类自身。
+
+ 对于文件尾部嵌有 STag 数据,或者无任何尾部数据的 QMCv2 文件,由于其没有内嵌密钥,此属性为 ``None``。
+ """
cipher_data_offset: int
+ """加密数据在文件中开始的位置。"""
cipher_data_len: int
- master_key_encrypted: bytes | None
+ """加密数据在文件中的长度。"""
+ master_key_encrypted: bytes
+ """受加密保护的主密钥。
+
+ 对于文件尾部嵌有 STag 数据,或者无任何尾部数据的 QMCv2 文件,由于其没有内嵌密钥,此属性为 ``None``。
+ """
master_key_encryption_ver: int | None
+ """主密钥的加密保护版本,通常为 1 或 2,也仅支持这两个版本。
+
+ 对于文件尾部嵌有 STag 数据,或者无任何尾部数据的 QMCv2 文件,由于其没有内嵌密钥,此属性为 ``None``。
+ """
extra_info: QMCv2QTag | QMCv2STag | None
+ """文件尾部嵌有的除主密钥之外的额外数据,经过解析后的结果。
+ 如果有,为 ``QMCv2QTag`` 或 ``QMCv2STag`` 对象;如果没有,则为 ``None``。
+ """
+ opener: Callable[[tuple[FilePath | IO[bytes], QMCv2FileInfo] | FilePath | IO[bytes], ...], QMCv2]
+ """打开文件的方式,为一个可调对象,其会返回一个加密文件对象。"""
+ opener_kwargs_required: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,所必需的关键字参数的名称。"""
+ opener_kwargs_optional: tuple[str, ...]
+ """通过 ``opener`` 打开文件时,可选的关键字参数的名称。
+
+ 此属性仅储存可能会影响 ``opener`` 行为的可选关键字参数;
+ 对 ``opener`` 行为没有影响的可选关键字参数不会出现在此属性中。
+ """
def _guess_cipher_ctor(master_key: BytesLike, /,
@@ -147,8 +188,16 @@ def _guess_cipher_ctor(master_key: BytesLike, /,
return Mask128
-def probe_qmcv1(filething: FilePath | IO[bytes], /, is_qmcv1: bool = False) -> tuple[Path | IO[bytes], QMCv1FileInfo | None]:
- """探测源文件 ``filething`` 是否为一个 QMCv1 文件。
+@overload
+def probe_qmcv1(filething: FilePath | IO[bytes], /,
+ is_qmcv1: bool = False
+ ) -> tuple[Path | IO[bytes], QMCv1FileInfo | None]:
+ pass
+
+
+@proberfuncfactory
+def probe_qmcv1(filething, /, is_qmcv1=False):
+ r"""探测源文件 ``filething`` 是否为一个 QMCv1 文件。
返回一个 2 个元素长度的元组:
@@ -156,13 +205,14 @@ def probe_qmcv1(filething: FilePath | IO[bytes], /, is_qmcv1: bool = False) -> t
- 如果 ``filething`` 是 QMCv1 文件,那么第二个元素为一个 ``QMCv1FileInfo`` 对象;
- 否则为 ``None``。
- 目前难以通过文件结构识别 QMCv1 文件,因此本方法通过文件扩展名判断是否为 QMCv1 文件。
- 只要文件扩展名匹配下列正则表达式模式(不区分大小写),本方法就会将此文件视为一个 QMCv1 文件:
+ 目前无法在没有密钥的情况下通过文件结构识别 QMCv1 文件,
+ 因此本方法通过文件名模式判断是否为 QMCv1 文件。
+ 只要文件名匹配下列正则表达式模式(不区分大小写),本方法就会将此文件视为一个 QMCv1 文件:
- ``^\\.qmc[a-zA-Z0-9]{1,4}$``
+ ``^.*\.qmc([0-9]|flac|ogg|ra)$``
- 对于不匹配以上正则表达式的文件扩展名(或者无法获取到文件扩展名),如果参数
- ``is_qmcv1=True``,本方法会跳过探测过程,认为此文件是一个 QMCv1 文件,并直接返回结果。
+ 对于不匹配以上正则表达式的文件名,如果参数 ``is_qmcv1=True``,
+ 本方法会跳过探测过程,认为此文件是一个 QMCv1 文件,并直接返回结果。
本方法的返回值可以用于 ``QMCv1.open()`` 的第一个位置参数。
@@ -176,33 +226,27 @@ def probe_qmcv1(filething: FilePath | IO[bytes], /, is_qmcv1: bool = False) -> t
filething 是 QMCv1 文件,那么第二个元素为一个 QMCv1FileInfo 对象;否则为 None。
"""
- def operation(fd: IO[bytes]) -> QMCv1FileInfo | None:
- filename = getattr(fd, 'name', None)
- if filename is None and not is_qmcv1:
- return
- filepath = Path(filename)
- if QMCV1_FILENAME_PATTERN.fullmatch(filepath.name) or is_qmcv1:
- return QMCv1FileInfo(
- cipher_data_offset=0,
- cipher_data_len=fd.seek(0, 2)
- )
+ filename = getattr(filething, 'name', None)
+ if filename is None and not is_qmcv1:
+ return
+ filepath = Path(filename)
+ if QMCV1_FILENAME_PATTERN.fullmatch(filepath.name) or is_qmcv1:
+ return QMCv1FileInfo(
+ cipher_data_offset=0,
+ cipher_data_len=filething.seek(0, 2),
+ opener=QMCv1.open,
+ opener_kwargs_required=('mask',),
+ opener_kwargs_optional=()
+ )
- if isfilepath(filething):
- with open(filething, mode='rb') as fileobj:
- return Path(filething), operation(fileobj)
- else:
- fileobj = verify_fileobj(filething, 'binary',
- verify_readable=True,
- verify_seekable=True
- )
- fileobj_origpos = fileobj.tell()
- prs = operation(fileobj)
- fileobj.seek(fileobj_origpos, 0)
- return fileobj, prs
+@overload
+def probe_qmcv2(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], QMCv2FileInfo | None]:
+ pass
-def probe_qmcv2(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], QMCv2FileInfo | None]:
+@proberfuncfactory
+def probe_qmcv2(filething, /):
"""探测源文件 ``filething`` 是否为一个 QMCv2 文件。
返回一个 2 个元素长度的元组:
@@ -221,78 +265,89 @@ def probe_qmcv2(filething: FilePath | IO[bytes], /) -> tuple[Path | IO[bytes], Q
一个 2 个元素长度的元组:第一个元素为 filething;如果
filething 是 QMCv2 文件,那么第二个元素为一个 QMCv2FileInfo 对象;否则为 None。
"""
+ opener_kwargs_required = []
+ opener_kwargs_optional = []
- def operation(fd: IO[bytes]) -> QMCv2FileInfo | None:
- total_size = fd.seek(-4, 2) + 4
- tail_data = fd.read(4)
-
- if tail_data == b'STag':
- fd.seek(-8, 2)
- tag_serialized_len = int.from_bytes(fd.read(4), 'big')
- if tag_serialized_len > (total_size - 8):
- return
- cipher_data_len = fd.seek(-(tag_serialized_len + 8), 2)
- extra_info = QMCv2STag.load(fd.read(tag_serialized_len))
-
- cipher_ctor = None
- master_key_encrypted = None
- master_key_encryption_ver = None
- elif tail_data == b'QTag':
- fd.seek(-8, 2)
- tag_serialized_len = int.from_bytes(fd.read(4), 'big')
- if tag_serialized_len > (total_size - 8):
- return
- cipher_data_len = fd.seek(-(tag_serialized_len + 8), 2)
- master_key_encrypted_b64encoded, extra_info = QMCv2QTag.load(fd.read(tag_serialized_len))
- master_key_encrypted = b64decode(master_key_encrypted_b64encoded)
-
- cipher_ctor = _guess_cipher_ctor(master_key_encrypted)
- master_key_encryption_ver = 1
- else:
- extra_info = None
- master_key_encrypted_b64encoded_len = int.from_bytes(tail_data, 'little')
- if master_key_encrypted_b64encoded_len > total_size - 4:
- return
- cipher_data_len = fd.seek(-(master_key_encrypted_b64encoded_len + 4), 2)
- master_key_encrypted_b64encoded = fd.read(master_key_encrypted_b64encoded_len)
- try:
- master_key_encrypted_b64encoded.decode('ascii')
- except UnicodeDecodeError:
- return
- master_key_encrypted_b64decoded = b64decode(master_key_encrypted_b64encoded)
- if master_key_encrypted_b64decoded.startswith(b'QQMusic EncV2,Key:'):
- master_key_encrypted = master_key_encrypted_b64decoded[18:]
- master_key_encryption_ver = 2
- else:
- master_key_encrypted = master_key_encrypted_b64decoded
- master_key_encryption_ver = 1
- cipher_ctor = _guess_cipher_ctor(master_key_encrypted)
-
- return QMCv2FileInfo(cipher_ctor=cipher_ctor,
- cipher_data_offset=0,
- cipher_data_len=cipher_data_len,
- master_key_encrypted=master_key_encrypted,
- master_key_encryption_ver=master_key_encryption_ver,
- extra_info=extra_info
- )
+ total_size = filething.seek(-4, 2) + 4
+ tail_data = filething.read(4)
+
+ if tail_data == b'STag':
+ filething.seek(-8, 2)
+ tag_serialized_len = int.from_bytes(filething.read(4), 'big')
+ if tag_serialized_len > (total_size - 8):
+ return
+ cipher_data_len = filething.seek(-(tag_serialized_len + 8), 2)
+ extra_info = QMCv2STag.load(filething.read(tag_serialized_len))
+
+ cipher_ctor = None
+ master_key_encrypted = None
+ master_key_encryption_ver = None
+
+ opener_kwargs_required.append('master_key')
+ elif tail_data == b'QTag':
+ opener_kwargs_required.append('core_key')
+ opener_kwargs_optional.append('master_key')
- if isfilepath(filething):
- with open(filething, mode='rb') as fileobj:
- return Path(filething), operation(fileobj)
+ filething.seek(-8, 2)
+ tag_serialized_len = int.from_bytes(filething.read(4), 'big')
+ if tag_serialized_len > (total_size - 8):
+ return
+ cipher_data_len = filething.seek(-(tag_serialized_len + 8), 2)
+ master_key_encrypted_b64encoded, extra_info = QMCv2QTag.load(filething.read(tag_serialized_len))
+ try:
+ master_key_encrypted_b64encoded.decode('ascii')
+ except UnicodeDecodeError:
+ return
+ master_key_encrypted_b64decoded = b64decode(master_key_encrypted_b64encoded)
+ if master_key_encrypted_b64decoded.startswith(b'QQMusic EncV2,Key:'):
+ master_key_encrypted = master_key_encrypted_b64decoded[18:]
+ master_key_encryption_ver = 2
+
+ opener_kwargs_required.append('garble_keys')
+ else:
+ master_key_encrypted = master_key_encrypted_b64decoded
+ master_key_encryption_ver = 1
+ cipher_ctor = _guess_cipher_ctor(master_key_encrypted)
else:
- fileobj = verify_fileobj(filething, 'binary',
- verify_readable=True,
- verify_seekable=True
- )
- fileobj_origpos = fileobj.tell()
- prs = operation(fileobj)
- fileobj.seek(fileobj_origpos, 0)
+ opener_kwargs_required.append('core_key')
+ opener_kwargs_optional.append('master_key')
- return fileobj, prs
+ extra_info = None
+ master_key_encrypted_b64encoded_len = int.from_bytes(tail_data, 'little')
+ if master_key_encrypted_b64encoded_len > total_size - 4:
+ return
+ cipher_data_len = filething.seek(-(master_key_encrypted_b64encoded_len + 4), 2)
+ master_key_encrypted_b64encoded = filething.read(master_key_encrypted_b64encoded_len)
+ try:
+ master_key_encrypted_b64encoded.decode('ascii')
+ except UnicodeDecodeError:
+ return
+ master_key_encrypted_b64decoded = b64decode(master_key_encrypted_b64encoded)
+ if master_key_encrypted_b64decoded.startswith(b'QQMusic EncV2,Key:'):
+ master_key_encrypted = master_key_encrypted_b64decoded[18:]
+ master_key_encryption_ver = 2
+
+ opener_kwargs_required.append('garble_keys')
+ else:
+ master_key_encrypted = master_key_encrypted_b64decoded
+ master_key_encryption_ver = 1
+ cipher_ctor = _guess_cipher_ctor(master_key_encrypted)
+
+ return QMCv2FileInfo(cipher_ctor=cipher_ctor,
+ cipher_data_offset=0,
+ cipher_data_len=cipher_data_len,
+ master_key_encrypted=master_key_encrypted,
+ master_key_encryption_ver=master_key_encryption_ver,
+ extra_info=extra_info,
+ opener=QMCv2.open,
+ opener_kwargs_required=tuple(opener_kwargs_required),
+ opener_kwargs_optional=tuple(opener_kwargs_optional)
+ )
def probe_qmc(
- filething: FilePath | IO[bytes], /
+ filething: FilePath | IO[bytes], /,
+ is_qmcv1: bool = False
) -> tuple[Path | IO[bytes], QMCv1FileInfo | None] | tuple[Path | IO[bytes], QMCv2FileInfo | None]:
"""探测源文件 ``filething`` 是否为一个 QMCv1 或 QMCv2 文件。
@@ -307,6 +362,7 @@ def probe_qmc(
Args:
filething: 源文件的路径或文件对象
+ is_qmcv1: (针对 QMCv1 文件)跳过探测过程,认为源文件是一个 QMCv1 文件;QMCv2 文件不受此参数影响
Returns:
一个 2 个元素长度的元组:第一个元素为 filething;如果
filething 是 QMCv1 文件,那么第二个元素为一个 QMCv1FileInfo 对象;如果
@@ -315,10 +371,10 @@ def probe_qmc(
fthing, fileinfo = probe_qmcv2(filething)
if fileinfo:
return fthing, fileinfo
- return probe_qmcv1(filething)
+ return probe_qmcv1(filething, is_qmcv1=is_qmcv1)
-class QMCv1(EncryptedBytesIOSkel):
+class QMCv1(EncryptedBytesIO):
"""基于 BytesIO 的 QMCv1 透明加密二进制流。
所有读写相关方法都会经过透明加密层处理:
@@ -542,7 +598,7 @@ def new(cls, mask: BytesLike = None):
return cls(Mask128(mask))
-class QMCv2(EncryptedBytesIOSkel):
+class QMCv2(EncryptedBytesIO):
"""基于 BytesIO 的 QMCv2 透明加密二进制流。
所有读写相关方法都会经过透明加密层处理: