diff --git a/docs/api/codec.md b/docs/api/codec.md index a7fbcb0..722c536 100644 --- a/docs/api/codec.md +++ b/docs/api/codec.md @@ -38,3 +38,5 @@ SPDX-License-Identifier: CC0-1.0 ::: pykmp.codec.InvalidDestinationAddressError ::: pykmp.codec.UnsupportedDecimalExponentError ::: pykmp.codec.CrcChecksumInvalidError +::: pykmp.codec.InvalidStuffingByteError +::: pykmp.codec.TruncatedStuffingError diff --git a/docs/changelog.md b/docs/changelog.md index 68ea4d9..b42edf1 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -9,6 +9,25 @@ SPDX-License-Identifier: CC0-1.0 _In development._ +### Noteworthy changes + +- The physical byte-stuffing codec has been fixed to avoid recursive destuffing during + decode which could cause valid frames to be decoded incorrectly. + + The implementation now uses a single-pass approach for both + [`decode()`][pykmp.codec.PhysicalCodec.decode] and + [`encode()`][pykmp.codec.PhysicalCodec.encode]. This removes a fragile + ordering dependency in encoding and + improves handling of truncated or otherwise unrecognized stuffing + sequences, with corresponding test coverage updates. + + Introduces two new exceptions in the codec module: + [`TruncatedStuffingError`][pykmp.codec.TruncatedStuffingError] and + [`InvalidStuffingByteError`][pykmp.codec.InvalidStuffingByteError]. + + Thanks to [Jan Kundrát](https://github.com/jktjkt) for reporting the issue along + with a proposed fix ([PR #6](https://github.com/gertvdijk/PyKMP/pull/6)). + ### Tooling changes - Project documentation has been migrated from [MkDocs](https://www.mkdocs.org/) with @@ -22,6 +41,10 @@ _In development._ with configuration aligned more closely with the VS Code IDE experience. - Binary-heavy tests and related constants have been reformatted to use `bytes.fromhex()`, improving the readability of byte-oriented inputs and outputs. +- The physical codec happy-path tests now combine + [`decode()`][pykmp.codec.PhysicalCodec.decode] and + [`encode()`][pykmp.codec.PhysicalCodec.encode] round-trip examples into a single + parametrized test, reducing duplication and keeping the example cases symmetrical. - Minor follow-up changes further modernized the developer tooling after `0.0.2`. These under-the-hood updates included refreshing the VS Code workspace for the `uv`-managed environment, refining Pyright-specific suppressions, correcting a small diff --git a/docs/contributing.md b/docs/contributing.md index e8fa01f..ab0935b 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -45,7 +45,7 @@ the owners of this repository before making a change. ```console $ uv run pytest [...] - ==== 116 passed in 0.21s ==== + ==== 118 passed in 0.21s ==== ``` 1. Verify that you can run the `run-all-linters` script. diff --git a/docs/thanks.md b/docs/thanks.md index f6fdf05..73a4ac8 100644 --- a/docs/thanks.md +++ b/docs/thanks.md @@ -19,6 +19,11 @@ the possibilities integrating these MULTICAL® meters (as provided by Dutch util non-cloud smart home. [GoT: Kamstrup Multical 402 stadsverwarmingsmeter RPI3+IR-kop][got-multical402-topic] +# Contributors + +[Jan Kundrát][jankundrát] + [meterlogger-wiki-kmp]: https://github.com/nabovarme/MeterLogger/wiki/Kamstrup-Protocol [github-ronaldvdmeer-multical402]: https://github.com/ronaldvdmeer/multical402-4-domoticz [got-multical402-topic]: https://gathering.tweakers.net/forum/list_messages/1776625 +[jankundrát]: https://github.com/jktjkt diff --git a/src/pykmp/codec.py b/src/pykmp/codec.py index 5040bdf..9c4f480 100644 --- a/src/pykmp/codec.py +++ b/src/pykmp/codec.py @@ -19,7 +19,6 @@ import enum import logging import math -from types import MappingProxyType from typing import TYPE_CHECKING, Any, ClassVar, Final, Literal, NewType, cast import attrs @@ -28,7 +27,7 @@ from . import constants if TYPE_CHECKING: - from collections.abc import Mapping + from collections.abc import Iterator logger = logging.getLogger(__name__) @@ -135,6 +134,26 @@ class CrcChecksumInvalidError(BaseCodecError): """CRC checksum validation of the data link byte sequence did not pass.""" +class TruncatedStuffingError(BaseCodecError): + """Byte stuffing indicates one more byte at the end of the input.""" + + def __str__(self) -> str: # noqa: D105 + return "Byte stuffing indicates one more byte at the end of the input" + + +@attrs.frozen(kw_only=True) +class InvalidStuffingByteError(BaseCodecError): + """Byte stuffing encountered an unrecognized encoded byte.""" + + raw_byte: int + + def __str__(self) -> str: # noqa: D105 + return ( + "Byte stuffing encountered an unrecognized encoded byte " + f"{self.raw_byte:02X}" + ) + + @attrs.frozen(kw_only=True) class ApplicationData: """Data class for the data in the application layer of the Kamstrup KMP protocol.""" @@ -178,21 +197,12 @@ class PhysicalCodec: direction: PhysicalDirection = attrs.field() _start_byte: int = attrs.field(init=False) # depends on direction - - BYTE_STUFFING_MAP: ClassVar[Mapping[bytes, bytes]] = MappingProxyType({ - the_byte.to_bytes(1, "big"): ( - constants.ByteCode.STUFFING.value.to_bytes(1, "big") - + (the_byte ^ 0xFF).to_bytes(1, "big") - ) - for the_byte in ( - # Order matters for having BYTE_STUFFING as the first; itself is used in - # the escaped sequence. - constants.ByteCode.STUFFING.value, - constants.ByteCode.ACK.value, - constants.ByteCode.START_FROM_METER.value, - constants.ByteCode.START_TO_METER.value, - constants.ByteCode.STOP.value, - ) + _stuffable_bytes: ClassVar[frozenset[int]] = frozenset({ + constants.ByteCode.ACK.value, + constants.ByteCode.START_FROM_METER.value, + constants.ByteCode.START_TO_METER.value, + constants.ByteCode.STOP.value, + constants.ByteCode.STUFFING.value, }) def __attrs_post_init__(self) -> None: @@ -208,6 +218,39 @@ def _direction_to_start_byte(cls, direction: PhysicalDirection) -> int: case PhysicalDirection.TO_METER: return constants.ByteCode.START_TO_METER.value + @classmethod + def _iter_destuffed_bytes(cls, stuffed: bytes) -> Iterator[int]: + """Yield destuffed byte values from a stuffed byte sequence.""" + in_stuffing = False + + for raw_byte in stuffed: + if in_stuffing: + in_stuffing = False + if (xored := raw_byte ^ 0xFF) not in cls._stuffable_bytes: + raise InvalidStuffingByteError(raw_byte=raw_byte) + yield xored + continue + + if raw_byte == constants.ByteCode.STUFFING.value: + in_stuffing = True + continue + + yield raw_byte + + if in_stuffing: + raise TruncatedStuffingError + + @classmethod + def _iter_stuffed_bytes(cls, raw: DataLinkBytes) -> Iterator[bytes]: + """Yield stuffed byte chunks from an unescaped byte sequence.""" + for raw_byte in raw: + if raw_byte in cls._stuffable_bytes: + yield ( + (constants.ByteCode.STUFFING.value << 8) + (raw_byte ^ 0xFF) + ).to_bytes(2, "big") + else: + yield raw_byte.to_bytes(1, "big") + def decode(self, frame: PhysicalBytes) -> DataLinkBytes: """ Decode a byte sequence of the physical layer into 'DataLinkBytes'. @@ -237,10 +280,7 @@ def decode(self, frame: PhysicalBytes) -> DataLinkBytes: ) data_bytes = frame[1:-1] - for unescaped_byte, escaped_bytes in self.BYTE_STUFFING_MAP.items(): - data_bytes = data_bytes.replace(escaped_bytes, unescaped_byte) - - return cast(DataLinkBytes, data_bytes) + return cast(DataLinkBytes, bytes(self._iter_destuffed_bytes(data_bytes))) def encode(self, data_bytes: DataLinkBytes) -> PhysicalBytes: """ @@ -255,13 +295,11 @@ def encode(self, data_bytes: DataLinkBytes) -> PhysicalBytes: what="Data link bytes", actual=0, length_expected=None ) - raw = cast(bytes, data_bytes) - for unescaped_byte, escaped_bytes in self.BYTE_STUFFING_MAP.items(): - raw = raw.replace(unescaped_byte, escaped_bytes) + raw_stuffed = b"".join(self._iter_stuffed_bytes(data_bytes)) frame = ( self._start_byte.to_bytes(1, "big") - + raw + + raw_stuffed + constants.ByteCode.STOP.value.to_bytes(1, "big") ) return cast(PhysicalBytes, frame) diff --git a/tests/test_codec.py b/tests/test_codec.py index cbf154e..afc6f21 100644 --- a/tests/test_codec.py +++ b/tests/test_codec.py @@ -24,10 +24,12 @@ DataLinkData, FloatCodec, InvalidDestinationAddressError, + InvalidStuffingByteError, OutOfRangeError, PhysicalBytes, PhysicalCodec, PhysicalDirection, + TruncatedStuffingError, UnsupportedDecimalExponentError, ) @@ -35,33 +37,72 @@ @pytest.mark.parametrize( - ("direction", "frame", "expected"), + ("direction", "frame", "data_link"), [ + pytest.param( + PhysicalDirection.TO_METER, + "80 04 1BF2 00 1BF9 0D", + " 04 0D 00 06 ", + id="Kamstrup doc 3.1 Physical layer example, plus start/stop byte", + ), + pytest.param( + PhysicalDirection.TO_METER, + "80 3F 01 058A 0D", + " 3F 01 058A ", + id="Kamstrup doc 6.2.1 GetType request (no stuffing needed)", + ), pytest.param( PhysicalDirection.FROM_METER, "40 3F 02 01234567 E956 0D", " 3F 02 01234567 E956 ", id="Kamstrup doc 6.2.2 GetSerialNo response (no destuffing needed)", ), + pytest.param( + PhysicalDirection.TO_METER, + "80 3F 10 01 00 1B7F D408 0D", + " 3F 10 01 00 80 D408 ", + id="Kamstrup doc 6.2.4 GetRegister request (stuffing needed)", + ), pytest.param( PhysicalDirection.FROM_METER, "40 3F 10 00 1B7F 16 04 11 012AF024 6303 0D", " 3F 10 00 80 16 04 11 012AF024 6303 ", id="Kamstrup doc 6.2.4 GetRegister response (destuffing needed)", ), + pytest.param( + PhysicalDirection.TO_METER, + "80 1BE4 0D", + " 1B ", + id="stuffing stuff byte", + ), + pytest.param( + PhysicalDirection.FROM_METER, + # Note that '1BE4' destuffs to '1B'. An implementation that would run + # multiple destuffing rounds would wrongly attempt to destuff '1B__' again. + # In this case where it is followed by '7F' ('1B7F') after initial + # destuffing, it would be wrongly destuffed in another round to '80' while + # it should not. See issue/PR gertvdijk/PyKMP#6. + "40 1BE47F 0D", + " 1B 7F ", + id="stuffed stuff byte should not cause recursive destuffing", + ), ], ) -def test_codec_physical_decode( +def test_codec_physical_decode_encode( direction: PhysicalDirection, frame: str, - expected: str, + data_link: str, ensure_no_warnings_logged: util.SimpleContextTest, ) -> None: with ensure_no_warnings_logged(): - returned = PhysicalCodec(direction=direction).decode( + decoded = PhysicalCodec(direction=direction).decode( PhysicalBytes(bytes.fromhex(frame)) ) - assert returned.hex() == bytes.fromhex(expected).hex() + encoded = PhysicalCodec(direction=direction).encode( + DataLinkBytes(bytes.fromhex(data_link)) + ) + assert decoded.hex() == bytes.fromhex(data_link).hex() + assert encoded.hex() == bytes.fromhex(frame).hex() @pytest.mark.parametrize( @@ -116,6 +157,20 @@ def test_codec_physical_decode_ack( "Frame is of zero length.", id="empty", ), + pytest.param( + PhysicalDirection.FROM_METER, + "40 3F 1BFF 0D", + InvalidStuffingByteError, + "Byte stuffing encountered an unrecognized encoded byte FF", + id="Unrecongized stuffing value", + ), + pytest.param( + PhysicalDirection.FROM_METER, + "40 3F 1B 0D", + TruncatedStuffingError, + "Byte stuffing indicates one more byte at the end of the input", + id="Truncated stuffing value", + ), ], ) def test_codec_physical_decode_error( @@ -129,54 +184,6 @@ def test_codec_physical_decode_error( codec.decode(PhysicalBytes(bytes.fromhex(frame))) -@pytest.mark.parametrize( - ("direction", "to_encode", "expected"), - [ - pytest.param( - PhysicalDirection.TO_METER, - " 04 0D 00 06 ", - "80 04 1BF2 00 1BF9 0D", - id="Kamstrup doc 3.1 Physical layer example, plus start/stop byte", - ), - pytest.param( - PhysicalDirection.TO_METER, - " 3F 01 058A ", - "80 3F 01 058A 0D", - id="Kamstrup doc 6.2.1 GetType request (no stuffing needed)", - ), - pytest.param( - PhysicalDirection.TO_METER, - " 3F 10 01 00 80 D408 ", - "80 3F 10 01 00 1B7F D408 0D", - id="Kamstrup doc 6.2.4 GetRegister request (stuffing needed)", - ), - pytest.param( - PhysicalDirection.TO_METER, - " 1B ", - "80 1BE4 0D", - id="stuffing character stuffed", - ), - pytest.param( - PhysicalDirection.FROM_METER, - " 3F ", - "40 3F 0D", - id="encode as meter (direction=FROM_METER) has different start byte", - ), - ], -) -def test_codec_physical_encode( - direction: PhysicalDirection, - to_encode: str, - expected: str, - ensure_no_warnings_logged: util.SimpleContextTest, -) -> None: - with ensure_no_warnings_logged(): - codec = PhysicalCodec(direction=direction) - assert codec.encode(DataLinkBytes(bytes.fromhex(to_encode))).hex() == ( - bytes.fromhex(expected).hex() - ) - - @pytest.mark.parametrize( ("direction"), [