Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/api/codec.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ SPDX-License-Identifier: CC0-1.0
::: pykmp.codec.InvalidDestinationAddressError
::: pykmp.codec.UnsupportedDecimalExponentError
::: pykmp.codec.CrcChecksumInvalidError
::: pykmp.codec.InvalidStuffingByteError
::: pykmp.codec.TruncatedStuffingError
23 changes: 23 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,25 @@ SPDX-License-Identifier: CC0-1.0

_In development._

### Noteworthy changes

- The physical byte-stuffing codec has been fixed to avoid recursive destuffing during
decode which could cause valid frames to be decoded incorrectly.

The implementation now uses a single-pass approach for both
[`decode()`][pykmp.codec.PhysicalCodec.decode] and
[`encode()`][pykmp.codec.PhysicalCodec.encode]. This removes a fragile
ordering dependency in encoding and
improves handling of truncated or otherwise unrecognized stuffing
sequences, with corresponding test coverage updates.

Introduces two new exceptions in the codec module:
[`TruncatedStuffingError`][pykmp.codec.TruncatedStuffingError] and
[`InvalidStuffingByteError`][pykmp.codec.InvalidStuffingByteError].

Thanks to [Jan Kundrát](https://github.com/jktjkt) for reporting the issue along
with a proposed fix ([PR #6](https://github.com/gertvdijk/PyKMP/pull/6)).

### Tooling changes

- Project documentation has been migrated from [MkDocs](https://www.mkdocs.org/) with
Expand All @@ -22,6 +41,10 @@ _In development._
with configuration aligned more closely with the VS Code IDE experience.
- Binary-heavy tests and related constants have been reformatted to use
`bytes.fromhex()`, improving the readability of byte-oriented inputs and outputs.
- The physical codec happy-path tests now combine
[`decode()`][pykmp.codec.PhysicalCodec.decode] and
[`encode()`][pykmp.codec.PhysicalCodec.encode] round-trip examples into a single
parametrized test, reducing duplication and keeping the example cases symmetrical.
- Minor follow-up changes further modernized the developer tooling after `0.0.2`.
These under-the-hood updates included refreshing the VS Code workspace for the
`uv`-managed environment, refining Pyright-specific suppressions, correcting a small
Expand Down
2 changes: 1 addition & 1 deletion docs/contributing.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ the owners of this repository before making a change.
```console
$ uv run pytest
[...]
==== 116 passed in 0.21s ====
==== 118 passed in 0.21s ====
```

1. Verify that you can run the `run-all-linters` script.
Expand Down
5 changes: 5 additions & 0 deletions docs/thanks.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ the possibilities integrating these MULTICAL® meters (as provided by Dutch util
non-cloud smart home.
[GoT: Kamstrup Multical 402 stadsverwarmingsmeter RPI3+IR-kop][got-multical402-topic]

# Contributors

[Jan Kundrát][jankundrát]

[meterlogger-wiki-kmp]: https://github.com/nabovarme/MeterLogger/wiki/Kamstrup-Protocol
[github-ronaldvdmeer-multical402]: https://github.com/ronaldvdmeer/multical402-4-domoticz
[got-multical402-topic]: https://gathering.tweakers.net/forum/list_messages/1776625
[jankundrát]: https://github.com/jktjkt
88 changes: 63 additions & 25 deletions src/pykmp/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import enum
import logging
import math
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, ClassVar, Final, Literal, NewType, cast

import attrs
Expand All @@ -28,7 +27,7 @@
from . import constants

if TYPE_CHECKING:
from collections.abc import Mapping
from collections.abc import Iterator

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -135,6 +134,26 @@ class CrcChecksumInvalidError(BaseCodecError):
"""CRC checksum validation of the data link byte sequence did not pass."""


class TruncatedStuffingError(BaseCodecError):
"""Byte stuffing indicates one more byte at the end of the input."""

def __str__(self) -> str: # noqa: D105
return "Byte stuffing indicates one more byte at the end of the input"


@attrs.frozen(kw_only=True)
class InvalidStuffingByteError(BaseCodecError):
"""Byte stuffing encountered an unrecognized encoded byte."""

raw_byte: int

def __str__(self) -> str: # noqa: D105
return (
"Byte stuffing encountered an unrecognized encoded byte "
f"{self.raw_byte:02X}"
)


@attrs.frozen(kw_only=True)
class ApplicationData:
"""Data class for the data in the application layer of the Kamstrup KMP protocol."""
Expand Down Expand Up @@ -178,21 +197,12 @@ class PhysicalCodec:

direction: PhysicalDirection = attrs.field()
_start_byte: int = attrs.field(init=False) # depends on direction

BYTE_STUFFING_MAP: ClassVar[Mapping[bytes, bytes]] = MappingProxyType({
the_byte.to_bytes(1, "big"): (
constants.ByteCode.STUFFING.value.to_bytes(1, "big")
+ (the_byte ^ 0xFF).to_bytes(1, "big")
)
for the_byte in (
# Order matters for having BYTE_STUFFING as the first; itself is used in
# the escaped sequence.
constants.ByteCode.STUFFING.value,
constants.ByteCode.ACK.value,
constants.ByteCode.START_FROM_METER.value,
constants.ByteCode.START_TO_METER.value,
constants.ByteCode.STOP.value,
)
_stuffable_bytes: ClassVar[frozenset[int]] = frozenset({
constants.ByteCode.ACK.value,
constants.ByteCode.START_FROM_METER.value,
constants.ByteCode.START_TO_METER.value,
constants.ByteCode.STOP.value,
constants.ByteCode.STUFFING.value,
})

def __attrs_post_init__(self) -> None:
Expand All @@ -208,6 +218,39 @@ def _direction_to_start_byte(cls, direction: PhysicalDirection) -> int:
case PhysicalDirection.TO_METER:
return constants.ByteCode.START_TO_METER.value

@classmethod
def _iter_destuffed_bytes(cls, stuffed: bytes) -> Iterator[int]:
"""Yield destuffed byte values from a stuffed byte sequence."""
in_stuffing = False

for raw_byte in stuffed:
if in_stuffing:
in_stuffing = False
if (xored := raw_byte ^ 0xFF) not in cls._stuffable_bytes:
raise InvalidStuffingByteError(raw_byte=raw_byte)
yield xored
continue

if raw_byte == constants.ByteCode.STUFFING.value:
in_stuffing = True
continue

yield raw_byte

if in_stuffing:
raise TruncatedStuffingError

@classmethod
def _iter_stuffed_bytes(cls, raw: DataLinkBytes) -> Iterator[bytes]:
"""Yield stuffed byte chunks from an unescaped byte sequence."""
for raw_byte in raw:
if raw_byte in cls._stuffable_bytes:
yield (
(constants.ByteCode.STUFFING.value << 8) + (raw_byte ^ 0xFF)
).to_bytes(2, "big")
else:
yield raw_byte.to_bytes(1, "big")

def decode(self, frame: PhysicalBytes) -> DataLinkBytes:
"""
Decode a byte sequence of the physical layer into 'DataLinkBytes'.
Expand Down Expand Up @@ -237,10 +280,7 @@ def decode(self, frame: PhysicalBytes) -> DataLinkBytes:
)

data_bytes = frame[1:-1]
for unescaped_byte, escaped_bytes in self.BYTE_STUFFING_MAP.items():
data_bytes = data_bytes.replace(escaped_bytes, unescaped_byte)

return cast(DataLinkBytes, data_bytes)
return cast(DataLinkBytes, bytes(self._iter_destuffed_bytes(data_bytes)))

def encode(self, data_bytes: DataLinkBytes) -> PhysicalBytes:
"""
Expand All @@ -255,13 +295,11 @@ def encode(self, data_bytes: DataLinkBytes) -> PhysicalBytes:
what="Data link bytes", actual=0, length_expected=None
)

raw = cast(bytes, data_bytes)
for unescaped_byte, escaped_bytes in self.BYTE_STUFFING_MAP.items():
raw = raw.replace(unescaped_byte, escaped_bytes)
raw_stuffed = b"".join(self._iter_stuffed_bytes(data_bytes))

frame = (
self._start_byte.to_bytes(1, "big")
+ raw
+ raw_stuffed
+ constants.ByteCode.STOP.value.to_bytes(1, "big")
)
return cast(PhysicalBytes, frame)
Expand Down
113 changes: 60 additions & 53 deletions tests/test_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,85 @@
DataLinkData,
FloatCodec,
InvalidDestinationAddressError,
InvalidStuffingByteError,
OutOfRangeError,
PhysicalBytes,
PhysicalCodec,
PhysicalDirection,
TruncatedStuffingError,
UnsupportedDecimalExponentError,
)

from . import util


@pytest.mark.parametrize(
("direction", "frame", "expected"),
("direction", "frame", "data_link"),
[
pytest.param(
PhysicalDirection.TO_METER,
"80 04 1BF2 00 1BF9 0D",
" 04 0D 00 06 ",
id="Kamstrup doc 3.1 Physical layer example, plus start/stop byte",
),
pytest.param(
PhysicalDirection.TO_METER,
"80 3F 01 058A 0D",
" 3F 01 058A ",
id="Kamstrup doc 6.2.1 GetType request (no stuffing needed)",
),
pytest.param(
PhysicalDirection.FROM_METER,
"40 3F 02 01234567 E956 0D",
" 3F 02 01234567 E956 ",
id="Kamstrup doc 6.2.2 GetSerialNo response (no destuffing needed)",
),
pytest.param(
PhysicalDirection.TO_METER,
"80 3F 10 01 00 1B7F D408 0D",
" 3F 10 01 00 80 D408 ",
id="Kamstrup doc 6.2.4 GetRegister request (stuffing needed)",
),
pytest.param(
PhysicalDirection.FROM_METER,
"40 3F 10 00 1B7F 16 04 11 012AF024 6303 0D",
" 3F 10 00 80 16 04 11 012AF024 6303 ",
id="Kamstrup doc 6.2.4 GetRegister response (destuffing needed)",
),
pytest.param(
PhysicalDirection.TO_METER,
"80 1BE4 0D",
" 1B ",
id="stuffing stuff byte",
),
pytest.param(
PhysicalDirection.FROM_METER,
# Note that '1BE4' destuffs to '1B'. An implementation that would run
# multiple destuffing rounds would wrongly attempt to destuff '1B__' again.
# In this case where it is followed by '7F' ('1B7F') after initial
# destuffing, it would be wrongly destuffed in another round to '80' while
# it should not. See issue/PR gertvdijk/PyKMP#6.
"40 1BE47F 0D",
" 1B 7F ",
id="stuffed stuff byte should not cause recursive destuffing",
Comment thread
gertvdijk marked this conversation as resolved.
),
],
)
def test_codec_physical_decode(
def test_codec_physical_decode_encode(
direction: PhysicalDirection,
frame: str,
expected: str,
data_link: str,
ensure_no_warnings_logged: util.SimpleContextTest,
) -> None:
with ensure_no_warnings_logged():
returned = PhysicalCodec(direction=direction).decode(
decoded = PhysicalCodec(direction=direction).decode(
PhysicalBytes(bytes.fromhex(frame))
)
assert returned.hex() == bytes.fromhex(expected).hex()
encoded = PhysicalCodec(direction=direction).encode(
DataLinkBytes(bytes.fromhex(data_link))
)
assert decoded.hex() == bytes.fromhex(data_link).hex()
assert encoded.hex() == bytes.fromhex(frame).hex()


@pytest.mark.parametrize(
Expand Down Expand Up @@ -116,6 +157,20 @@ def test_codec_physical_decode_ack(
"Frame is of zero length.",
id="empty",
),
pytest.param(
PhysicalDirection.FROM_METER,
"40 3F 1BFF 0D",
InvalidStuffingByteError,
"Byte stuffing encountered an unrecognized encoded byte FF",
id="Unrecongized stuffing value",
),
pytest.param(
PhysicalDirection.FROM_METER,
"40 3F 1B 0D",
TruncatedStuffingError,
"Byte stuffing indicates one more byte at the end of the input",
id="Truncated stuffing value",
),
],
)
def test_codec_physical_decode_error(
Expand All @@ -129,54 +184,6 @@ def test_codec_physical_decode_error(
codec.decode(PhysicalBytes(bytes.fromhex(frame)))


@pytest.mark.parametrize(
("direction", "to_encode", "expected"),
[
pytest.param(
PhysicalDirection.TO_METER,
" 04 0D 00 06 ",
"80 04 1BF2 00 1BF9 0D",
id="Kamstrup doc 3.1 Physical layer example, plus start/stop byte",
),
pytest.param(
PhysicalDirection.TO_METER,
" 3F 01 058A ",
"80 3F 01 058A 0D",
id="Kamstrup doc 6.2.1 GetType request (no stuffing needed)",
),
pytest.param(
PhysicalDirection.TO_METER,
" 3F 10 01 00 80 D408 ",
"80 3F 10 01 00 1B7F D408 0D",
id="Kamstrup doc 6.2.4 GetRegister request (stuffing needed)",
),
pytest.param(
PhysicalDirection.TO_METER,
" 1B ",
"80 1BE4 0D",
id="stuffing character stuffed",
),
pytest.param(
PhysicalDirection.FROM_METER,
" 3F ",
"40 3F 0D",
id="encode as meter (direction=FROM_METER) has different start byte",
),
],
)
def test_codec_physical_encode(
direction: PhysicalDirection,
to_encode: str,
expected: str,
ensure_no_warnings_logged: util.SimpleContextTest,
) -> None:
with ensure_no_warnings_logged():
codec = PhysicalCodec(direction=direction)
assert codec.encode(DataLinkBytes(bytes.fromhex(to_encode))).hex() == (
bytes.fromhex(expected).hex()
)


@pytest.mark.parametrize(
("direction"),
[
Expand Down