-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
重写了 KGM/VPR 的解密支持,并添加了 KGM/VPR 的加密支持
- Loading branch information
Showing
3 changed files
with
186 additions
and
295 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,144 +1,91 @@ | ||
# -*- coding: utf-8 -*- | ||
from __future__ import annotations | ||
|
||
from typing import IO | ||
from typing import IO, Literal | ||
|
||
from .kgmvprdataciphers import (KGMEncryptionAlgorithm, | ||
KGMEncryptionAlgorithmWithCachedMask, | ||
VPREnceyptionAlgorithm, | ||
VPREncryptionAlgorithmWithCachedMask | ||
) | ||
from .kgmvprdataciphers import KGMorVPREncryptAlgorithm | ||
from ..common import CryptLayerWrappedIOSkel | ||
from ..exceptions import CrypterCreatingError | ||
from ..typedefs import BytesLike, FilePath | ||
from ..typeutils import is_filepath, tobytes, verify_fileobj | ||
from ..typeutils import is_filepath, verify_fileobj | ||
|
||
__all__ = ['KGM', 'VPR'] | ||
__all__ = ['KGMorVPR'] | ||
|
||
|
||
class KGM(CryptLayerWrappedIOSkel): | ||
class KGMorVPR(CryptLayerWrappedIOSkel): | ||
@property | ||
def cipher(self) -> KGMEncryptionAlgorithm | KGMEncryptionAlgorithmWithCachedMask: | ||
def cipher(self) -> KGMorVPREncryptAlgorithm: | ||
return self._cipher | ||
|
||
@property | ||
def master_key(self) -> bytes: | ||
return self.cipher.file_key | ||
return self.cipher.master_key | ||
|
||
@classmethod | ||
def _from_file_operation(cls, | ||
fileobj: IO[bytes], | ||
table1: bytes, | ||
table2: bytes, | ||
tablev2: bytes | ||
): | ||
header = fileobj.read(60) | ||
file_key = header[28:44] + b'\x00' | ||
header_len = int.from_bytes(header[16:20], 'little') | ||
|
||
fileobj.seek(header_len, 0) | ||
|
||
initial_bytes = fileobj.read() | ||
|
||
ret = cls(KGMEncryptionAlgorithm(file_key, table1, table2, tablev2), initial_bytes) | ||
ret._file_key = file_key | ||
|
||
return ret | ||
|
||
@classmethod | ||
def from_file(cls, | ||
kgm_filething: FilePath | IO[bytes], /, | ||
table1: BytesLike, | ||
table2: BytesLike, | ||
tablev2: BytesLike | ||
) -> KGM: | ||
table1 = tobytes(table1) | ||
table2 = tobytes(table2) | ||
tablev2 = tobytes(tablev2) | ||
|
||
if is_filepath(kgm_filething): | ||
with open(kgm_filething, mode='rb') as kgm_fileobj: | ||
instance = cls._from_file_operation(kgm_fileobj, table1, table2, tablev2) | ||
else: | ||
kgm_fileobj = verify_fileobj(kgm_filething, 'binary', | ||
verify_readable=True, | ||
verify_seekable=True | ||
) | ||
instance = cls._from_file_operation(kgm_fileobj, table1, table2, tablev2) | ||
|
||
instance._name = getattr(kgm_fileobj, 'name', None) | ||
|
||
return instance | ||
|
||
def to_file(self, kgm_filething: FilePath | IO[bytes], /, **kwargs) -> None: | ||
raise NotImplementedError | ||
|
||
def new(self) -> KGM: | ||
raise NotImplementedError | ||
|
||
|
||
class VPR(CryptLayerWrappedIOSkel): | ||
@property | ||
def cipher(self) -> VPREnceyptionAlgorithm | VPREnceyptionAlgorithm: | ||
return self._cipher | ||
def vpr_key(self) -> bytes | None: | ||
return self._cipher.vpr_key | ||
|
||
@property | ||
def master_key(self) -> bytes: | ||
return self.cipher.file_key | ||
def subtype(self): | ||
return 'KGM' if self.vpr_key is None else 'VPR' | ||
|
||
@property | ||
def vpr_key(self) -> bytes: | ||
return self.cipher.vpr_key | ||
def __init__(self, cipher: KGMorVPREncryptAlgorithm, /, initial_bytes: BytesLike = b'') -> None: | ||
super().__init__(cipher, initial_bytes) | ||
|
||
@classmethod | ||
def _from_file_operation(cls, | ||
fileobj: IO[bytes], | ||
vpr_key: bytes, | ||
table1: bytes, | ||
table2: bytes, | ||
tablev2: bytes | ||
): | ||
header = fileobj.read(60) | ||
file_key = header[28:44] + b'\x00' | ||
header_len = int.from_bytes(header[16:20], 'little') | ||
|
||
fileobj.seek(header_len, 0) | ||
|
||
initial_bytes = fileobj.read() | ||
|
||
ret = cls(VPREnceyptionAlgorithm(vpr_key, file_key, table1, table2, tablev2), initial_bytes) | ||
ret._file_key = file_key | ||
|
||
return ret | ||
def new(cls) -> KGMorVPR: | ||
raise NotImplementedError('coming soon') | ||
|
||
@classmethod | ||
def from_file(cls, | ||
vpr_filething: FilePath | IO[bytes], /, | ||
vpr_key: BytesLike, | ||
kgm_vpr_filething: FilePath | IO[bytes], /, | ||
table1: BytesLike, | ||
table2: BytesLike, | ||
tablev2: BytesLike | ||
) -> VPR: | ||
vpr_key = tobytes(vpr_key) | ||
table1 = tobytes(table1) | ||
table2 = tobytes(table2) | ||
tablev2 = tobytes(tablev2) | ||
|
||
if is_filepath(vpr_filething): | ||
with open(vpr_filething, mode='rb') as vpr_fileobj: | ||
instance = cls._from_file_operation(vpr_fileobj, vpr_key, table1, table2, tablev2) | ||
tablev2: BytesLike, | ||
vpr_key: BytesLike = None, | ||
) -> KGMorVPR: | ||
def operation(fileobj: IO[bytes]) -> KGMorVPR: | ||
fileobj_endpos = fileobj.seek(0, 2) | ||
fileobj.seek(0, 0) | ||
magicheader = fileobj.read(16) | ||
if magicheader == b'\x05\x28\xbc\x96\xe9\xe4\x5a\x43\x91\xaa\xbd\xd0\x7a\xf5\x36\x31': | ||
subtype: Literal['KGM', 'VPR'] = 'VPR' | ||
if vpr_key is None: | ||
raise ValueError( | ||
f"{repr(kgm_vpr_filething)} is a VPR file, but argument 'vpr_key' is missing" | ||
) | ||
elif magicheader == b'\x7c\xd5\x32\xeb\x86\x02\x7f\x4b\xa8\xaf\xa6\x8e\x0f\xff\x99\x14': | ||
subtype: Literal['KGM', 'VPR'] = 'KGM' | ||
else: | ||
raise ValueError(f"{repr(kgm_vpr_filething)} is not a KGM or VPR file") | ||
header_len = int.from_bytes(fileobj.read(4), 'little') | ||
if header_len > fileobj_endpos: | ||
raise CrypterCreatingError( | ||
f"{repr(kgm_vpr_filething)} is not a valid {subtype} file: " | ||
f"header length ({header_len}) is greater than file size ({fileobj_endpos})" | ||
) | ||
fileobj.seek(28, 0) | ||
master_key = fileobj.read(16) + b'\x00' | ||
fileobj.seek(header_len, 0) | ||
|
||
initial_bytes = fileobj.read() | ||
|
||
cipher = KGMorVPREncryptAlgorithm(table1, table2, tablev2, master_key, vpr_key) | ||
return cls(cipher, initial_bytes) | ||
|
||
if is_filepath(kgm_vpr_filething): | ||
with open(kgm_vpr_filething, mode='rb') as kgm_vpr_fileobj: | ||
instance = operation(kgm_vpr_fileobj) | ||
else: | ||
vpr_fileobj = verify_fileobj(vpr_filething, 'binary', | ||
verify_readable=True, | ||
verify_seekable=True | ||
) | ||
instance = cls._from_file_operation(vpr_fileobj, vpr_key, table1, table2, tablev2) | ||
kgm_vpr_fileobj = verify_fileobj(kgm_vpr_filething, 'binary', | ||
verify_readable=True, | ||
verify_seekable=True | ||
) | ||
instance = operation(kgm_vpr_fileobj) | ||
|
||
instance._name = getattr(vpr_fileobj, 'name', None) | ||
instance._name = getattr(kgm_vpr_fileobj, 'name', None) | ||
|
||
return instance | ||
|
||
def to_file(self, vpr_filething: FilePath | IO[bytes], /, **kwargs) -> None: | ||
raise NotImplementedError | ||
|
||
def new(self) -> VPR: | ||
raise NotImplementedError | ||
def to_file(self, kgm_vpr_filething: FilePath | IO[bytes], /, **kwargs) -> None: | ||
raise NotImplementedError('coming soon') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
# -*- coding: utf-8 -*- | ||
from __future__ import annotations | ||
|
||
from typing import Generator | ||
|
||
from ..typedefs import BytesLike, IntegerLike | ||
from ..typeutils import tobytes, toint_nofloat | ||
|
||
__all__ = ['make_maskstream', 'xor_half_lower_byte'] | ||
|
||
|
||
def xor_half_lower_byte(byte: int) -> int: | ||
return byte ^ ((byte % 16) << 4) | ||
|
||
|
||
def make_maskstream(offset: IntegerLike, | ||
length: IntegerLike, /, | ||
table1: BytesLike, | ||
table2: BytesLike, | ||
tablev2: BytesLike | ||
) -> Generator[int, None, None]: | ||
offset = toint_nofloat(offset) | ||
length = toint_nofloat(length) | ||
if offset < 0: | ||
raise ValueError("first argument 'offset' must be a non-negative integer") | ||
if length < 0: | ||
raise ValueError("second argument 'length' must be a non-negative integer") | ||
table1 = tobytes(table1) | ||
table2 = tobytes(table2) | ||
tablev2 = tobytes(tablev2) | ||
if not (len(table1) == len(table2) == len(tablev2)): | ||
raise ValueError("argument 'table1', 'table2', 'tablev2' must have the same length") | ||
tablesize = len(tablev2) | ||
|
||
for idx in range(offset, offset + length): | ||
idx_urs4 = idx >> 4 | ||
value = 0 | ||
while idx_urs4 >= 17: | ||
value ^= table1[idx_urs4 % tablesize] | ||
idx_urs4 >>= 4 | ||
value ^= table2[idx_urs4 % tablesize] | ||
idx_urs4 >>= 4 | ||
yield value ^ tablev2[idx % tablesize] |
Oops, something went wrong.