sim2real/rl_policy/utils/motion_buffer.py 工作逻辑
这篇文档单独解释 RealtimeMotionBuffer 是怎么工作的。
它的职责可以概括成一句话:
持续接收 publisher 发来的实时 motion 数据
-> 按时间戳缓存
-> 在 policy 需要 observation 时按目标时间插值
-> 返回统一的 MotionData
这个类主要服务 tracking policy 的 live / zmq motion 模式。
1. 它解决的核心问题
tracking policy 不是只需要“当前这一帧” motion。
它通常需要这样一组参考时间点:
future_steps = [-4, -2, 0, 1, 2, 3, 4, 8, 16]
dt = 20ms
也就是说,policy 一次 forward 需要:
- 比“参考时刻”更早的几帧
- 参考时刻本身
- 未来几帧
但实时 VR teleop 的 motion 是通过 ZMQ 一帧一帧到达的,而且到达时间并不一定正好卡在这些目标时间点上。
所以需要一个 buffer 来做三件事:
- 把收到的 motion 按时间存起来
- 给 policy 提供一个“延迟后的稳定时间轴”
- 对不在离散采样点上的目标时间做插值
2. 整体数据流
PICO / XR body stream
-> pico_g1_zmq_publisher.py
-> ZMQ JSON payload
-> RealtimeMotionBuffer
-> MotionData
-> StateProcessor
-> track observations
-> policy
更细一点:
publisher
发出:
smplx_t_ns
joint_pos
body_pos_w
body_quat_w
RealtimeMotionBuffer
收到 payload
-> 解析 JSON
-> 按 smplx_t_ns 排序插入
-> 保存到内部时间缓存
policy step
当前时间 t
-> 取参考时刻 t - delay
-> 根据 future_steps 生成一组目标时间戳
-> 从 buffer 里查找/插值
-> 返回 MotionData
3. 初始化时做了什么
RealtimeMotionBuffer.__init__() 里会初始化这些状态:
joint_names- 固定为 teleop 侧定义好的 canonical G1 joint 顺序
body_names- 固定为 teleop 侧定义好的 canonical G1 body 顺序
future_steps- policy 配置里要求的参考时间步列表
dt_s- motion 的离散步长,tracking 里通常是
0.02
- motion 的离散步长,tracking 里通常是
tolerance_s- 给网络抖动 / retarget 延迟预留的安全余量
delay_s- 如果没显式传,就自动按下面公式算:
delay_s = max(future_steps) * dt_s + tolerance_s
例如:
future_steps = [-4, -2, 0, 1, 2, 3, 4, 8, 16]
dt_s = 0.02
tolerance_s = 0.04
delay_s = 16 * 0.02 + 0.04 = 0.36 s
它还会准备两个内部容器:
_timestamps_ns- 每一帧对应的时间戳
_frames- 每一帧对应的 motion 数据
可以理解成:
_timestamps_ns: [t0, t1, t2, t3, ...]
_frames: [f0, f1, f2, f3, ...]
两者下标一一对应,而且始终按时间升序排列。
4. ZMQ 收包线程怎么工作
如果初始化时传了 motion_zmq_connect,buffer 会起一个后台线程:
while True:
recv payload from ZMQ
parse payload
append into sorted buffer
这部分逻辑在 _start_motion_stream() 里。
每次收到一条 payload,会交给 __append_payload()。
5. 一条 payload 进入 buffer 时发生了什么
__append_payload() 的逻辑可以概括成:
raw string / bytes
-> 去掉 topic 前缀
-> JSON 反序列化
-> 取出时间戳和 motion 字段
-> 转成 numpy
-> 插入有序时间轴
当前它主要读这些字段:
smplx_t_nsjoint_posbody_pos_wbody_quat_w
兼容逻辑:
- 如果没有
joint_pos,也会尝试读dof_pos或qpos - 如果给了
joint_names/body_names,会检查它们是否和 canonical G1 顺序一致
插入时用的是 bisect_right,所以即使消息有轻微乱序,也会按时间顺序插进去。
内部每个 frame 现在只保留三类核心数据:
frame = {
joint_pos,
body_pos_w,
body_quat_w,
}
6. 为什么 policy 看到的是 t - delay
这是这个 buffer 最重要的设计点。
policy 在控制时刻 t,不能直接把 t 当作参考时刻,因为它还需要未来若干步的数据。
所以这里采用:
参考时刻 = t - delay
这样,policy 实际上总是在看“一段更早、但已经收齐未来窗口”的 motion。
示意图:
当前真实时间: t
|
v
时间轴: ---- ---- ---- ---- ---- ---- ---- ---- ----
^
|
t - delay
policy 需要的 future_steps:
[-4, -2, 0, 1, 2, 3, 4, 8, 16]
对应目标时间:
(t-delay) - 4*dt
(t-delay) - 2*dt
(t-delay) + 0*dt
(t-delay) + 1*dt
...
(t-delay) + 16*dt
所以虽然名字叫 future_steps,但从“当前真实时间 t”的视角看,它其实是:
- 一个经过延迟对齐后的未来窗口
- 用这个窗口来保证 observation 不会因为“未来帧还没到”而缺数据
7. get_obs() 怎么取一组 observation
get_obs() 每次被调用时,大致做这几步。
第一步:拿当前时间
current_time_ns = time.time_ns()
第二步:清掉太旧的数据
先算一个 cutoff:
cutoff = current_time
- (delay + abs(min_future_step) * dt)
这表示:
- 比这个时间更早的帧
- 连最早那个负的
future_step也用不上了
就可以删掉。
但实现上会保留 cutoff 前最后一帧,避免插值时丢失左端点。
示意图:
时间轴:
old old old keep | useful useful useful useful
^
|
cutoff
清理规则:
cutoff 之前不是全删
而是保留 cutoff 前最后一帧
这是为了保证:
目标时间落在 [旧帧, 新帧] 中间时
仍然能做插值
第三步:生成目标时间戳
先算参考基准时间:
target_base = current_time - delay
再把 future_steps 展开成具体时间:
target_times = target_base + future_steps * dt
例如:
future_steps = [-4, -2, 0, 1, 2, 3, 4, 8, 16]
dt = 20ms
target_times =
target_base - 80ms
target_base - 40ms
target_base
target_base + 20ms
...
target_base + 320ms