1919import enum
2020import logging
2121import math
22- from types import MappingProxyType
2322from typing import TYPE_CHECKING , Any , ClassVar , Final , Literal , NewType , cast
2423
2524import attrs
2827from . import constants
2928
3029if TYPE_CHECKING :
31- from collections .abc import Mapping
30+ from collections .abc import Iterator
3231
3332logger = logging .getLogger (__name__ )
3433
@@ -135,6 +134,26 @@ class CrcChecksumInvalidError(BaseCodecError):
135134 """CRC checksum validation of the data link byte sequence did not pass."""
136135
137136
137+ class TruncatedStuffingError (BaseCodecError ):
138+ """Byte stuffing indicates one more byte at the end of the input."""
139+
140+ def __str__ (self ) -> str : # noqa: D105
141+ return "Byte stuffing indicates one more byte at the end of the input"
142+
143+
144+ @attrs .frozen (kw_only = True )
145+ class InvalidStuffingByteError (BaseCodecError ):
146+ """Byte stuffing encountered an unrecognized encoded byte."""
147+
148+ raw_byte : int
149+
150+ def __str__ (self ) -> str : # noqa: D105
151+ return (
152+ "Byte stuffing encountered an unrecognized encoded byte "
153+ f"{ self .raw_byte :02X} "
154+ )
155+
156+
138157@attrs .frozen (kw_only = True )
139158class ApplicationData :
140159 """Data class for the data in the application layer of the Kamstrup KMP protocol."""
@@ -178,21 +197,12 @@ class PhysicalCodec:
178197
179198 direction : PhysicalDirection = attrs .field ()
180199 _start_byte : int = attrs .field (init = False ) # depends on direction
181-
182- BYTE_STUFFING_MAP : ClassVar [Mapping [bytes , bytes ]] = MappingProxyType ({
183- the_byte .to_bytes (1 , "big" ): (
184- constants .ByteCode .STUFFING .value .to_bytes (1 , "big" )
185- + (the_byte ^ 0xFF ).to_bytes (1 , "big" )
186- )
187- for the_byte in (
188- # Order matters for having BYTE_STUFFING as the first; itself is used in
189- # the escaped sequence.
190- constants .ByteCode .STUFFING .value ,
191- constants .ByteCode .ACK .value ,
192- constants .ByteCode .START_FROM_METER .value ,
193- constants .ByteCode .START_TO_METER .value ,
194- constants .ByteCode .STOP .value ,
195- )
200+ _stuffable_bytes : ClassVar [frozenset [int ]] = frozenset ({
201+ constants .ByteCode .ACK .value ,
202+ constants .ByteCode .START_FROM_METER .value ,
203+ constants .ByteCode .START_TO_METER .value ,
204+ constants .ByteCode .STOP .value ,
205+ constants .ByteCode .STUFFING .value ,
196206 })
197207
198208 def __attrs_post_init__ (self ) -> None :
@@ -208,6 +218,40 @@ def _direction_to_start_byte(cls, direction: PhysicalDirection) -> int:
208218 case PhysicalDirection .TO_METER :
209219 return constants .ByteCode .START_TO_METER .value
210220
221+ @classmethod
222+ def _iter_destuffed_bytes (cls , stuffed : bytes ) -> Iterator [int ]:
223+ """Yield destuffed byte values from a stuffed byte sequence."""
224+ in_stuffing = False
225+
226+ for raw_byte in stuffed :
227+ if in_stuffing :
228+ in_stuffing = False
229+ xored = raw_byte ^ 0xFF
230+ if xored not in cls ._stuffable_bytes :
231+ raise InvalidStuffingByteError (raw_byte = raw_byte )
232+ yield xored
233+ continue
234+
235+ if raw_byte == constants .ByteCode .STUFFING .value :
236+ in_stuffing = True
237+ continue
238+
239+ yield raw_byte
240+
241+ if in_stuffing :
242+ raise TruncatedStuffingError
243+
244+ @classmethod
245+ def _iter_stuffed_bytes (cls , raw : DataLinkBytes ) -> Iterator [bytes ]:
246+ """Yield stuffed byte chunks from an unescaped byte sequence."""
247+ stuffing_prefix = constants .ByteCode .STUFFING .value .to_bytes (1 , "big" )
248+
249+ for raw_byte in raw :
250+ if raw_byte in cls ._stuffable_bytes :
251+ yield stuffing_prefix + (raw_byte ^ 0xFF ).to_bytes (1 , "big" )
252+ else :
253+ yield raw_byte .to_bytes (1 , "big" )
254+
211255 def decode (self , frame : PhysicalBytes ) -> DataLinkBytes :
212256 """
213257 Decode a byte sequence of the physical layer into 'DataLinkBytes'.
@@ -237,10 +281,7 @@ def decode(self, frame: PhysicalBytes) -> DataLinkBytes:
237281 )
238282
239283 data_bytes = frame [1 :- 1 ]
240- for unescaped_byte , escaped_bytes in self .BYTE_STUFFING_MAP .items ():
241- data_bytes = data_bytes .replace (escaped_bytes , unescaped_byte )
242-
243- return cast (DataLinkBytes , data_bytes )
284+ return cast (DataLinkBytes , bytes (self ._iter_destuffed_bytes (data_bytes )))
244285
245286 def encode (self , data_bytes : DataLinkBytes ) -> PhysicalBytes :
246287 """
@@ -255,13 +296,11 @@ def encode(self, data_bytes: DataLinkBytes) -> PhysicalBytes:
255296 what = "Data link bytes" , actual = 0 , length_expected = None
256297 )
257298
258- raw = cast (bytes , data_bytes )
259- for unescaped_byte , escaped_bytes in self .BYTE_STUFFING_MAP .items ():
260- raw = raw .replace (unescaped_byte , escaped_bytes )
299+ raw_stuffed = b"" .join (self ._iter_stuffed_bytes (data_bytes ))
261300
262301 frame = (
263302 self ._start_byte .to_bytes (1 , "big" )
264- + raw
303+ + raw_stuffed
265304 + constants .ByteCode .STOP .value .to_bytes (1 , "big" )
266305 )
267306 return cast (PhysicalBytes , frame )
0 commit comments