【ROS2】MOMO的鱼香ROS2(四)ROS2入门篇——ROS2节点通信之话题与服务
- 开源代码
- 2025-07-21 18:55:16

ROS2节点通信之话题与服务点 引言1 理解从通信开始1.1 TCP(传输控制协议)1.2 UDP(用户数据报协议)1.3 基于共享内存的IPC方式 2 ROS2话题2.1 ROS2话题指令2.2 话题之RCLPY实现2.2.1 编写发布者2.2 2 编写订阅者2.2.3 运行测试 3 ROS2服务3.1 ROS2服务指令3.2 服务之RCLPY实现3.2.1 编写客户端3.2 2 编写服务端3.2.3 运行测试 引言
笔者跟着鱼香ROS的ROS2学习之旅 学习参考: 【ROS2机器人入门到实战】 笔者的学习目录
MOMO的鱼香ROS2(一)ROS2入门篇——从Ubuntu操作系统开启MOMO的鱼香ROS2(二)ROS2入门篇——ROS2初体验MOMO的鱼香ROS2(三)ROS2入门篇——ROS2第一个节点专业术语认识
TCP(传输控制协议)UDP(用户数据报协议) 1 理解从通信开始通信的目的是在计算机系统中实现不同组件、进程或设备之间的信息和数据传递。通过通信,各个实体可以共享信息、协调行动并实现协同工作。在计算机领域,通信是构建分布式系统、网络和协议的基础。 通信的原理涉及两个主要方面:通信协议和通信方式。
通信协议定义了数据的格式、传输方式、错误检测和纠正等规则,以确保可靠的数据传输。 通信方式涉及了不同的通信介质和技术,包括网络通信和进程间通信(IPC)。
1.1 TCP(传输控制协议)使用ping命令进行基于UDP的网络通信:
# 修改为自己的ip地址 ping 192.168.2.42 ping 192.168.2.36 1.2 UDP(用户数据报协议)使用nc命令进行基于TCP的网络通信:
# 终端1 nc -l 1234 # 终端2 echo "Hello, TCP!" | nc 127.0.0.1 1234关于TCP的通信可以参考笔者的博客:【Python】基于socket函数的TCP通信
1.3 基于共享内存的IPC方式基于共享内存的进程间通信(IPC)方式 通过ipcs命令查看当前系统中的共享内存段:
ipcs -m使用ipcrm命令删除不再需要的共享内存段:
ipcrm -m <shmid> 2 ROS2话题一个节点发布数据到某个话题上,另外一个节点就可以通过订阅话题拿到数据。
ROS2话题通信其实还可以是1对n,n对1,n对n的。 同一个话题,所有的发布者和接收者必须使用相同消息接口。
2.1 ROS2话题指令1.查看所有话题
ros2 topic list # 增加查看消息类型 ros2 topic list -t2.打印实时话题内容
ros2 topic echo <topic_name>3.查看主题信息
ros2 topic info <topic_name> 2.2 话题之RCLPY实现创建功能包
cd ROS_WS/colcon_ws/src ros2 pkg create imu_py --build-type ament_python --dependencies rclpy创建节点文件
cd ROS_WS/colcon_ws/src/imu_py touch imu_publisher.py touch imu_subscribe.py 2.2.1 编写发布者imu_publisher.py
# -*- coding: utf-8 -*- """ 1.查看映射端口 ls /dev/ttyUSB* 2.更改端口的权限 sudo chmod 777 /dev/ttyUSB0 """ import rclpy from rclpy.node import Node # 话题接口 from sensor_msgs.msg import Imu # imu接口 # Usart Library import serial # imu接收数据类型 class IMUPublisher(Node): def __init__(self,name): super().__init__(name) # 继承父类,初始化名称 self.get_logger().info("大家好,我是%s!" % name) self.publisher_node = self.create_publisher(Imu, 'imu_data', 1) # 创建发布imu数据的发布者到话题:imu_data上 # 串口初始化 self.IMU_Usart = serial.Serial( port='/dev/ttyUSB0', # 串口 baudrate=115200, # 波特率 timeout=0.001 # 由于后续使用read_all按照一个timeout周期时间读取数据 # imu在波特率115200返回数据时间大概是1ms,9600下大概是10ms # 所以读取时间设置0.001s ) # 接收数据初始化 self.ACC_X: float = 0.0 # X轴加速度 self.ACC_Y: float = 0.0 # Y轴加速度 self.ACC_Z: float = 0.0 # Z轴加速度 self.GYRO_X: float = 0.0 # X轴陀螺仪 self.GYRO_Y: float = 0.0 # Y轴陀螺仪 self.GYRO_Z: float = 0.0 # Z轴陀螺仪 self.roll: float = 0.0 # 横滚角 self.pitch: float = 0.0 # 俯仰角 self.yaw: float = 0.0 # 航向角 self.leve: float = 0.0 # 磁场校准精度 self.temp: float = 0.0 # 器件温度 self.MAG_X: float = 0.0 # 磁场X轴 self.MAG_Y: float = 0.0 # 磁场Y轴 self.MAG_Z: float = 0.0 # 磁场Z轴 self.Q0: float = 0.0 # 四元数Q0.0 self.Q1: float = 0.0 # 四元数Q1 self.Q2: float = 0.0 # 四元数Q2 self.Q3: float = 0.0 # 四元数Q3 # 判断串口是否打开成功 if self.IMU_Usart.isOpen(): print("open success") else: print("open failed") # 回调函数返回周期 time_period = 0.001 self.timer = self.create_timer(time_period, self.timer_callback) def timer_callback(self): """ 定时器回调函数 """ # ----读取IMU的内部数据----------------------------------- try: count = self.IMU_Usart.inWaiting() if count > 0: # 发布sensor_msgs/Imu 数据类型 imu_data = Imu() imu_data.header.frame_id = "map" imu_data.header.stamp = self.get_clock().now().to_msg() imu_data.linear_acceleration.x = self.ACC_X imu_data.linear_acceleration.y = self.ACC_Y imu_data.linear_acceleration.z = self.ACC_Z imu_data.angular_velocity.x = self.GYRO_X * 3.1415926 / 180.0 # unit transfer to rad/s imu_data.angular_velocity.y = self.GYRO_Y * 3.1415926 / 180.0 imu_data.angular_velocity.z = self.GYRO_Z * 3.1415926 / 180.0 imu_data.orientation.x = self.Q0 imu_data.orientation.y = self.Q1 imu_data.orientation.z = self.Q2 imu_data.orientation.w = self.Q3 self.publisher_node.publish(imu_data) # 发布imu的数据 self.get_logger().info(f'发布了指令:{imu_data.header.frame_id}') #打印一下发布的数据 # -------------------------------------------------------- # print('读取中') except KeyboardInterrupt: if serial != None: print("close serial port") self.IMU_Usart.close() def main(args=None): """ ros2运行该节点的入口函数 编写ROS2节点的一般步骤 1. 导入库文件 2. 初始化客户端库 3. 新建节点对象 4. spin循环节点 5. 关闭客户端库 """ rclpy.init(args=args) # 初始化rclpy node = IMUPublisher("imu_publisher") # 新建一个节点 rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C) rclpy.shutdown() # 关闭rclpy 2.2 2 编写订阅者imu_subscribe.py
# -*- coding: utf-8 -*- import rclpy from rclpy.node import Node # 话题接口 from sensor_msgs.msg import Imu # imu接口 class IMUSubscribe(Node): def __init__(self,name): super().__init__(name) self.get_logger().info("大家好,我是%s!" % name) self.imu_subscribe_node = self.create_subscription(Imu, 'imu_data',self.imu_callback, 1) # 创建发布imu数据的发布者到话题:imu_data上 def imu_callback(self, imu_data): self.get_logger().info(f'收到[{imu_data.header.frame_id}]命令') def main(args=None): rclpy.init(args=args) # 初始化rclpy node = IMUSubscribe("imu_subscribe") # 新建一个节点 rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C) rclpy.shutdown() # 关闭rclpy 2.2.3 运行测试修改下setup.py
entry_points={ 'console_scripts': [ "imu_publisher=imu_py.imu_publisher:main", "imu_subscribe=imu_py.imu_subscribe:main", ], },发布节点
colcon build --packages-select imu_py source install/setup.bash ros2 run imu_py imu_publisher订阅节点
source install/setup.bash ros2 run imu_py imu_subscribe可视化
rqt上图中的节点名称取决于下图
3 ROS2服务
服务分为客户端和服务端,服务-客户端模型也可以称为请求-响应模型,不同于话题是没有返回的,适用于单向或大量的数据传递。而服务是双向的,客户端发送请求,服务端响应请求。
注意: 同一个服务(名称相同)有且只能有一个节点来提供 同一个服务可以被多个客户端调用
3.1 ROS2服务指令启动服务端
ros2 run examples_rclpy_minimal_service service1.查看所有服务
ros2 service list2.手动调用服务
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 5,b: 10}"3.查看服务接口类型
ros2 service type <service_name>4.查找使用某一接口的服务
ros2 service find example_interfaces/srv/AddTwoInts 3.2 服务之RCLPY实现创建节点文件
cd ROS_WS/colcon_ws/src/imu_py touch imu_client.py touch imu_server.py 3.2.1 编写客户端imu_client.py
# -*- coding: utf-8 -*- import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts class ServiceClient(Node): def __init__(self,name): super().__init__(name) self.get_logger().info("节点已启动:%s!" % name) self.client_ = self.create_client(AddTwoInts,"add_two_ints_srv") def result_callback_(self, result_future): response = result_future.result() self.get_logger().info(f"收到返回结果:{response.sum}") def send_request(self, a, b): while rclpy.ok() and self.client_.wait_for_service(1)==False: self.get_logger().info(f"等待服务端上线....") request = AddTwoInts.Request() request.a = a request.b = b self.client_.call_async(request).add_done_callback(self.result_callback_) def main(args=None): rclpy.init(args=args) # 初始化rclpy node = ServiceClient("service_client") # 新建一个节点 # 调用函数发送请求 node.send_request(3,4) rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C) rclpy.shutdown() # 关闭rclpy 3.2 2 编写服务端imu_server.py
# -*- coding: utf-8 -*- import rclpy from rclpy.node import Node # 导入接口 from example_interfaces.srv import AddTwoInts class ServiceServer(Node): def __init__(self,name): super().__init__(name) self.get_logger().info("节点已启动:%s!" % name) self.add_ints_server_ = self.create_service(AddTwoInts,"add_two_ints_srv", self.handle_add_two_ints) def handle_add_two_ints(self,request, response): self.get_logger().info(f"收到请求,计算{request.a} + {request.b}") response.sum = request.a + request.b return response def main(args=None): rclpy.init(args=args) # 初始化rclpy node = ServiceServer("service_server") # 新建一个节点 rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C) rclpy.shutdown() # 关闭rclpy 3.2.3 运行测试修改下setup.py
entry_points={ 'console_scripts': [ "imu_client=imu_py.imu_client:main", "imu_server=imu_py.imu_server:main" ], },发布节点
colcon build --packages-select imu_py source install/setup.bash ros2 run imu_py imu_client订阅节点
source install/setup.bash ros2 run imu_py imu_server可视化
rqt【ROS2】MOMO的鱼香ROS2(四)ROS2入门篇——ROS2节点通信之话题与服务由讯客互联开源代码栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【ROS2】MOMO的鱼香ROS2(四)ROS2入门篇——ROS2节点通信之话题与服务”
下一篇
石头剪刀布游戏-华为OD统一考试