diff --git a/simulate_python/unitree_mujoco.py b/simulate_python/unitree_mujoco.py index 95b934fe..0c675494 100755 --- a/simulate_python/unitree_mujoco.py +++ b/simulate_python/unitree_mujoco.py @@ -51,6 +51,9 @@ def SimulationThread(): locker.acquire() + # Update control torques from latest command + unitree.UpdateControl() + if config.ENABLE_ELASTIC_BAND: if elastic_band.enable: mj_data.xfrc_applied[band_attached_link, :3] = elastic_band.Advance( diff --git a/simulate_python/unitree_sdk2py_bridge.py b/simulate_python/unitree_sdk2py_bridge.py index a9ca97ae..eef4a9ad 100644 --- a/simulate_python/unitree_sdk2py_bridge.py +++ b/simulate_python/unitree_sdk2py_bridge.py @@ -88,6 +88,9 @@ def __init__(self, mj_model, mj_data): self.low_cmd_suber = ChannelSubscriber(TOPIC_LOWCMD, LowCmd_) self.low_cmd_suber.Init(self.LowCmdHandler, 10) + # Cache latest motor commands (updated by LowCmdHandler) + self.low_cmd_latest = None + # joystick self.key_map = { "R1": 0, @@ -109,18 +112,30 @@ def __init__(self, mj_model, mj_data): } def LowCmdHandler(self, msg: LowCmd_): - if self.mj_data != None: - for i in range(self.num_motor): - self.mj_data.ctrl[i] = ( - msg.motor_cmd[i].tau - + msg.motor_cmd[i].kp - * (msg.motor_cmd[i].q - self.mj_data.sensordata[i]) - + msg.motor_cmd[i].kd - * ( - msg.motor_cmd[i].dq - - self.mj_data.sensordata[i + self.num_motor] - ) + """Cache the latest low command. Control update happens in UpdateControl().""" + self.low_cmd_latest = msg + + def UpdateControl(self): + """ + Recompute motor control torques on every simulation step. + This is called from the simulation thread before mj_step(). + τ = tau_cmd + kp*(q_des-q_act) + kd*(dq_des-dq_act) + """ + if self.mj_data is None or self.low_cmd_latest is None: + return + + msg = self.low_cmd_latest + for i in range(self.num_motor): + self.mj_data.ctrl[i] = ( + msg.motor_cmd[i].tau + + msg.motor_cmd[i].kp + * (msg.motor_cmd[i].q - self.mj_data.sensordata[i]) + + msg.motor_cmd[i].kd + * ( + msg.motor_cmd[i].dq + - self.mj_data.sensordata[i + self.num_motor] ) + ) def PublishLowState(self): if self.mj_data != None: