diff --git a/src/pykmp/codec.py b/src/pykmp/codec.py index 0cf83da..d98b056 100644 --- a/src/pykmp/codec.py +++ b/src/pykmp/codec.py @@ -136,6 +136,25 @@ class CrcChecksumInvalidError(BaseCodecError): """CRC checksum validation of the data link byte sequence did not pass.""" +class TruncatedStuffingError(BaseCodecError): + """Byte stuffing indicates one more byte at the end of the input""" + + def __str__(self) -> str: # noqa: D105 + return "Byte stuffing indicates one more byte at the end of the input" + + +@attrs.frozen(kw_only=True) +class InvalidStuffingByteError(BaseCodecError): + """Byte stuffing encountered an unrecognized encoded byte""" + + raw_byte: int + + def __str__(self) -> str: # noqa: D105 + return ( + f"Byte stuffing encountered an unrecognized encoded byte {self.raw_byte:02X}" + ) + + @attrs.frozen(kw_only=True) class ApplicationData: """Data class for the data in the application layer of the Kamstrup KMP protocol.""" @@ -180,22 +199,6 @@ class PhysicalCodec: direction: PhysicalDirection = attrs.field() _start_byte: int = attrs.field(init=False) # depends on direction - BYTE_STUFFING_MAP: Final[dict[bytes, bytes]] = { - the_byte.to_bytes(1, "big"): ( - constants.ByteCode.STUFFING.value.to_bytes(1, "big") - + (the_byte ^ 0xFF).to_bytes(1, "big") - ) - for the_byte in ( - # Order matters for having BYTE_STUFFING as the first; itself is used in the - # escaped sequence. - constants.ByteCode.STUFFING.value, - constants.ByteCode.ACK.value, - constants.ByteCode.START_FROM_METER.value, - constants.ByteCode.START_TO_METER.value, - constants.ByteCode.STOP.value, - ) - } - def __attrs_post_init__(self) -> None: """Select start byte value according to configuration (direction).""" self._start_byte = self._direction_to_start_byte(self.direction) @@ -238,10 +241,33 @@ def decode(self, frame: PhysicalBytes) -> DataLinkBytes: ) data_bytes = frame[1:-1] - for unescaped_byte, escaped_bytes in self.BYTE_STUFFING_MAP.items(): - data_bytes = data_bytes.replace(escaped_bytes, unescaped_byte) - - return cast(DataLinkBytes, data_bytes) + res = bytes() + i = 0 + in_stuffing = False + + while i < len(data_bytes): + if in_stuffing: + in_stuffing = False + xored = data_bytes[i] ^ 0xff + if xored not in ( + constants.ByteCode.STUFFING.value, + constants.ByteCode.ACK.value, + constants.ByteCode.START_FROM_METER.value, + constants.ByteCode.START_TO_METER.value, + constants.ByteCode.STOP.value, + ): + raise InvalidStuffingByteError(raw_byte=data_bytes[i]) + res += xored.to_bytes(1, "big") + elif data_bytes[i] == constants.ByteCode.STUFFING.value: + in_stuffing = True + else: + res += data_bytes[i].to_bytes(1, "big") + i += 1 + + if in_stuffing: + raise TruncatedStuffingError + + return cast(DataLinkBytes, res) def encode(self, data_bytes: DataLinkBytes) -> PhysicalBytes: """ @@ -257,12 +283,24 @@ def encode(self, data_bytes: DataLinkBytes) -> PhysicalBytes: ) raw = cast(bytes, data_bytes) - for unescaped_byte, escaped_bytes in self.BYTE_STUFFING_MAP.items(): - raw = raw.replace(unescaped_byte, escaped_bytes) + res = bytes() + i = 0 + while i < len(raw): + if raw[i] in ( + constants.ByteCode.STUFFING.value, + constants.ByteCode.ACK.value, + constants.ByteCode.START_FROM_METER.value, + constants.ByteCode.START_TO_METER.value, + constants.ByteCode.STOP.value, + ): + res += ((constants.ByteCode.STUFFING.value << 8) | (raw[i] ^ 0xff)).to_bytes(2, "big") + else: + res += raw[i].to_bytes(1, "big") + i += 1 frame = ( self._start_byte.to_bytes(1, "big") - + raw + + res + constants.ByteCode.STOP.value.to_bytes(1, "big") ) return cast(PhysicalBytes, frame) diff --git a/tests/test_codec.py b/tests/test_codec.py index a7ae239..86ee36a 100644 --- a/tests/test_codec.py +++ b/tests/test_codec.py @@ -29,6 +29,8 @@ PhysicalCodec, PhysicalDirection, UnsupportedDecimalExponentError, + InvalidStuffingByteError, + TruncatedStuffingError, ) from . import util @@ -51,6 +53,13 @@ DataLinkBytes(b"\x3F\x10\x00\x80\x16\x04\x11\x01\x2A\xF0\x24\x63\x03"), id="Kamstrup doc 6.2.4 GetRegister response (destuffing needed)", ), + pytest.param( + PhysicalDirection.FROM_METER, + # the key part is that the 000B1BE47F 000B0FC8 should not cause recursive decoding + PhysicalBytes(bytes.fromhex('40 3F B8 1BF9 04 43 000B1BE47F 000B0FC8 0d')), + DataLinkBytes(bytes.fromhex('3F B8 06 04 43 000B1B7F 000B0FC8')), + id='GetLogIDPastAbsResponse snippet which was by mistake destuffed too much', + ), ], ) def test_codec_physical_decode( @@ -116,6 +125,20 @@ def test_codec_physical_decode_ack( "Frame is of zero length.", id="empty", ), + pytest.param( + PhysicalDirection.FROM_METER, + PhysicalBytes(bytes.fromhex('40 3F 1BFF 0D')), + InvalidStuffingByteError, + "Byte stuffing encountered an unrecognized encoded byte FF", + id="Unrecongized stuffing value", + ), + pytest.param( + PhysicalDirection.FROM_METER, + PhysicalBytes(bytes.fromhex('40 3F 1B 0D')), + TruncatedStuffingError, + "Byte stuffing indicates one more byte at the end of the input", + id="Truncated stuffing value", + ), ], ) def test_codec_physical_decode_error(