在 ROS 系統中,通信接口(Interface) 是節點之間傳遞信息的標準“語言協議”,確保了不同功能節點之間可以正確理解和使用彼此傳送的數據內容。我們可以將其理解為“數據結構+格式定義”,貫穿于話題(Topic)、服務(Service)、動作(Action)等通信機制中。
在軟件開發中,“接口”是一種連接關系,它規定了數據如何進出,模塊如何對接,只有兩邊的格式和要求一致,系統才能正常“搭伙做事”。
為了讓每個 ROS 節點可以用不同的語言編寫,比如一個節點用 C++ 控制硬件,另一個節點用 Python 實現上層邏輯,ROS 把通信接口設計成與編程語言無關的格式。
int32
表示 32 位整型
int64
表示 64 位長整型
bool
表示布爾值
還支持數組(如 int32[]
)、嵌套結構體等復合數據類型
這些接口定義寫在 .msg
(消息)或 .srv
(服務)文件中。
在 編譯時,ROS 會自動生成對應語言(如 C++ 或 Python)里的數據結構文件,供節點直接使用,開發者不需要自己去寫解析邏輯。
無論你用什么語言編寫節點,只要遵循統一接口,數據就能準確傳輸、協同工作。
ROS 2 三種通信機制(話題、服務、動作)在接口定義上的格式差異。
在 ROS 2 系統中,通信接口的定義格式因通信機制而異,但都遵循統一的、語言無關的描述方式。
話題(Topic)通信
使用 .msg
文件定義,屬于單向異步通信。
只需定義每一幀數據的內容格式,如:
int32 x
int32 y
表示消息中包含兩個 32 位整型數據,可用于發送二維坐標等。
服務(Service)通信
使用 .srv
文件定義,屬于請求-應答式的同步通信。
定義由請求部分和應答部分組成,中間用 ---
分隔,例如:
int64 a
int64 b
---
int64 sum
客戶端發起請求,包含 a
和 b
,服務器處理后返回 sum
。
動作(Action)通信
使用 .action
文件定義,適合描述持續一段時間的過程性任務,如移動、旋轉、導航等。
定義包含三部分:
# 目標
bool enable
---
# 結果
bool finish
---
# 反饋
int32 state
目標(Goal):開始任務,如開始轉動
結果(Result):任務最終是否完成
反饋(Feedback):周期性返回進度,如當前已轉動角度
通信機制 | 接口文件 | 特點 | 數據方向 |
---|---|---|---|
話題 | .msg | 異步、廣播 | 單向 |
服務 | .srv | 同步請求 | 客戶端 ? 服務端 |
動作 | .action | 可反饋過程 | 客戶端 ? 服務端 + 反饋 |
服務接口的定義與使用
在本案例中,我們以一個獲取目標位置的服務為例,全面了解 ROS 2 中服務接口的定義方法及實際應用方式。
learning_interface/srv/GetObjectPosition.srv
bool get ? ? # 請求:是否獲取目標位置
---
int32 x ? ? ?# 響應:目標X坐標
int32 y ? ? ?# 響應:目標Y坐標
bool get
:客戶端請求參數,true 表示請求當前目標位置。
---
:分隔請求與響應。
int32 x, y
:服務端反饋的目標坐標。
import rclpy # ROS2 Python接口庫
from rclpy.node import Node # ROS2 節點類
from learning_interface.srv import GetObjectPosition # 自定義的服務接口class objectClient(Node):def __init__(self, name):super().__init__(name) # ROS2節點父類初始化self.client = self.create_client(GetObjectPosition, 'get_target_position')while not self.client.wait_for_service(timeout_sec=1.0):self.get_logger().info('service not available, waiting again...')self.request = GetObjectPosition.Request()def send_request(self):self.request.get = Trueself.future = self.client.call_async(self.request)def main(args=None):rclpy.init(args=args) # ROS2 Python接口初始化node = objectClient("service_object_client") # 創建ROS2節點對象并進行初始化node.send_request()while rclpy.ok():rclpy.spin_once(node)if node.future.done():try:response = node.future.result()except Exception as e:node.get_logger().info('Service call failed %r' % (e,))else:node.get_logger().info('Result of object position:\n x: %d y: %d' %(response.x, response.y))breaknode.destroy_node() # 銷毀節點對象rclpy.shutdown() # 關閉ROS2 Python接口import rclpy # ROS2 Python接口庫
from rclpy.node import Node # ROS2 節點類
from sensor_msgs.msg import Image # 圖像消息類型
import numpy as np # Python數值計算庫
from cv_bridge import CvBridge # ROS與OpenCV圖像轉換類
import cv2 # Opencv圖像處理庫
from learning_interface.srv import GetObjectPosition # 自定義的服務接口lower_red = np.array([0, 90, 128]) # 紅色的HSV閾值下限
upper_red = np.array([180, 255, 255]) # 紅色的HSV閾值上限class ImageSubscriber(Node):def __init__(self, name):super().__init__(name) # ROS2節點父類初始化self.sub = self.create_subscription(Image, 'image_raw', self.listener_callback, 10) # 創建訂閱者對象(消息類型、話題名、訂閱者回調函數、隊列長度)self.cv_bridge = CvBridge() # 創建一個圖像轉換對象,用于OpenCV圖像與ROS的圖像消息的互相轉換self.srv = self.create_service(GetObjectPosition, # 創建服務器對象(接口類型、服務名、服務器回調函數)'get_target_position',self.object_position_callback) self.objectX = 0self.objectY = 0 def object_detect(self, image):hsv_img = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # 圖像從BGR顏色模型轉換為HSV模型mask_red = cv2.inRange(hsv_img, lower_red, upper_red) # 圖像二值化contours, hierarchy = cv2.findContours(mask_red, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE) # 圖像中輪廓檢測for cnt in contours: # 去除一些輪廓面積太小的噪聲if cnt.shape[0] < 150:continue(x, y, w, h) = cv2.boundingRect(cnt) # 得到蘋果所在輪廓的左上角xy像素坐標及輪廓范圍的寬和高cv2.drawContours(image, [cnt], -1, (0, 255, 0), 2)# 將蘋果的輪廓勾勒出來cv2.circle(image, (int(x+w/2), int(y+h/2)), 5,(0, 255, 0), -1) # 將蘋果的圖像中心點畫出來self.objectX = int(x+w/2)self.objectY = int(y+h/2)cv2.imshow("object", image) # 使用OpenCV顯示處理后的圖像效果cv2.waitKey(50)def listener_callback(self, data):self.get_logger().info('Receiving video frame') # 輸出日志信息,提示已進入回調函數image = self.cv_bridge.imgmsg_to_cv2(data, 'bgr8') # 將ROS的圖像消息轉化成OpenCV圖像self.object_detect(image) # 蘋果檢測def object_position_callback(self, request, response): # 創建回調函數,執行收到請求后對數據的處理if request.get == True:response.x = self.objectX # 目標物體的XY坐標response.y = self.objectYself.get_logger().info('Object position\nx: %d y: %d' %(response.x, response.y)) # 輸出日志信息,提示已經反饋else:response.x = 0response.y = 0self.get_logger().info('Invalid command') # 輸出日志信息,提示已經反饋return responsedef main(args=None): # ROS2節點主入口main函數rclpy.init(args=args) # ROS2 Python接口初始化node = ImageSubscriber("service_object_server") # 創建ROS2節點對象并進行初始化rclpy.spin(node) # 循環等待ROS2退出node.destroy_node() # 銷毀節點對象rclpy.shutdown() # 關閉ROS2 Python接口
這是一個通過圖像識別紅色目標物體并通過ROS服務機制獲取其當前坐標的系統。
服務器端(服務提供方):接收圖像并處理,檢測紅色物體位置,一旦客戶端請求,就返回該位置。
客戶端(服務調用方):主動發起“請告訴我目標位置”的請求,獲取最新坐標。
在 ROS2 中,通信接口(Interface) 是一種標準格式的“數據結構定義”,用于描述節點間如何傳遞數據。
對于 服務通信(Service) 來說,接口包括兩部分:
請求部分(Request):客戶端發送給服務端的數據。
響應部分(Response):服務端返回給客戶端的結果。
接口使用 .srv
文件定義,例如你定義的:
定義通信接口文件 .srv
bool get
---
int32 x
int32 y
在服務端創建服務對象,綁定回調
self.srv = self.create_service(
? ? GetObjectPosition, ? ? ? ? ? ? ? ? ? # 使用定義好的接口
? ? 'get_target_position', ? ? ? ? ? ? ?# 服務名
? ? self.object_position_callback ? ? ? # 處理函數
)
在客戶端創建服務請求并發送
self.client = self.create_client(GetObjectPosition, 'get_target_position')
self.request = GetObjectPosition.Request()
self.request.get = True
self.future = self.client.call_async(self.request)
服務端處理請求并返回響應
def object_position_callback(self, request, response):
? ? if request.get:
? ? ? ? response.x = self.objectX
? ? ? ? response.y = self.objectY
? ? return response
再次說明,# 文件:learning_interface/srv/GetObjectPosition.srv
bool get
---
int32 x
int32 y
系統會自動生成:
一個 GetObjectPosition.Request
類(含 get
字段)
一個 GetObjectPosition.Response
類(含 x
和 y
字段)
所以,你在代碼中哪里寫了 .get
、.x
、.y
,就說明用了通信接口定義的字段!
通信接口 .srv
文件是怎么關聯上的?
在代碼中,首先導入:# 文件:learning_interface/srv/GetObjectPosition.srv
from learning_interface.srv import GetObjectPosition
意思是:你定義的服務接口 GetObjectPosition.srv
,位于 ROS功能包 learning_interface
的 srv/
文件夾里。
你就可以在你項目的工作空間中查找該文件:cd ~/ros2_ws/src/learning_interface/srv
ls
# 應該能看到 GetObjectPosition.srv