話題
節點之間的通信。
叫話題很形象。發布者發布一定數據類型的話題,訂閱者訂閱發布者。
-
訂閱者發布者不唯一。
-
異步通信,適用于周期發布的數據而不是邏輯性強的數據。
-
.msg 格式的消息結構,一種通信接口。
每個話題 topic 有話題名,數據類型和數據三個屬性。
Hello world
pub 按一定周期發布 helloworld 信息,sub 接收。
pub:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-"""
@作者: 古月居(www.guyuehome.com)
@說明: ROS2話題示例-發布“Hello World”話題
"""import rclpy # ROS2 Python接口庫
from rclpy.node import Node # ROS2 節點類
from std_msgs.msg import String # 字符串消息類型"""
創建一個發布者節點
"""
class PublisherNode(Node):def __init__(self, name):super().__init__(name) # ROS2節點父類初始化self.pub = self.create_publisher(String, "chatter", 10) # 創建發布者對象(消息類型、話題名、隊列長度)self.timer = self.create_timer(0.5, self.timer_callback) # 創建一個定時器(單位為秒的周期,定時執行的回調函數)def timer_callback(self): # 創建定時器周期執行的回調函數msg = String() # 創建一個String類型的消息對象msg.data = 'Hello World' # 填充消息對象中的消息數據self.pub.publish(msg) # 發布話題消息self.get_logger().info('Publishing: "%s"' % msg.data) # 輸出日志信息,提示已經完成話題發布def main(args=None): # ROS2節點主入口main函數rclpy.init(args=args) # ROS2 Python接口初始化node = PublisherNode("topic_helloworld_pub") # 創建ROS2節點對象并進行初始化rclpy.spin(node) # 循環等待ROS2退出node.destroy_node() # 銷毀節點對象rclpy.shutdown() # 關閉ROS2 Python接口
sub:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-"""
@作者: 古月居(www.guyuehome.com)
@說明: ROS2話題示例-訂閱“Hello World”話題消息
"""import rclpy # ROS2 Python接口庫
from rclpy.node import Node # ROS2 節點類
from std_msgs.msg import String # ROS2標準定義的String消息"""
創建一個訂閱者節點
"""
class SubscriberNode(Node):def __init__(self, name):super().__init__(name) # ROS2節點父類初始化self.sub = self.create_subscription(\String, "chatter", self.listener_callback, 10) # 創建訂閱者對象(消息類型、話題名、訂閱者回調函數、隊列長度)def listener_callback(self, msg): # 創建回調函數,執行收到話題消息后對數據的處理self.get_logger().info('I heard: "%s"' % msg.data) # 輸出日志信息,提示訂閱收到的話題消息def main(args=None): # ROS2節點主入口main函數rclpy.init(args=args) # ROS2 Python接口初始化node = SubscriberNode("topic_helloworld_sub") # 創建ROS2節點對象并進行初始化rclpy.spin(node) # 循環等待ROS2退出node.destroy_node() # 銷毀節點對象rclpy.shutdown() # 關閉ROS2 Python接口
物品識別
之前 webcam 最好是分開寫為兩個節點之間的通信,一個是攝像頭,一個是呈現的視頻。
主要是拿到識別的圖像數據之后不是第一時間就調用 opencv 算法處理,而是發給 sub,sub 進行處理。
這個發送的過程中的消息數據類型應當是 imgmsg 類型,所以需要一個 opencv -> imgmsg 類型的轉換方法。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-"""
@作者: 古月居(www.guyuehome.com)
@說明: ROS2話題示例-發布圖像話題
"""import rclpy # ROS2 Python接口庫
from rclpy.node import Node # ROS2 節點類
from sensor_msgs.msg import Image # 圖像消息類型
from cv_bridge import CvBridge # ROS與OpenCV圖像轉換類
import cv2 # Opencv圖像處理庫"""
創建一個發布者節點
"""
class ImagePublisher(Node):def __init__(self, name):super().__init__(name) # ROS2節點父類初始化self.publisher_ = self.create_publisher(Image, 'image_raw', 10) # 創建發布者對象(消息類型、話題名、隊列長度)self.timer = self.create_timer(0.1, self.timer_callback) # 創建一個定時器(單位為秒的周期,定時執行的回調函數)self.cap = cv2.VideoCapture(0) # 創建一個視頻采集對象,驅動相機采集圖像(相機設備號)self.cv_bridge = CvBridge() # 創建一個圖像轉換對象,用于稍后將OpenCV的圖像轉換成ROS的圖像消息def timer_callback(self):ret, frame = self.cap.read() # 一幀一幀讀取圖像if ret == True: # 如果圖像讀取成功self.publisher_.publish(self.cv_bridge.cv2_to_imgmsg(frame, 'bgr8')) # 發布圖像消息self.get_logger().info('Publishing video frame') # 輸出日志信息,提示已經完成圖像話題發布def main(args=None): # ROS2節點主入口main函數rclpy.init(args=args) # ROS2 Python接口初始化node = ImagePublisher("topic_webcam_pub") # 創建ROS2節點對象并進行初始化rclpy.spin(node) # 循環等待ROS2退出node.destroy_node() # 銷毀節點對象rclpy.shutdown() # 關閉ROS2 Python接口
sub 代碼:
class ImageSubscriber(Node):def __init__(self, name):super().__init__(name) # ROS2節點父類初始化self.sub = self.create_subscription(Image, 'image_raw', self.listener_callback, 10) # 創建訂閱者對象(消息類型、話題名、訂閱者回調函數、隊列長度)self.cv_bridge = CvBridge() # 創建一個圖像轉換對象,用于OpenCV圖像與ROS的圖像消息的互相轉換def object_detect(self, image):hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # 圖像從BGR顏色模型轉換為HSV模型mask_red = cv2.inRange(hsv_img, lower_red, upper_red) # 圖像二值化contours, hierarchy = cv2.findContours(mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE) # 圖像中輪廓檢測for cnt in contours: # 去除一些輪廓面積太小的噪聲if cnt.shape[0] < 150:continue(x, y, w, h) = cv2.boundingRect(cnt) # 得到蘋果所在輪廓的左上角xy像素坐標及輪廓范圍的寬和高cv2.drawContours(image, [cnt], -1, (0, 255, 0), 2) # 將蘋果的輪廓勾勒出來cv2.circle(image, (int(x+w/2), int(y+h/2)), 5,(0, 255, 0), -1) # 將蘋果的圖像中心點畫出來cv2.imshow("object", image) # 使用OpenCV顯示處理后的圖像效果cv2.waitKey(10)def listener_callback(self, data):self.get_logger().info('Receiving video frame') # 輸出日志信息,提示已進入回調函數image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8') # 將ROS的圖像消息轉化成OpenCV圖像self.object_detect(image) # 蘋果檢測
流程是 opencv 圖像數據轉換成 ros2 可以傳輸的圖像數據,傳輸給 sub,sub 再轉回來再調用 opencv 庫。
其實常用 usb 相機驅動基本都是一致的,所以 pub 節點我們不需要自己手動寫。可以用下面的命令替代 pub,有一行 ERR 不影響。
$ sudo apt install ros-humble-usb-cam
$ ros2 run usb_cam usb_cam_node_exe
ros2 topic bw 可以看帶寬數據量。
可以通過 /rqt_graph 查看節點關系圖。