云计算百科
云计算领域专业知识百科平台

openpi π₀ 项目部署运行逻辑(三)——策略推理服务器 serve_policy.py

 π₀ 主控脚本都在 scripts 中:

其中,serve_policy.py 是 openpi 中的策略推理服务端脚本,作用为:启动一个 WebSocket 服务器,加载预训练策略模型,等待外部请求(如来自 main.py 的控制程序),然后执行动作推理并返回结果

一句话总结一下:

将一个 Pi0 策略模型部署为网络服务(WebSocket API),供机器人主控进程远程调用

目录

1 库引用

2 参数定义

3 加载策略模型

4 启动推理服务

5 使用方法总结

5.1 使用方法

5.2 问题分析


1 库引用

import dataclasses # 用于创建结构化参数对象
import enum # 用于定义环境枚举类型
import logging # 用于日志输出
import socket # 获取本机 IP 地址信息

import tyro # 命令行参数解析库,比 argparse 更现代

# 引入策略和策略配置模块
from openpi.policies import policy as _policy
from openpi.policies import policy_config as _policy_config
# 引入 WebSocket 推理服务模块
from openpi.serving import websocket_policy_server
# 引入训练配置模块
from openpi.training import config as _config

2 参数定义

# 定义支持的机器人环境枚举类型
class EnvMode(enum.Enum):
ALOHA = "aloha"
ALOHA_SIM = "aloha_sim"
DROID = "droid"
LIBERO = "libero"

# 定义用于加载 checkpoint 的参数结构
@dataclasses.dataclass
class Checkpoint:
config: str # 模型对应的训练配置名,如 "pi0_aloha_sim"
dir: str # checkpoint 目录路径(可以是本地或 S3)

# 定义默认策略类型占位符结构
@dataclasses.dataclass
class Default:
pass # 使用默认模型配置(例如从 DEFAULT_CHECKPOINT 中选择)

# 定义主入口参数结构体
@dataclasses.dataclass
class Args:
env: EnvMode = EnvMode.ALOHA_SIM # 使用的机器人环境
default_prompt: str | None = None # 默认语言提示词
port: int = 8000 # WebSocket 服务端口
record: bool = False # 是否记录策略输出(用于调试)
policy: Checkpoint | Default = dataclasses.field(default_factory=Default)

# 定义每个环境对应的默认 checkpoint
DEFAULT_CHECKPOINT: dict[EnvMode, Checkpoint] = {
EnvMode.ALOHA: Checkpoint(
config="pi0_aloha",
dir="s3://openpi-assets/checkpoints/pi0_base",
),
EnvMode.ALOHA_SIM: Checkpoint(
config="pi0_aloha_sim",
dir="s3://openpi-assets/checkpoints/pi0_aloha_sim",
),
EnvMode.DROID: Checkpoint(
config="pi0_fast_droid",
dir="s3://openpi-assets/checkpoints/pi0_fast_droid",
),
EnvMode.LIBERO: Checkpoint(
config="pi0_fast_libero",
dir="s3://openpi-assets/checkpoints/pi0_fast_libero",
),
}

其中,class Args 使用 Tyro 来从命令行解析参数,包括以下选项:

  • –env:选择环境,如 ALOHA, DROID(仅在使用默认模型时起作用)
  • –default_prompt:语言提示词(自然语言)
  • –port:监听的端口(默认 8000)
  • –record:是否开启动作记录(用于调试)
  • –policy:加载指定 checkpoint,支持:1. Checkpoint(config="…", dir="…")  2. Default() 使用内置 checkpoint

3 加载策略模型

# 从默认环境创建策略模型
def create_default_policy(env: EnvMode, *, default_prompt: str | None = None) -> _policy.Policy:
if checkpoint := DEFAULT_CHECKPOINT.get(env):
return _policy_config.create_trained_policy(
_config.get_config(checkpoint.config), # 加载配置
checkpoint.dir, # 加载参数
default_prompt=default_prompt # 设置默认语言提示
)
raise ValueError(f"Unsupported environment mode: {env}")

# 从命令行参数创建策略模型
def create_policy(args: Args) -> _policy.Policy:
match args.policy:
case Checkpoint(): # 如果显式指定 checkpoint
return _policy_config.create_trained_policy(
_config.get_config(args.policy.config),
args.policy.dir,
default_prompt=args.default_prompt
)
case Default(): # 否则使用默认模型
return create_default_policy(args.env, default_prompt=args.default_prompt)

