Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions sets/mindstorms-ev3/education-expansion/remote_control/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from pybricks.hubs import EV3Brick
from pybricks.parameters import Color, Port, Stop, Direction
from pybricks.ev3devices import Motor
from pybricks.tools import wait, StopWatch, run_task
from pybricks.messaging import rfcomm_connect
from micropython import const
from umath import sin, pi
import ustruct

hub = EV3Brick()

# The large wheel connected to port A is the steering wheel.
axis1_motor = Motor(Port.A, positive_direction=Direction.COUNTERCLOCKWISE)

# The small wheel connected to port D is the throttle trigger.
axis2_motor = Motor(Port.D)

# These variables are used to compute a duty cycle that resists the user's
# attempt to turn the motor in a direction with a low duty cycle, while still
# allowing us to tell when the user stops turning the motor in that direction.

OSCILLATE_PERIOD = const(22) # ms
ANGULAR_FREQ_COEFF = 2 * pi / OSCILLATE_PERIOD

def resist_duty(base_duty, t, static_friction_duty) -> int:
"""Computes a duty cycle that oscillates around a base duty cycle."""
added_duty = sin(t * ANGULAR_FREQ_COEFF) * static_friction_duty
return int(base_duty + added_duty)

# The fraction of the motor's power that is required to overcome the
# motor's static friction. For large motors, it's a greater fraction
# of the motor's maximum power than for small motors.
LARGE_MOTOR_STATIC_FRICTION_DUTY = const(20)
SMALL_MOTOR_STATIC_FRICTION_DUTY = const(10)

# The bluetooth address of the remotely controlled device. Change this
# to the address printed by your remote controlled device program.
TARGET_BLUETOOTH_ADDRESS = 'F0:45:DA:13:1C:8A'

def steering_wheel(t: int) -> int:
"""Drives the steering wheel process.

1. Provides force-feedback for the steering wheel depending on how
far it is from neutral.
2. Returns an integer between -127 and 127 that expresses how far
the steering wheel is from neutral.

Args:
t (int): The time in milliseconds since the process started.
Returns:
int: An integer between -127 and 127 representing the steering
wheel position.
"""
STEERING_RESIST_MAX_DUTY = const(30)
STEERING_DEAD_ZONE = const(10) # degrees
RESIST_START_ANGLE = const(50) # degrees - angle where resistance begins
RESIST_RANGE = const(130) # degrees - range over which resistance ramps to max
MAX_ANGLE = const(180) # degrees - maximum steering wheel angle

angle = axis1_motor.angle()
speed = axis1_motor.speed()
abs_angle = abs(angle)

if abs_angle < STEERING_DEAD_ZONE:
hub.light.on(Color.BLUE)
axis1_motor.brake()
if abs_angle < 1:
axis1_motor.stop()
elif (angle > 0) != (speed > 0) and speed != 0:
# If the motor is currently turned in one direction and turning back
# toward zero, encourage that motion by running the motor toward zero.
axis1_motor.dc(-30 if angle > 0 else 30)
else:
# If the user is currently turning the motor away from zero, resist
# that movement with a duty cycle that increases the further we get
# from zero. Duty cycle of resistance maxes out at 15%.
base_duty = STEERING_RESIST_MAX_DUTY * max(0, min(1, (abs_angle - RESIST_START_ANGLE) / RESIST_RANGE))
duty = resist_duty(base_duty, t, LARGE_MOTOR_STATIC_FRICTION_DUTY)
duty = duty if angle < 0 else -duty
axis1_motor.dc(duty)

# Scale the return value: 0 in dead zone, ±127 at max resistance angle
if abs_angle < STEERING_DEAD_ZONE:
return 0

scaled = int(127 * (abs_angle - STEERING_DEAD_ZONE) / (MAX_ANGLE - STEERING_DEAD_ZONE))
scaled = min(127, scaled) # Cap at 127
return scaled if angle > 0 else -scaled


AXIS2_MAX_THROW = const(29) # degrees - maximum throw for throttle


def throttle(t: int) -> int:
"""Returns an integer between 0 and 127 representing the throttle position."""
angle = axis2_motor.angle()

