-
-
Notifications
You must be signed in to change notification settings - Fork 768
Add hwIo.ble module with unit tests #19838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
96f825a
4411273
c3345cf
f144b86
4419d4e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,179 @@ | ||||||||||
| # A part of NonVisual Desktop Access (NVDA) | ||||||||||
| # This file is covered by the GNU General Public License. | ||||||||||
| # See the file COPYING for more details. | ||||||||||
| # Copyright (C) 2025-2026 NV Access Limited, Dot Incorporated, Bram Duvigneau | ||||||||||
|
|
||||||||||
| """Unit tests for the dotPad braille display driver. | ||||||||||
|
|
||||||||||
| These tests cover the buffered receive logic that supports both serial (byte-at-a-time) | ||||||||||
| and BLE (complete packet) receive modes. The implementation is part of #19122. | ||||||||||
| """ | ||||||||||
|
|
||||||||||
| import unittest | ||||||||||
| from unittest.mock import MagicMock | ||||||||||
| import struct | ||||||||||
| import functools | ||||||||||
| import operator | ||||||||||
|
|
||||||||||
|
|
||||||||||
| @unittest.skip("Requires buffered receive implementation from #19122") | ||||||||||
| class TestDotPadBufferedReceive(unittest.TestCase): | ||||||||||
| """Tests for the buffered receive logic in the DotPad driver.""" | ||||||||||
|
|
||||||||||
| def setUp(self): | ||||||||||
| """Set up test fixtures.""" | ||||||||||
| from brailleDisplayDrivers.dotPad.driver import BrailleDisplayDriver | ||||||||||
| from brailleDisplayDrivers.dotPad.defs import ( | ||||||||||
| DP_Command, | ||||||||||
| DP_PacketSyncByte, | ||||||||||
| DP_CHECKSUM_BASE, | ||||||||||
| ) | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you plan on moving the imports out in #19122 ? |
||||||||||
|
|
||||||||||
| self.BrailleDisplayDriver = BrailleDisplayDriver | ||||||||||
| self.DP_Command = DP_Command | ||||||||||
| self.DP_PacketSyncByte = DP_PacketSyncByte | ||||||||||
| self.DP_CHECKSUM_BASE = DP_CHECKSUM_BASE | ||||||||||
|
|
||||||||||
| # Create a minimal driver instance for testing receive logic | ||||||||||
| self.driver = MagicMock(spec=BrailleDisplayDriver) | ||||||||||
| self.driver._receiveBuffer = bytearray() | ||||||||||
| self.driver.MAX_PACKET_SIZE = 512 | ||||||||||
| self.driver._lastResponse = {} | ||||||||||
|
|
||||||||||
| # Track processed packets | ||||||||||
| self.processedPackets = [] | ||||||||||
|
|
||||||||||
| def mockProcessPacket(packetBody): | ||||||||||
| self.processedPackets.append(bytes(packetBody)) | ||||||||||
|
|
||||||||||
| self.driver._processPacket = mockProcessPacket | ||||||||||
|
|
||||||||||
| # Bind the actual _onReceive method | ||||||||||
| self.driver._onReceive = BrailleDisplayDriver._onReceive.__get__(self.driver, type(self.driver)) | ||||||||||
|
|
||||||||||
| def _createPacket(self, dest=0, cmd=0x0101, seqNum=0, data=b""): | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add type hints |
||||||||||
| """Helper to create a valid DotPad packet. | ||||||||||
|
|
||||||||||
| :param dest: Destination address | ||||||||||
| :param cmd: Command code | ||||||||||
| :param seqNum: Sequence number | ||||||||||
| :param data: Packet data payload | ||||||||||
| :return: Complete packet as bytes | ||||||||||
| """ | ||||||||||
| packetBody = bytearray([dest]) | ||||||||||
| packetBody.extend(struct.pack(">H", cmd)) | ||||||||||
| packetBody.append(seqNum) | ||||||||||
| packetBody.extend(data) | ||||||||||
|
|
||||||||||
| checksum = functools.reduce(operator.xor, packetBody, self.DP_CHECKSUM_BASE) | ||||||||||
| packetBody.append(checksum) | ||||||||||
|
|
||||||||||
| packet = bytearray( | ||||||||||
| [ | ||||||||||
| self.DP_PacketSyncByte.SYNC1, | ||||||||||
| self.DP_PacketSyncByte.SYNC2, | ||||||||||
| ], | ||||||||||
| ) | ||||||||||
| packet.extend(struct.pack(">H", len(packetBody))) | ||||||||||
| packet.extend(packetBody) | ||||||||||
|
|
||||||||||
| return bytes(packet) | ||||||||||
|
|
||||||||||
| def test_completePacketAtOnce(self): | ||||||||||
| """Test receiving a complete packet in a single call (BLE behavior).""" | ||||||||||
| packet = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"test") | ||||||||||
|
|
||||||||||
| self.driver._onReceive(packet) | ||||||||||
|
|
||||||||||
| self.assertEqual(len(self.processedPackets), 1) | ||||||||||
| self.assertEqual(len(self.driver._receiveBuffer), 0) | ||||||||||
|
|
||||||||||
| def test_byteAtATime(self): | ||||||||||
| """Test receiving a packet one byte at a time (Serial behavior).""" | ||||||||||
| packet = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"AB") | ||||||||||
|
|
||||||||||
| for byte in packet: | ||||||||||
| self.driver._onReceive(bytes([byte])) | ||||||||||
|
|
||||||||||
| self.assertEqual(len(self.processedPackets), 1) | ||||||||||
| self.assertEqual(len(self.driver._receiveBuffer), 0) | ||||||||||
|
|
||||||||||
| def test_partialPacket(self): | ||||||||||
| """Test receiving a packet in multiple chunks.""" | ||||||||||
| packet = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"test data") | ||||||||||
|
|
||||||||||
| chunk1 = packet[: len(packet) // 2] | ||||||||||
| chunk2 = packet[len(packet) // 2 :] | ||||||||||
|
|
||||||||||
| self.driver._onReceive(chunk1) | ||||||||||
| self.assertEqual(len(self.processedPackets), 0) | ||||||||||
| self.assertGreater(len(self.driver._receiveBuffer), 0) | ||||||||||
|
|
||||||||||
| self.driver._onReceive(chunk2) | ||||||||||
| self.assertEqual(len(self.processedPackets), 1) | ||||||||||
| self.assertEqual(len(self.driver._receiveBuffer), 0) | ||||||||||
|
|
||||||||||
| def test_multiplePacketsAtOnce(self): | ||||||||||
| """Test receiving multiple complete packets in a single call.""" | ||||||||||
| packet1 = self._createPacket(dest=0, cmd=0x0101, seqNum=1, data=b"A") | ||||||||||
| packet2 = self._createPacket(dest=0, cmd=0x0102, seqNum=2, data=b"B") | ||||||||||
| packet3 = self._createPacket(dest=0, cmd=0x0103, seqNum=3, data=b"C") | ||||||||||
|
||||||||||
| packet2 = self._createPacket(dest=0, cmd=0x0102, seqNum=2, data=b"B") | |
| packet3 = self._createPacket(dest=0, cmd=0x0103, seqNum=3, data=b"C") | |
| packet2 = self._createPacket(dest=0, cmd=0x0100, seqNum=2, data=b"B") | |
| packet3 = self._createPacket(dest=0, cmd=0x0110, seqNum=3, data=b"C") |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -99,3 +99,47 @@ def matchFunc(match: bdDetect.DeviceMatch) -> bool: | |||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you think this could be moved into a PR with the the planned hwIo/ble module? |
||||||||||||||||||||||||||||||||||
| registrar.addBluetoothDevices(matchFunc) | ||||||||||||||||||||||||||||||||||
| self.assertEqual(registrar._getDriverDict().get(bdDetect.CommunicationType.BLUETOOTH), matchFunc) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| @unittest.skip("Requires BLE support in bdDetect from #19122") | ||||||||||||||||||||||||||||||||||
| def test_addBleDevices(self): | ||||||||||||||||||||||||||||||||||
| """Test adding a BLE match function.""" | ||||||||||||||||||||||||||||||||||
| from brailleDisplayDrivers import dotPad | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| registrar = bdDetect.DriverRegistrar(dotPad.BrailleDisplayDriver.name) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def matchFunc(match: bdDetect.DeviceMatch) -> bool: | ||||||||||||||||||||||||||||||||||
| return match.id.startswith("DotPad") | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| registrar.addBleDevices(matchFunc) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| stored_match_func = registrar._getDriverDict().get(bdDetect.CommunicationType.BLE) | ||||||||||||||||||||||||||||||||||
| self.assertEqual(stored_match_func, matchFunc) | ||||||||||||||||||||||||||||||||||
| self.assertTrue(callable(stored_match_func)) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| @unittest.skip("Requires BLE support in bdDetect from #19122") | ||||||||||||||||||||||||||||||||||
| def test_bleDeviceMatching(self): | ||||||||||||||||||||||||||||||||||
| """Test that BLE device matching works correctly.""" | ||||||||||||||||||||||||||||||||||
| from brailleDisplayDrivers import dotPad | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| registrar = bdDetect.DriverRegistrar(dotPad.BrailleDisplayDriver.name) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| registrar.addBleDevices(dotPad.BrailleDisplayDriver._isBleDotPad) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| matching_device = bdDetect.DeviceMatch( | ||||||||||||||||||||||||||||||||||
| type=bdDetect.ProtocolType.BLE, | ||||||||||||||||||||||||||||||||||
| id="DotPad320", | ||||||||||||||||||||||||||||||||||
| port="AA:BB:CC:DD:EE:FF", | ||||||||||||||||||||||||||||||||||
| deviceInfo={"name": "DotPad320", "address": "AA:BB:CC:DD:EE:FF"}, | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| non_matching_device = bdDetect.DeviceMatch( | ||||||||||||||||||||||||||||||||||
| type=bdDetect.ProtocolType.BLE, | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
| type=bdDetect.ProtocolType.BLE, | |
| id="DotPad320", | |
| port="AA:BB:CC:DD:EE:FF", | |
| deviceInfo={"name": "DotPad320", "address": "AA:BB:CC:DD:EE:FF"}, | |
| ) | |
| non_matching_device = bdDetect.DeviceMatch( | |
| type=bdDetect.ProtocolType.BLE, | |
| type=bdDetect.ProtocolType.CUSTOM, | |
| id="DotPad320", | |
| port="AA:BB:CC:DD:EE:FF", | |
| deviceInfo={"name": "DotPad320", "address": "AA:BB:CC:DD:EE:FF"}, | |
| ) | |
| non_matching_device = bdDetect.DeviceMatch( | |
| type=bdDetect.ProtocolType.CUSTOM, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can the init file get copyright headers?