分两种情况:

1. 手动指定模型与路径,即增加参数 –policy:

–policy 'Checkpoint(config="pi0_aloha", dir="s3://openpi-assets/checkpoints/pi0_base")'

2. 使用内置默认策略(会从 S3 下载),即不指定参数 –policy:

uv run scripts/serve_policy.py –env ALOHA –default_prompt='fold the towel'

4 启动推理服务

# 主逻辑函数
def main(args: Args) -> None:
policy = create_policy(args) # 加载策略对象
policy_metadata = policy.metadata # 获取策略的元信息

if args.record: # 如果启用记录,包裹为记录器
policy = _policy.PolicyRecorder(policy, "policy_records")

# 获取本机 IP 和主机名(日志打印)
hostname = socket.gethostname()
local_ip = socket.gethostbyname(hostname)
logging.info("Creating server (host: %s, ip: %s)", hostname, local_ip)

# 创建 WebSocket 推理服务对象
server = websocket_policy_server.WebsocketPolicyServer(
policy=policy,
host="0.0.0.0", # 监听所有接口
port=args.port, # 指定端口
metadata=policy_metadata, # 附加元信息
)
server.serve_forever() # 启动阻塞服务循环

# 脚本入口:解析 CLI 参数并运行主函数
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, force=True) # 初始化日志
main(tyro.cli(Args)) # 使用 tyro 从命令行解析 Args 并运行

启动一个 WebSocket 服务端,监听端口,等待来自主控系统(如 main.py)的动作请求,完成如下功能:

  • 接收来自客户端的 observation(图像、语言提示等)
  • 执行 policy.infer(…) 
  • 返回预测的动作结果

5 使用方法总结

5.1 使用方法

✅ 最简单用法(使用默认模型):

uv run scripts/serve_policy.py –env ALOHA –default_prompt='fold the towel'

自动从 S3 下载 pi0_aloha 的默认模型,并运行推理服务器,端口0.0.0.0:8000

✅ 使用本地模型:

uv run scripts/serve_policy.py policy:checkpoint \\
–policy.config=pi0_aloha \\
–policy.dir=/home/yejiangchen/.cache/openpi/openpi-assets/checkpoints/pi0_aloha_towel

✅ 启用推理行为记录:

uv run scripts/serve_policy.py –record …

将推理行为记录下来,便于调试

如果在使用本地模型时出现 bug,运行以下命令查看:

uv run scripts/serve_policy.py policy:checkpoint –help

运行成功可以看到:

可以看到日志:

  • 模型已经被正常加载(包括 JAX/Flax 环境、参数、norm stats)
  • 推理服务已启动,在 0.0.0.0:8000 监听 WebSocket 请求
  • 各种 INFO 日志都是正常的底层库启动信息(比如尝试加载 rocm、tpu,最后还是用的 CUDA/CPU,不影响使用)
  • 检查点参数与归一化参数(norm stats)都已找到并成功加载

5.2 问题分析

可以看到 GPU 启动问题,检查 JAX 是否用上 GPU,输入:

(pi0) yejiangchen@yejiangchen-System-Product-Name:~/Desktop/Codes/openpi-main$ python
Python 3.11.11 (main, Dec 11 2024, 16:28:39) [GCC 11.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import jax
>>> print(jax.devices())
[CudaDevice(id=0)]

可以看到:jax.devices() 输出为 [CudaDevice(id=0)],这说明 JAX 在当前环境下已经可以正确识别并调用 GPU(CUDA)。 CUDA 驱动、jax、jaxlib 安装都没有问题

其实只要 jax.devices() 显示 CudaDevice,模型实际计算就会在 GPU 上执行,即便启动日志没高亮显示 gpu 关键字也没关系

JAX/Flax 框架有时不会像 PyTorch 那样在日志里明确输出 “Using CUDA”。它只会在代码第一次涉及到数组/张量运算时,将数据移动到 GPU。如果已经能看到 [CudaDevice(id=0)],说明后续所有计算都会尽量在 GPU 上进行

运行时使用 nvtop 查看 GPU 占用:

安装:

sudo apt install nvtop

使用:

nvtop

可以看到 GPU 基本拉满了

赞(0)
未经允许不得转载:网硕互联帮助中心 » openpi π₀ 项目部署运行逻辑(三)——策略推理服务器 serve_policy.py
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!