# This extra duty we apply helps keep the motor unstuck, which makes
# it easier for the user to adjust it with small increments.
axis2_motor.dc(resist_duty(0, t, SMALL_MOTOR_STATIC_FRICTION_DUTY))

scaled = int(127 * angle / AXIS2_MAX_THROW)
return min(127, max(0, scaled))


# How often we send update messages to the remote controlled device.
UPDATE_PERIOD = const(16) # ms

async def main():
"""Main remote control process."""
while True:
hub.light.on(Color.RED)
conn = None
try:
conn = await rfcomm_connect(TARGET_BLUETOOTH_ADDRESS)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are going to use it like this, we should make the connection object a context manager, so we can simply use a with statement instead of try/finally.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I do prefer this functional style of rfcomm_connect() creating a connection object in general, it doesn't match our established pattern of a more object-oriented approach in Pybricks.

I.e. we would have rfcomm = Rfcomm() in the setup phase of the program (address could be here or later). And this object would have connect(), wait_for_connection() and disconnect() (or close()) methods.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

R.e. context manager: I agree. I will note it as a TODO before we mark the PR as mergable.

R.e. the functional vs. object style: Initially I was going to submit this as a purely experimental API. But since it seems like this might be something we can actually launch, I will convert this to an object style API as you suggest. It might actually make the context manager a little less complicated, since you can do

with RFCOMMSocket() as sock:
  await sock.connect(...)

As opposed to

async with rfcomm_connect(...) as sock:
  ...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't saying that we have to do it the object oriented way. But we should consider how this will fit into the block programming language as well.

with RFCOMMSocket() as sock:

This is actually not ideal when used in a loop because it will create a new object on each loop. Maybe I was too quick to suggest using a context manager.

Our usual pattern is to declare all objects globally at the beginning of the program (setup stack in the block language).

Another thing that has come up in the past is a need to set the actual channel being used. E.g. when Windows connects to a Bluetooth device with serial service, it automatically sets up 2 RFCOMM channels, one for incoming and one for outgoing, so we have to put in the right channel number for that.

So my first thought would be to do something like:

hub = EV3Brick()
comm0 = RFCOMMChannel(0)

I'm a little tempted to include the address in the constructor as well, but there are different rules depending on if you are connecting or listening.

Copy link
Copy Markdown
Author

@jaguilar jaguilar Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually not ideal when used in a loop because it will create a new object on each loop. Maybe I was too quick to suggest using a context manager.

We should expect the need to reconnect to be very rare. I think it's probably okay.

E.g. when Windows connects to a Bluetooth device with serial service, it automatically sets up 2 RFCOMM channels, one for incoming and one for outgoing, so we have to put in the right channel number for that.

I didn't notice that behavior when I was messing with it. I had been under the impression that you can send and receive with the same rfcomm channel. AFAICT when you open a socket with python in Windows you only get one rfcomm channel created on the server. I did also create "ping-pong" test scripts on Windows and both when acting as a server and client, it sends and receives messages on the same rfcomm channel (from btstack's perspective).

I actually have removed myself from Windows lately but I would be interested to hear more about this.

comm0 = RFCOMMChannel(0)

Currently, the code does not allow the user to select which channel to address. This is a problem that we'll want to fix in a later update.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. FWIW, I was able to talk to Windows with this. One drawback of the current code is that it will randomly select among the available channels windows advertises over SDP (picking 1 if possible). It would not be a bad thing to give the users an option of picking a specific channel.

Personally I would advise our users to steer clear of outgoing com ports on Windows. Just use Python with socket(AF_BLUETOOTH) instead, IMO.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I would advise our users to steer clear of outgoing com ports on Windows. Just use Python with socket(AF_BLUETOOTH) instead, IMO.

Sure. Windows is still going to create the COM ports though, which ties up a couple of the RFCOMM channels whether you use the COM ports or not. So if we can make it just work without an explicit channel and avoid the ones Windows uses, all the better. It would be nice if could still talk to NXT/EV3 running official LEGO firmware too though, which might require specifying a specific channel.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I'm down to add specifying a channel explicitly. And likewise on the server.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented the RFCOMMSocket object per your original suggestion. This seems to be the python/micropython way. I implemented context manager, but its usage is optional. Still haven't done the manual channel selection stuff -- let's finish the review on the main body of the code first and once everyone is satisfied with what is already here I can tack that on.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates! I haven't been able to follow all updates this weekend.

