本文主要介紹ROS的Topics概念,如何創建Publisher和Subscriber,通過Topic在ROS程序間通信;介紹ROS的Services概念,如何創建Client和Server并建立通信。
更多內容,訪問專欄目錄獲取實時更新。
ROS Topics
Topics可以被視為一種具名的總線,用于節點間交換數據,通過Topics可以發布和訂閱消息,實現單向的流式通信。需要注意的重點包括:
- 單向流式通信(發布/訂閱模式)
- 匿名通信
- 每個Topic都有特定的消息格式
- 可以在ROS節點中通過Python, C++等多種語言實現
- 一個節點可以擁有多個Topic
Publisher in Python
在ROS2基礎編程一文中,我們已經創建了工作空間和工作包,在此基礎上,來通過Python實現一個Topic的發布者。
在my_py_pkg工作包下創建一個新的文件robot_news_station.py
robot_news_station.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String # do not forget to add package dependencyclass RobotNewsPublisherNode(Node):def __init__(self):super().__init__("robot_news_publisher")self.robot_name = "HiRobot"self.publisher = self.create_publisher(String, "robot_news", 10)self.timer = self.create_timer(1, self.publish_news)self.get_logger().info("Robot News Publisher has been started")def publish_news(self):msg = String()msg.data = "Hi, this is " + str(self.robot_name) + " from the robot news publisher"self.publisher.publish(msg)def main(args=None):rclpy.init(args=args)node = RobotNewsPublisherNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
還需要添加依賴包引用,修改package.xml文件:
然后裝載我們的新節點,修改setup.py:
之后執行下面的指令就可以啟動我們的publisher了:
colcon build --packages-select my_py_pkg --symlink-install
source ~/.bashrc
ros2 run my_py_pkg robot_news_station
ros2 topic echo /robot_news
Subscriber in Python
創建訂閱者的方式與創建發布者類似,這里我們添加了一個名為robot_news_subscriber.py的文件:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String # do not forget to add package dependencyclass RobotNewsSubscriberNode(Node):def __init__(self):super().__init__("robot_news_subscriber")self.subscriber = self.create_subscription(String, "robot_news", self.subscribe_news_callback, 10)self.get_logger().info("Robot News Subscriber has been started")def subscribe_news_callback(self, msg):self.get_logger().info(msg.data)def main(args=None):rclpy.init(args=args)node = RobotNewsSubscriberNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
不要忘記修改setup.py和colcon build
編譯。之后同時運行publisher和subscriber就可以看到右側命令行里的subscriber每1s收到一條來自publisher的信息,并打印:
使用命令行工具調試Topics
ros2 topic list
ros2 topic info <topic_name> # 查看所有激活的topic
ros2 topic echo <topic_name> # 創建一個訂閱者監聽發布的消息
ros2 interface show <msg_type> # 顯示話題消息的類型
ros2 topic hz <topic_name> # 獲取發布頻率
ros2 topic bw <topic_name> # 獲取消息的長度,byte width
ros2 topic pub <topic_name> <msg_type> <msg_data> # 發布一個topic
ros2 topic pub -r 10 /robot_news example_interfaces/msg/String "{data: 'hello from robot'}"
ros2 node list
ros2 info <node_name>
ros2 run <pkg_name> <node_name> --ros-args -r __node:=<new_node_name>
ros2 run <pkg_name> <node_name> --ros-args -r __node:=<new_node_name> -r <topic_name>:=<new_topic_name>
Ros Services
前文提到Topics實現的是單向的傳輸,通過發布/訂閱模式建立連接,但用在一些需要請求/回復的分布式系統中就不太合適了。
Services可以幫助我們實現請求/回復的通信模式,一條消息用于請求,一條用于回復,ROS節點以字符串名稱提供服務,客戶端通過發送請求消息并等待回復來調用服務,從而建立持久性的連接。
Service Server in Python
ros2 interface show example_interfaces/srv
執行上面的指令,你能看到example_interface包里提供了哪些服務,這里我們使用AddTwoInts來演示如何創建一個service server
add_two_ints_server.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoIntsclass AddTwoIntsServerNode(Node):def __init__(self):super().__init__("add_two_ints_server")self.server = self.create_service(AddTwoInts, "add_two_ints", self.callback_add_two_ints)self.get_logger().info("Add Two Ints Server has started")def callback_add_two_ints(self, request, response):response.sum = request.a + request.bself.get_logger().info(str(request.a) + " + " + str(request.b) + " = " + str(response.sum))return responsedef main(args=None):rclpy.init(args=args)node = AddTwoIntsServerNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
編譯并運行,通過ros2 service list
就能在服務列表里看到我們啟動的服務了。
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 3, b: 4}"
執行上面的指令就可以在命令行里調用我們創建的service。
Service Client in Python
add_two_ints_client.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoIntsdef main(args=None):rclpy.init(args=args)node = Node("add_two_ints_no_oop")client = node.create_client(AddTwoInts, "add_two_ints")while not client.wait_for_service(1):node.get_logger().warn("Waiting for server Add Two Ints...")request = AddTwoInts.Request()request.a = 3request.b = 8future = client.call_async(request)rclpy.spin_until_future_complete(node, future)try:response = future.result()node.get_logger().info(str(request.a) + " + " + str(request.b) + " = " + str(response.sum))except Exception as e:node.get_logger().error("Service call failed %r" % (e,))rclpy.shutdown()if __name__ == "__main__":main()
使用命令行工具調試Services
ros2 service list
ros2 service type <service_name>
ros2 interface show <service type>
ros2 service call <service_name> <service_type> <value>
ros2 run <pkg_name> <node_name> --ros-args -r <service_name>:=<new_service_name>
ROS Interface
ROS應用之間有三種通信接口:messages, services和actions,ROS2使用IDL(interface definition language)來描述這些接口,使得在不同應用,不同編程語言間進行交互更加簡單。例如前文提到的Topic和Service:
Topic:
- Name:話題名
- 消息定義:Msg,如example_interfaces/msg/Int64
Service:
- Name: 服務名
- 服務定義:Srv,如example_interfaces/srv/AddTwoInts (包含request和response)
創建自定義的Msg
創建一個新的工作包my_robot_interfaces,移除目錄下的include和src文件夾,并新建msg文件夾:
在msg文件夾下新建msg定義文件: HardwareStatus.msg
int64 temperature
string debug_message
更新package.xml,添加:
<build_depend>rosidl_default_generators</build_depend><exec_depend>rosidl_default_runtime</exec_depend><member_of_group>rosidl_interface_packages</member_of_group>
更新CMakeLists.txt,添加:
find_package(rosidl_default_generators REQUIRED)rosidl_generate_interfaces(${PROJECT_NAME}"msg/HardwareStatus.msg"
)ament_export_dependencies(rosidl_default_runtime)
ament_package()
然后編譯工作包,就可以在其他工程中使用該Msg定義了。
使用自定義的Msg
在my_py_pkg工作包下創建一個新的publisher:hw_status_publisher.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import HardwareStatusclass HardwareStatusPublisherNode(Node):def __init__(self):super().__init__("hardware_status_publisher")self.hw_status_publisher = self.create_publisher(HardwareStatus, "hardware_status", 10)self.timer = self.create_timer(1, self.publish_hw_status)self.get_logger().info("Hardware Status Publisher has been started")def publish_hw_status(self):msg = HardwareStatus()msg.temperature = 45msg.debug_message = "No error"self.hw_status_publisher.publish(msg)def main(args=None):rclpy.init(args=args)node = HardwareStatusPublisherNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
不要忘記在package.xml中添加對my_robot_interface的依賴,并把新的節點加載到setup.py并編譯。運行haredware_status_publisher并監聽,可以看到如下的效果:
創建自定義的Srv
在my_robot_interfaces工作包目錄下創建srv文件夾,并新建文件ComputerRectangleArea.srv
float64 length
float64 width
---
float64 area
更新CMakeLists.txt
成功編譯了工作包后我們就能夠獲得自定義的Srv了
如有錯誤,歡迎留言或來信指正:hbin6358@163.com