diff --git a/src/libtakiyasha/kgmvpr/__init__.py b/src/libtakiyasha/kgmvpr/__init__.py index 0d4fa79..6c15a7b 100644 --- a/src/libtakiyasha/kgmvpr/__init__.py +++ b/src/libtakiyasha/kgmvpr/__init__.py @@ -1,144 +1,91 @@ # -*- coding: utf-8 -*- from __future__ import annotations -from typing import IO +from typing import IO, Literal -from .kgmvprdataciphers import (KGMEncryptionAlgorithm, - KGMEncryptionAlgorithmWithCachedMask, - VPREnceyptionAlgorithm, - VPREncryptionAlgorithmWithCachedMask - ) +from .kgmvprdataciphers import KGMorVPREncryptAlgorithm from ..common import CryptLayerWrappedIOSkel +from ..exceptions import CrypterCreatingError from ..typedefs import BytesLike, FilePath -from ..typeutils import is_filepath, tobytes, verify_fileobj +from ..typeutils import is_filepath, verify_fileobj -__all__ = ['KGM', 'VPR'] +__all__ = ['KGMorVPR'] -class KGM(CryptLayerWrappedIOSkel): +class KGMorVPR(CryptLayerWrappedIOSkel): @property - def cipher(self) -> KGMEncryptionAlgorithm | KGMEncryptionAlgorithmWithCachedMask: + def cipher(self) -> KGMorVPREncryptAlgorithm: return self._cipher @property def master_key(self) -> bytes: - return self.cipher.file_key + return self.cipher.master_key - @classmethod - def _from_file_operation(cls, - fileobj: IO[bytes], - table1: bytes, - table2: bytes, - tablev2: bytes - ): - header = fileobj.read(60) - file_key = header[28:44] + b'\x00' - header_len = int.from_bytes(header[16:20], 'little') - - fileobj.seek(header_len, 0) - - initial_bytes = fileobj.read() - - ret = cls(KGMEncryptionAlgorithm(file_key, table1, table2, tablev2), initial_bytes) - ret._file_key = file_key - - return ret - - @classmethod - def from_file(cls, - kgm_filething: FilePath | IO[bytes], /, - table1: BytesLike, - table2: BytesLike, - tablev2: BytesLike - ) -> KGM: - table1 = tobytes(table1) - table2 = tobytes(table2) - tablev2 = tobytes(tablev2) - - if is_filepath(kgm_filething): - with open(kgm_filething, mode='rb') as kgm_fileobj: - instance = cls._from_file_operation(kgm_fileobj, table1, table2, tablev2) - else: - kgm_fileobj = verify_fileobj(kgm_filething, 'binary', - verify_readable=True, - verify_seekable=True - ) - instance = cls._from_file_operation(kgm_fileobj, table1, table2, tablev2) - - instance._name = getattr(kgm_fileobj, 'name', None) - - return instance - - def to_file(self, kgm_filething: FilePath | IO[bytes], /, **kwargs) -> None: - raise NotImplementedError - - def new(self) -> KGM: - raise NotImplementedError - - -class VPR(CryptLayerWrappedIOSkel): @property - def cipher(self) -> VPREnceyptionAlgorithm | VPREnceyptionAlgorithm: - return self._cipher + def vpr_key(self) -> bytes | None: + return self._cipher.vpr_key @property - def master_key(self) -> bytes: - return self.cipher.file_key + def subtype(self): + return 'KGM' if self.vpr_key is None else 'VPR' - @property - def vpr_key(self) -> bytes: - return self.cipher.vpr_key + def __init__(self, cipher: KGMorVPREncryptAlgorithm, /, initial_bytes: BytesLike = b'') -> None: + super().__init__(cipher, initial_bytes) @classmethod - def _from_file_operation(cls, - fileobj: IO[bytes], - vpr_key: bytes, - table1: bytes, - table2: bytes, - tablev2: bytes - ): - header = fileobj.read(60) - file_key = header[28:44] + b'\x00' - header_len = int.from_bytes(header[16:20], 'little') - - fileobj.seek(header_len, 0) - - initial_bytes = fileobj.read() - - ret = cls(VPREnceyptionAlgorithm(vpr_key, file_key, table1, table2, tablev2), initial_bytes) - ret._file_key = file_key - - return ret + def new(cls) -> KGMorVPR: + raise NotImplementedError('coming soon') @classmethod def from_file(cls, - vpr_filething: FilePath | IO[bytes], /, - vpr_key: BytesLike, + kgm_vpr_filething: FilePath | IO[bytes], /, table1: BytesLike, table2: BytesLike, - tablev2: BytesLike - ) -> VPR: - vpr_key = tobytes(vpr_key) - table1 = tobytes(table1) - table2 = tobytes(table2) - tablev2 = tobytes(tablev2) - - if is_filepath(vpr_filething): - with open(vpr_filething, mode='rb') as vpr_fileobj: - instance = cls._from_file_operation(vpr_fileobj, vpr_key, table1, table2, tablev2) + tablev2: BytesLike, + vpr_key: BytesLike = None, + ) -> KGMorVPR: + def operation(fileobj: IO[bytes]) -> KGMorVPR: + fileobj_endpos = fileobj.seek(0, 2) + fileobj.seek(0, 0) + magicheader = fileobj.read(16) + if magicheader == b'\x05\x28\xbc\x96\xe9\xe4\x5a\x43\x91\xaa\xbd\xd0\x7a\xf5\x36\x31': + subtype: Literal['KGM', 'VPR'] = 'VPR' + if vpr_key is None: + raise ValueError( + f"{repr(kgm_vpr_filething)} is a VPR file, but argument 'vpr_key' is missing" + ) + elif magicheader == b'\x7c\xd5\x32\xeb\x86\x02\x7f\x4b\xa8\xaf\xa6\x8e\x0f\xff\x99\x14': + subtype: Literal['KGM', 'VPR'] = 'KGM' + else: + raise ValueError(f"{repr(kgm_vpr_filething)} is not a KGM or VPR file") + header_len = int.from_bytes(fileobj.read(4), 'little') + if header_len > fileobj_endpos: + raise CrypterCreatingError( + f"{repr(kgm_vpr_filething)} is not a valid {subtype} file: " + f"header length ({header_len}) is greater than file size ({fileobj_endpos})" + ) + fileobj.seek(28, 0) + master_key = fileobj.read(16) + b'\x00' + fileobj.seek(header_len, 0) + + initial_bytes = fileobj.read() + + cipher = KGMorVPREncryptAlgorithm(table1, table2, tablev2, master_key, vpr_key) + return cls(cipher, initial_bytes) + + if is_filepath(kgm_vpr_filething): + with open(kgm_vpr_filething, mode='rb') as kgm_vpr_fileobj: + instance = operation(kgm_vpr_fileobj) else: - vpr_fileobj = verify_fileobj(vpr_filething, 'binary', - verify_readable=True, - verify_seekable=True - ) - instance = cls._from_file_operation(vpr_fileobj, vpr_key, table1, table2, tablev2) + kgm_vpr_fileobj = verify_fileobj(kgm_vpr_filething, 'binary', + verify_readable=True, + verify_seekable=True + ) + instance = operation(kgm_vpr_fileobj) - instance._name = getattr(vpr_fileobj, 'name', None) + instance._name = getattr(kgm_vpr_fileobj, 'name', None) return instance - def to_file(self, vpr_filething: FilePath | IO[bytes], /, **kwargs) -> None: - raise NotImplementedError - - def new(self) -> VPR: - raise NotImplementedError + def to_file(self, kgm_vpr_filething: FilePath | IO[bytes], /, **kwargs) -> None: + raise NotImplementedError('coming soon') diff --git a/src/libtakiyasha/kgmvpr/kgmmaskutils.py b/src/libtakiyasha/kgmvpr/kgmmaskutils.py new file mode 100644 index 0000000..ecd8c3e --- /dev/null +++ b/src/libtakiyasha/kgmvpr/kgmmaskutils.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +from __future__ import annotations + +from typing import Generator + +from ..typedefs import BytesLike, IntegerLike +from ..typeutils import tobytes, toint_nofloat + +__all__ = ['make_maskstream', 'xor_half_lower_byte'] + + +def xor_half_lower_byte(byte: int) -> int: + return byte ^ ((byte % 16) << 4) + + +def make_maskstream(offset: IntegerLike, + length: IntegerLike, /, + table1: BytesLike, + table2: BytesLike, + tablev2: BytesLike + ) -> Generator[int, None, None]: + offset = toint_nofloat(offset) + length = toint_nofloat(length) + if offset < 0: + raise ValueError("first argument 'offset' must be a non-negative integer") + if length < 0: + raise ValueError("second argument 'length' must be a non-negative integer") + table1 = tobytes(table1) + table2 = tobytes(table2) + tablev2 = tobytes(tablev2) + if not (len(table1) == len(table2) == len(tablev2)): + raise ValueError("argument 'table1', 'table2', 'tablev2' must have the same length") + tablesize = len(tablev2) + + for idx in range(offset, offset + length): + idx_urs4 = idx >> 4 + value = 0 + while idx_urs4 >= 17: + value ^= table1[idx_urs4 % tablesize] + idx_urs4 >>= 4 + value ^= table2[idx_urs4 % tablesize] + idx_urs4 >>= 4 + yield value ^ tablev2[idx % tablesize] diff --git a/src/libtakiyasha/kgmvpr/kgmvprdataciphers.py b/src/libtakiyasha/kgmvpr/kgmvprdataciphers.py index 590af73..5b47c41 100644 --- a/src/libtakiyasha/kgmvpr/kgmvprdataciphers.py +++ b/src/libtakiyasha/kgmvpr/kgmvprdataciphers.py @@ -1,226 +1,127 @@ # -*- coding: utf-8 -*- from __future__ import annotations -from typing import Generator +from typing import Generator, TypedDict +from .kgmmaskutils import make_maskstream, xor_half_lower_byte from .. import StreamCipherSkel -from ..miscutils import bytestrxor from ..typedefs import BytesLike, IntegerLike -from ..typeutils import CachedClassInstanceProperty, tobytearray, tobytes, toint_nofloat +from ..typeutils import CachedClassInstanceProperty, tobytes, toint_nofloat -__all__ = [ - 'KGMEncryptionAlgorithm', - 'KGMEncryptionAlgorithmWithCachedMask', - 'VPREnceyptionAlgorithm', - 'VPREncryptionAlgorithmWithCachedMask' -] +__all__ = ['KGMorVPRTables', 'KGMorVPREncryptAlgorithm'] -def xor_lower_helf_byte(b: int) -> int: - return b ^ ((b & 0xf) << 4) +class KGMorVPRTables(TypedDict): + table1: bytes + table2: bytes + tablev2: bytes -class KGMEncryptionAlgorithm(StreamCipherSkel): +class KGMorVPREncryptAlgorithm(StreamCipherSkel): @CachedClassInstanceProperty - def tablesize(self) -> int: - return 16 * 17 + def keysize(self) -> int: + return 17 - @property - def table1(self) -> bytes: - return self._table1 + @CachedClassInstanceProperty + def tablesize(self) -> int: + return 17 * 16 @property - def table2(self) -> bytes: - return self._table2 + def master_key(self) -> bytes: + return self._master_key @property - def tablev2(self) -> bytes: - return self._tablev2 + def vpr_key(self) -> bytes | None: + return self._vpr_key @property - def file_key(self): - return self._file_key + def tables(self) -> KGMorVPRTables: + return { + 'table1' : self._table1, + 'table2' : self._table2, + 'tablev2': self._tablev2 + } def __init__(self, - file_key: BytesLike, table1: BytesLike, table2: BytesLike, - tablev2: BytesLike, / + tablev2: BytesLike, + master_key: BytesLike, /, + vpr_key: BytesLike = None, ) -> None: self._table1 = tobytes(table1) self._table2 = tobytes(table2) self._tablev2 = tobytes(tablev2) - self._file_key = tobytes(file_key) - - if len(self._table1) != self.tablesize: - raise ValueError( - f"invalid length of 'table1': should be {self.tablesize}, not {len(self._table1)}" - ) - if len(self._table2) != self.tablesize: - raise ValueError( - f"invalid length of 'table2': should be {self.tablesize}, not {len(self._table2)}" - ) - if len(self._tablev2) != self.tablesize: - raise ValueError( - f"invalid length of 'tablev2': should be {self.tablesize}, not {len(self._tablev2)}" - ) - @classmethod - def generate_mask(cls, - offset: IntegerLike, - length: IntegerLike, /, - table1: BytesLike, - table2: BytesLike, - tablev2: BytesLike - ) -> Generator[int, None, None]: - tablesize: int = cls.tablesize + for valname, val in [('table1', self._table1), + ('table2', self._table2), + ('tablev2', self._tablev2)]: + if len(val) != self.tablesize: + raise ValueError( + f"invalid length of argument '{valname}': should be {self.tablesize}, not {len(val)}" + ) - offset = toint_nofloat(offset) - length = toint_nofloat(length) - if offset < 0: - raise ValueError("first argument 'offset' must be a non-negative integer") - if length < 0: - raise ValueError("second argument 'length' must be a non-negative integer") - table1 = tobytes(table1) - table2 = tobytes(table2) - tablev2 = tobytes(tablev2) - if len(table1) != tablesize: - raise ValueError( - f"invalid length of 'table1': should be {tablesize}, not {len(table1)}" - ) - if len(table2) != tablesize: - raise ValueError( - f"invalid length of 'table2': should be {tablesize}, not {len(table2)}" - ) - if len(tablev2) != tablesize: + self._master_key = tobytes(master_key) + if len(self._master_key) != self.keysize: raise ValueError( - f"invalid length of 'tablev2': should be {tablesize}, not {len(tablev2)}" + f"invalid length of argument 'master_key': " + f"should be {self.keysize}, not {len(self._master_key)}" ) - - for idx in range(offset, offset + length): - idx_urs4 = idx >> 4 - value = 0 - while idx_urs4 >= 17: - value ^= table1[idx_urs4 % tablesize] - idx_urs4 >>= 4 - value ^= table2[idx_urs4 % tablesize] - idx_urs4 >>= 4 - yield value ^ tablev2[idx % tablesize] + if vpr_key is None: + self._vpr_key = None + else: + self._vpr_key = tobytes(vpr_key) + if len(self._vpr_key) != self.keysize: + raise ValueError( + f"invalid length of argument 'vpr_key': " + f"should be {self.keysize}, not {len(self._vpr_key)}" + ) def keystream(self, offset: IntegerLike, length: IntegerLike, /) -> Generator[int, None, None]: raise NotImplementedError def encrypt(self, plaindata: BytesLike, offset: IntegerLike = 0, /) -> bytes: - raise NotImplementedError - - def decrypt(self, cipherdata: BytesLike, offset: IntegerLike = 0, /) -> bytes: - file_key = self._file_key + master_key = self._master_key + vpr_key: bytes | None = self._vpr_key + keysize = self.keysize offset = toint_nofloat(offset) if offset < 0: - raise ValueError("second argument 'offset' must be a non-negative integer") - cipherdata = tobytearray(cipherdata) - plaindata = bytearray(len(cipherdata)) - - mask_byte_iterator = self.generate_mask(offset, - len(cipherdata), - self._table1, - self._table2, - self._tablev2 - ) - for idx, data_byte_peer in enumerate(zip(cipherdata, mask_byte_iterator)): - cipherdata_byte, mask_byte = data_byte_peer - plaindata[idx] = xor_lower_helf_byte( - cipherdata_byte ^ mask_byte ^ file_key[(idx + offset) % 17] - ) - - return bytes(plaindata) - - -class KGMEncryptionAlgorithmWithCachedMask(StreamCipherSkel): - @property - def file_key(self) -> bytes: - return self._file_key - - def __init__(self, - file_key: BytesLike, - mask_data: BytesLike - ) -> None: - self._file_key = tobytes(file_key) - self._mask_data = tobytes(mask_data) - - def keystream(self, offset: IntegerLike, length: IntegerLike, /) -> Generator[int, None, None]: - raise NotImplementedError - - def encrypt(self, plaindata: BytesLike, offset: IntegerLike = 0, /) -> bytes: - raise NotImplementedError + ValueError("second argument 'offset' must be a non-negative integer") + plaindata = tobytes(plaindata) + cipherdata_buf = bytearray(len(plaindata)) + + maskstream_iterator = make_maskstream( + offset, len(plaindata), self._table1, self._table2, self._tablev2 + ) + for idx, peered_byte in enumerate(zip(plaindata, maskstream_iterator)): + pdb, msb = peered_byte + if vpr_key is not None: + pdb ^= vpr_key[(idx + offset) % keysize] + cdb = xor_half_lower_byte(pdb) ^ msb ^ master_key[(idx + offset) % keysize] + cipherdata_buf[idx] = cdb + + return tobytes(cipherdata_buf) def decrypt(self, cipherdata: BytesLike, offset: IntegerLike = 0, /) -> bytes: - file_key = self._file_key - mask_data = self._mask_data + master_key = self._master_key + vpr_key: bytes | None = self._vpr_key + keysize = self.keysize offset = toint_nofloat(offset) if offset < 0: - raise ValueError("second argument 'offset' must be a non-negative integer") - cipherdata = tobytearray(cipherdata) - plaindata = bytearray(len(cipherdata)) - - for idx, data_byte_peer in enumerate(zip(cipherdata, mask_data)): - cipherdata_byte, mask_byte = data_byte_peer - plaindata[idx] = xor_lower_helf_byte( - cipherdata_byte ^ mask_byte ^ file_key[(idx + offset) % 17] - ) - - return bytes(plaindata) - - -class VPREnceyptionAlgorithm(KGMEncryptionAlgorithm): - @property - def vpr_key(self): - return self._vpr_key - - def __init__(self, - vpr_key: BytesLike, - file_key: BytesLike, - table1: BytesLike, - table2: BytesLike, - tablev2: BytesLike, / - ): - self._vpr_key = tobytes(vpr_key) - super().__init__(file_key, table1, table2, tablev2) - - def keystream(self, offset: IntegerLike, length: IntegerLike, /) -> Generator[int, None, None]: - raise NotImplementedError - - def encrypt(self, plaindata: BytesLike, offset: IntegerLike = 0, /) -> bytes: - raise NotImplementedError - - def decrypt(self, cipherdata: BytesLike, offset: IntegerLike = 0, /) -> bytes: - vpr_key = self._vpr_key - - staged = super().decrypt(cipherdata, offset) - - return bytestrxor(staged, bytes(vpr_key[(_ + offset) % 17] for _ in range(len(cipherdata)))) - - -class VPREncryptionAlgorithmWithCachedMask(KGMEncryptionAlgorithmWithCachedMask): - @property - def vpr_key(self): - return self._vpr_key - - def __init__(self, vpr_key: BytesLike, file_key: BytesLike, mask_data: BytesLike): - self._vpr_key = tobytes(vpr_key) - super().__init__(file_key, mask_data) - - def keystream(self, offset: IntegerLike, length: IntegerLike, /) -> Generator[int, None, None]: - raise NotImplementedError - - def encrypt(self, plaindata: BytesLike, offset: IntegerLike = 0, /) -> bytes: - raise NotImplementedError - - def decrypt(self, cipherdata: BytesLike, offset: IntegerLike = 0, /) -> bytes: - vpr_key = self._vpr_key - - staged = super().decrypt(cipherdata, offset) - - return bytestrxor(staged, bytes(vpr_key[(_ + offset) % 17] for _ in range(len(cipherdata)))) + ValueError("second argument 'offset' must be a non-negative integer") + cipherdata = tobytes(cipherdata) + plaindata_buf = bytearray(len(cipherdata)) + + maskstream_iterator = make_maskstream( + offset, len(cipherdata), self._table1, self._table2, self._tablev2 + ) + for idx, peered_byte in enumerate(zip(cipherdata, maskstream_iterator)): + cdb, msb = peered_byte + pdb = xor_half_lower_byte(cdb ^ msb ^ master_key[(idx + offset) % keysize]) + if vpr_key is not None: + pdb ^= vpr_key[(idx + offset) % keysize] + plaindata_buf[idx] = pdb + + return tobytes(plaindata_buf)