机械臂校准Python 脚本模板,用于自动化执行你提供的机械臂升降关节(Joint 7)标定流程。
✅ 假设:
- 机械臂控制器通过 TCP Socket 接收 JSON 命令;
- 每条命令发送后需等待 1 秒(或直到收到响应);
- 人工操作步骤(ID 010)会暂停脚本并提示用户。
✅ Python 脚本:calibrate_lift_joint.py
import socket
import json
import time
import logging
================== 配置 ==================
CONTROLLER_IP = "192.168.1.100" # 替换为你的机械臂控制器 IP
CONTROLLER_PORT = 8080 # 替换为实际端口
DELAY_MS = 1000 # 每条指令后延时(毫秒)
TIMEOUT_SEC = 5 # Socket 超时
设置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(name)
def send_command(sock, cmd_dict, annotation=""):
"""发送 JSON 命令到控制器"""
try:
json_str = json.dumps(cmd_dict)
sock.sendall((json_str + "n").encode('utf-8'))
logger.info(f"Sent: {annotation} | {json_str}")
# 可选:等待响应(根据协议调整)
# response = sock.recv(1024).decode('utf-8')
# logger.debug(f"Response: {response}")
time.sleep(DELAY_MS / 1000.0) # 转换为秒
except Exception as e:
logger.error(f"发送失败: {e}")
raise
def main():
# 创建 TCP 连接
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(TIMEOUT_SEC)
try:
logger.info(f"Connecting to {CONTROLLER_IP}:{CONTROLLER_PORT}...")
sock.connect((CONTROLLER_IP, CONTROLLER_IP))
logger.info("Connected.")
except Exception as e:
logger.error(f"连接失败: {e}")
return
# =============== 阶段 1: 初始设置 ===============
logger.info("=== 阶段 1: 初始参数设置 ===")
commands_phase1 = [
({"command": "get_lift_state"}, "查升降状态"),
({"command": "set_joint_en_state", "joint_en_state": [7, 0]}, "掉使能"),
({"command": "set_joint_zero_pos", "joint_zero_pos": 7}, "设零位(临时)"),
({"command": "set_joint_min_pos", "joint_min_pos": [7, -17400000]}, "最小限位"),
({"command": "set_joint_max_pos", "joint_max_pos": [7, 17400000]}, "最大限位"),
({"command": "save_device_info_all"}, "参数保存"),
({"command": "set_joint_en_state", "joint_en_state": [7, 1]}, "上使能"),
({"command": "get_lift_state"}, "查询状态"),
({"command": "set_joint_clear_err", "joint_clear_err": 7}, "清除错误"),
({"command": "set_joint_en_state", "joint_en_state": [7, 1]}, "上使能"),
]
for cmd, note in commands_phase1:
send_command(sock, cmd, note)
# =============== 关键人工步骤 ===============
print("n" + "="*50)
print("🛑 请手动操作:")
print("使用示教器将升降关节移动至下方接近传感器限位处(物理零点)")
input("✅ 完成后按 Enter 继续...")
print("="*50 + "n")
# =============== 阶段 2: 精确标定 ===============
logger.info("=== 阶段 2: 精确定位零点 ===")
commands_phase2 = [
({"command": "set_joint_en_state", "joint_en_state": [7, 0]}, "掉使能"),
({"command": "set_joint_zero_pos", "joint_zero_pos": 7}, "设真实零位"),
({"command": "set_joint_min_pos", "joint_min_pos": [7, -100000]}, "修改最小限位"),
({"command": "save_device_info_all"}, "参数保存"),
({"command": "set_joint_en_state", "joint_en_state": [7, 1]}, "上使能"),
]
for cmd, note in commands_phase2:
send_command(sock, cmd, note)
logger.info("✅ 升降关节标定完成!")
if name == "main":
main()
🔧 使用前需修改的参数
参数 说明
CONTROLLER_IP 机械臂控制器的 IP 地址(如 192.168.1.100)
CONTROLLER_PORT 通信端口(常见:8080, 502, 10000 等)
DELAY_MS 若控制器要求更长/短延时,可调整
通信协议 如果使用 串口(Serial) 或 Modbus,需替换 socket 为 pyserial 或 pymodbus
📌 注意事项
1. 人工干预不可跳过
ID 010 必须由人确认位置,脚本已用 input() 暂停。
2. 错误处理
当前脚本在发送失败时会抛出异常。你可根据需要添加重试机制。
3. 响应解析(可选)
如果控制器返回 JSON 响应(如 {"status": "ok"}),可取消注释 sock.recv() 并解析结果,实现闭环验证。
4. 安全第一
- 确保周围无人;
- 建议先在仿真环境测试;
- 标定前确认急停可用。
🔄 扩展建议
- 支持多关节:将 7 改为变量 joint_id;
- 配置文件化:用 config.yaml 存储 IP、限位值等;
- GUI 版本:用 tkinter 或 PyQt 做按钮式操作界面;
- 日志存档:将日志写入文件,便于追溯。
下面分别提供 串口(Serial)版本 和 ROS 2 版本 的 Python 脚本,用于执行你提供的机械臂升降关节(Joint 7)标定流程。
✅ 一、串口(Serial)版本
适用于通过 USB/RS485 串口 与控制器通信的场景(如 STM32、PLC、自研驱动板)。
📦 依赖安装
pip install pyserial
📄 脚本:calibrate_serial.py
import serial
import json
import time
import logging
================== 配置 ==================
SERIAL_PORT = "/dev/ttyUSB0" # Linux: /dev/ttyUSB0, Windows: "COM3"
BAUDRATE = 115200
TIMEOUT = 1.0 # 读超时(秒)
DELAY_MS = 1000 # 指令间延时
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(name)
def send_serial_command(ser, cmd_dict, annotation=""):
try:
json_str = json.dumps(cmd_dict)
ser.write((json_str + "n").encode('utf-8'))
logger.info(f"Sent: {annotation} | {json_str}")
# 可选:读取响应(假设一行 JSON)
# response = ser.readline().decode('utf-8').strip()
# if response:
# logger.debug(f"Response: {response}")
time.sleep(DELAY_MS / 1000.0)
except Exception as e:
logger.error(f"串口发送失败: {e}")
raise
def main():
try:
with serial.Serial(SERIAL_PORT, BAUDRATE, timeout=TIMEOUT) as ser:
logger.info(f"Opened serial port {SERIAL_PORT} @ {BAUDRATE} baud")
# === 阶段1:初始设置 ===
phase1 = [
({"command": "get_lift_state"}, "查升降状态"),
({"command": "set_joint_en_state", "joint_en_state": [7, 0]}, "掉使能"),
({"command": "set_joint_zero_pos", "joint_zero_pos": 7}, "设零位(临时)"),
({"command": "set_joint_min_pos", "joint_min_pos": [7, -17400000]}, "最小限位"),
({"command": "set_joint_max_pos", "joint_max_pos": [7, 17400000]}, "最大限位"),
({"command": "save_device_info_all"}, "参数保存"),
({"command": "set_joint_en_state", "joint_en_state": [7, 1]}, "上使能"),
({"command": "get_lift_state"}, "查询状态"),
({"command": "set_joint_clear_err", "joint_clear_err": 7}, "清除错误"),
({"command": "set_joint_en_state", "joint_en_state": [7, 1]}, "上使能"),
]
for cmd, note in phase1:
send_serial_command(ser, cmd, note)
# === 人工操作 ===
print("n" + "="*50)
print("🛑 请手动操作:")
print("使用示教器将升降关节移动至下方接近传感器限位处")
input("✅ 完成后按 Enter 继续...")
print("="*50 + "n")
# === 阶段2:精确定位 ===
phase2 = [
({"command": "set_joint_en_state", "joint_en_state": [7, 0]}, "掉使能"),
({"command": "set_joint_zero_pos", "joint_zero_pos": 7}, "设真实零位"),
({"command": "set_joint_min_pos", "joint_min_pos": [7, -100000]}, "修改最小限位"),
({"command": "save_device_info_all"}, "参数保存"),
({"command": "set_joint_en_state", "joint_en_state": [7, 1]}, "上使能"),
]
for cmd, note in phase2:
send_serial_command(ser, cmd, note)
logger.info("✅ 标定完成!")
except serial.SerialException as e:
logger.error(f"串口错误: {e}")
except KeyboardInterrupt:
logger.info("用户中断")
if name == "main":
main()
🔧 注意:
- Linux 下权限问题:sudo usermod -aG dialout $USER
- 波特率、数据位等需与控制器一致
✅ 二、ROS 2 版本
适用于 ROS 2 系统(如 Humble/Foxy),通过 自定义服务(Service)或话题(Topic) 控制机械臂。
已有一个 ROS 2 节点提供 SetCommand 服务,接收 JSON 字符串。
📦 依赖
- 已安装 ROS 2(Humble 或 Foxy)
- 自定义接口包(如 arm_msgs)
1. 创建自定义服务(可选)
若尚未定义,创建 srv/SetCommand.srv:
string command_json
bool success
string message
2. ROS 2 脚本:calibrate_ros2.py
import rclpy
from rclpy.node import Node
from arm_msgs.srv import SetCommand # 替换为你的实际包名
import json
import time
class Calibrator(Node):
def init(self):
super().init('lift_calibrator')
self.cli = self.create_client(SetCommand, 'arm/set_command')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('等待 arm/set_command 服务...')
self.get_logger().info('服务可用')
def send_command(self, cmd_dict, annotation=""):
req = SetCommand.Request()
req.command_json = json.dumps(cmd_dict)
future = self.cli.call_async(req)
rclpy.spin_until_future_complete(self, future)
result = future.result()
if result.success:
self.get_logger().info(f"✅ {annotation}")
else:
self.get_logger().error(f"❌ {annotation}: {result.message}")
raise RuntimeError("命令执行失败")
time.sleep(1.0) # DELAY_MS = 1000ms
def main(args=None):
rclpy.init(args=args)
calibrator = Calibrator()
try:
# === 阶段1 ===
phase1 = [
({"command": "get_lift_state"}, "查升降状态"),
({"command": "set_joint_en_state", "joint_en_state": [7, 0]}, "掉使能"),
({"command": "set_joint_zero_pos", "joint_zero_pos": 7}, "设零位(临时)"),
({"command": "set_joint_min_pos", "joint_min_pos": [7, -17400000]}, "最小限位"),
({"command": "set_joint_max_pos", "joint_max_pos": [7, 17400000]}, "最大限位"),
({"command": "save_device_info_all"}, "参数保存"),
({"command": "set_joint_en_state", "joint_en_state": [7, 1]}, "上使能"),
({"command": "get_lift_state"}, "查询状态"),
({"command": "set_joint_clear_err", "joint_clear_err": 7}, "清除错误"),
({"command": "set_joint_en_state", "joint_en_state": [7, 1]}, "上使能"),
]
for cmd, note in phase1:
calibrator.send_command(cmd, note)
# === 人工操作 ===
print("n" + "="*50)
print("🛑 请手动操作:")
print("使用示教器将升降关节移动至下方接近传感器限位处")
input("✅ 完成后按 Enter 继续...")
print("="*50 + "n")
# === 阶段2 ===
phase2 = [
({"command": "set_joint_en_state", "joint_en_state": [7, 0]}, "掉使能"),
({"command": "set_joint_zero_pos", "joint_zero_pos": 7}, "设真实零位"),
({"command": "set_joint_min_pos", "joint_min_pos": [7, -100000]}, "修改最小限位"),
({"command": "save_device_info_all"}, "参数保存"),
({"command": "set_joint_en_state", "joint_en_state": [7, 1]}, "上使能"),
]
for cmd, note in phase2:
calibrator.send_command(cmd, note)
calibrator.get_logger().info("✅ ROS 2 标定完成!")
except KeyboardInterrupt:
pass
finally:
calibrator.destroy_node()
rclpy.shutdown()
if name == 'main':
main()
📌 使用步骤(ROS 2)
1. 编译你的工作空间:
colcon build --packages-select your_arm_package
source install/setup.bash
2. 启动机械臂驱动节点(提供 arm/set_command 服务)
3. 运行标定脚本:
ros2 run your_arm_package calibrate_ros2
🔚 总结对比
特性 TCP Socket 串口 (Serial) ROS 2
适用场景 局域网控制 直连嵌入式控制器 机器人中间件系统
实时性 中 高 中(依赖 DDS)
依赖 标准库 pyserial ROS 2 + 自定义消息
扩展性 一般 低 高(可集成 TF、MoveIt 等)