Could we make some mock examples of end user code with the various approaches in pybricks/support#2274?

hub.light.on(Color.GREEN)
stopwatch = StopWatch()
update = StopWatch()
msg_buf = bytearray(2)
while True:
t = stopwatch.time()
wheel = steering_wheel(t)
throttle_value = throttle(t)

if update.time() < UPDATE_PERIOD:
# We drive the steering wheel and throttle at a
# higher rate than we send out updates, because
# their motors need to be adjusted more frequently
# than our RC device needs to receive updates.
continue
Comment thread
jaguilar marked this conversation as resolved.

ustruct.pack_into('>bb', msg_buf, 0, wheel, throttle_value)
conn.write(msg_buf)
update.reset()

print(f"AXIS1:{wheel} AXIS2:{throttle_value}")
await wait(1)
finally:
if conn:
conn.close()

run_task(main())
87 changes: 87 additions & 0 deletions sets/mindstorms-ev3/education-expansion/tank_bot_rc/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/usr/bin/env pybricks-micropython

"""
Example LEGO® MINDSTORMS® EV3 Tank Bot Program
----------------------------------------------

This program requires LEGO® EV3 MicroPython v2.0.
Download: https://education.lego.com/en-us/support/mindstorms-ev3/python-for-ev3

Building instructions can be found at:
https://education.lego.com/en-us/support/mindstorms-ev3/building-instructions#building-expansion
"""

from pybricks.hubs import EV3Brick
from pybricks.ev3devices import Motor, GyroSensor
from pybricks.parameters import Port, Direction, Button, Color
from pybricks.tools import StopWatch, wait, run_task
from pybricks.robotics import DriveBase
from pybricks.messaging import rfcomm_listen, local_address

from micropython import const

import ustruct

# Initialize the EV3 brick.
ev3 = EV3Brick()

# Configure 2 motors on Ports B and C. Set the motor directions to
# counterclockwise, so that positive speed values make the robot move
# forward. These will be the left and right motors of the Tank Bot.
left_motor = Motor(Port.A, Direction.COUNTERCLOCKWISE)
right_motor = Motor(Port.D, Direction.COUNTERCLOCKWISE)

# The wheel diameter of the Tank Bot is about 54 mm.
WHEEL_DIAMETER = 54

# The axle track is the distance between the centers of each of the
# wheels. This is about 200 mm for the Tank Bot.
AXLE_TRACK = 200

# The Driving Base is comprised of 2 motors. There is a wheel on each
# motor. The wheel diameter and axle track values are used to make the
# motors move at the correct speed when you give a drive command.
robot = DriveBase(left_motor, right_motor, WHEEL_DIAMETER, AXLE_TRACK)

SPEED_SCALE = 6 # Scale factor for speed (768 // 127)
TURN_SCALE = 2 # Scale factor for turn rate (320 // 127)

# Storage for incoming messages from remote control.
msg_buf = bytearray(2)
msg_buf_view = memoryview(msg_buf)

# Tracks the next-to-be-filled index in msg_buf.
cur_idx = 0

async def main():
while True:
print('Local address: ', local_address())
ev3.light.on(Color.RED)
print('Waiting for connection...')
conn = await rfcomm_listen()
print('Connected!')
ev3.light.on(Color.GREEN)

timeout = StopWatch()
cur_idx = 0
while timeout.time() < 100:
cur_idx += conn.readinto(msg_buf_view[cur_idx:], len(msg_buf) - cur_idx)
Comment thread
jaguilar marked this conversation as resolved.
Outdated

if cur_idx != len(msg_buf):
# We were not able to read the entire message. Loop again.
await wait(1)
continue

timeout.reset()
axis1, axis2 = ustruct.unpack('>bb', msg_buf)
cur_idx = 0

speed = axis2 * SPEED_SCALE # -768 to +768 mm/s
turn_rate = axis1 * TURN_SCALE # -320 to +320 deg/s
robot.drive(speed, turn_rate)

robot.stop()
print('Client disconnected or timed out.')
conn.close()

run_task(main())