RoGBAG 和 MCAP 都是機器人領域常用的二進制數據格式,用于存儲傳感器數據、控制命令和狀態信息。兩者主要區別在于:
RoGBAG:ROS 1/2 的標準日志格式,采用 LZF/LZ4 壓縮,適合中小型數據集
MCAP:新一代機器人數據標準,支持高效隨機訪問、增量寫入和多語言 SDK
數據處理應用架構設計
一個完整的 RoGBAG/MCAP 數據處理應用可以包含以下核心模塊:
文件解析層:負責格式識別、數據提取和元數據解析
數據處理層:提供濾波、轉換、特征提取等算法
存儲層:支持格式轉換、數據導出和索引構建
可視化層:提供波形圖、點云顯示、軌跡可視化等功能
應用層:實現特定領域的分析工具和工作流
基于 Python 的基礎實現示例
下面是一個基于 Python 的 RoGBAG/MCAP 數據處理應用框架示例:
import os
import logging
import numpy as np
from typing import Dict, List, Any, Optional, Callable# 第三方庫
try:import rosbag # ROS 1 Bag處理
except ImportError:logging.warning("rosbag庫未安裝,無法處理ROS 1 Bag文件")try:import mcap # MCAP處理from mcap_ros2.reader import read_ros2_messages # ROS 2 MCAP支持
except ImportError:logging.warning("mcap庫未安裝,無法處理MCAP文件")class DataProcessor:"""數據處理核心類"""def __init__(self):self.data_buffer = {} # 緩存解析后的數據self.metadata = {} # 存儲元數據self.plugins = [] # 注冊的數據處理插件def load_file(self, file_path: str) -> bool:"""根據文件擴展名自動加載對應的文件格式"""ext = os.path.splitext(file_path)[1].lower()if ext == '.bag':return self._load_rosbag(file_path)elif ext == '.mcap':return self._load_mcap(file_path)else:logging.error(f"不支持的文件格式: {ext}")return Falsedef _load_rosbag(self, file_path: str) -> bool:"""加載ROS Bag文件"""try:with rosbag.Bag(file_path, 'r') as bag:# 提取元數據self.metadata['duration'] = bag.get_end_time() - bag.get_start_time()self.metadata['message_count'] = bag.get_message_count()self.metadata['topics'] = list(bag.get_type_and_topic_info()[1].keys())# 提取數據for topic, msg, t in bag.read_messages():if topic not in self.data_buffer:self.data_buffer[topic] = {'timestamps': [], 'messages': []}self.data_buffer[topic]['timestamps'].append(t.to_sec())self.data_buffer[topic]['messages'].append(msg)logging.info(f"成功加載ROS Bag文件: {file_path}")return Trueexcept Exception as e:logging.error(f"加載ROS Bag文件失敗: {e}")return Falsedef _load_mcap(self, file_path: str) -> bool:"""加載MCAP文件"""try:with open(file_path, "rb") as f:reader = mcap.Reader(f)metadata = list(reader.get_metadata())self.metadata['metadata'] = {m.name: m.data for m in metadata}# 處理ROS 2消息for msg in read_ros2_messages(file_path):topic = msg.channel.topicif topic not in self.data_buffer:self.data_buffer[topic] = {'timestamps': [], 'messages': []}self.data_buffer[topic]['timestamps'].append(msg.log_time / 1e9) # 轉換為秒self.data_buffer[topic]['messages'].append(msg.ros_msg)logging.info(f"成功加載MCAP文件: {file_path}")return Trueexcept Exception as e:logging.error(f"加載MCAP文件失敗: {e}")return Falsedef register_plugin(self, plugin: Callable) -> None:"""注冊數據處理插件"""self.plugins.append(plugin)def process_data(self) -> Dict[str, Any]:"""應用所有注冊的插件處理數據"""results = {}for plugin in self.plugins:try:plugin_name = plugin.__name__results[plugin_name] = plugin(self.data_buffer)except Exception as e:logging.error(f"插件 {plugin.__name__} 執行失敗: {e}")return resultsdef export_to_mcap(self, output_path: str) -> bool:"""將當前數據導出為MCAP格式"""try:from mcap.writer import Writerwith open(output_path, "wb") as f:writer = Writer(f)writer.start()# 創建通道映射channel_map = {}for topic, data in self.data_buffer.items():# 簡化處理,實際應用需要根據消息類型確定schemaschema_id = writer.register_schema(name=topic.split('/')[-1],encoding="ros2",data=b"", # 實際應用中需要提取消息定義)channel_id = writer.register_channel(schema_id=schema_id,topic=topic,message_encoding="ros2",)channel_map[topic] = channel_id# 寫入消息for topic, data in self.data_buffer.items():channel_id = channel_map[topic]for ts, msg in zip(data['timestamps'], data['messages']):# 實際應用需要將ROS消息序列化為字節message_data = b"" # 簡化處理writer.add_message(channel_id=channel_id,log_time=int(ts * 1e9), # 轉換為納秒data=message_data,publish_time=int(ts * 1e9),)writer.finish()logging.info(f"成功導出為MCAP文件: {output_path}")return Trueexcept Exception as e:logging.error(f"導出為MCAP文件失敗: {e}")return False# 示例數據處理插件
def filter_low_frequency(data: Dict[str, Any]) -> Dict[str, Any]:"""過濾低頻數據的插件"""filtered_data = {}for topic, topic_data in data.items():if 'timestamps' in topic_data and len(topic_data['timestamps']) > 1:# 計算平均頻率dt = np.diff(topic_data['timestamps'])avg_freq = 1.0 / np.mean(dt)if avg_freq > 1.0: # 保留頻率高于1Hz的數據filtered_data[topic] = topic_datareturn filtered_datadef calculate_statistics(data: Dict[str, Any]) -> Dict[str, Any]:"""計算數據統計信息的插件"""stats = {}for topic, topic_data in data.items():if 'messages' in topic_data:stats[topic] = {'message_count': len(topic_data['messages']),'start_time': min(topic_data['timestamps']) if topic_data['timestamps'] else 0,'end_time': max(topic_data['timestamps']) if topic_data['timestamps'] else 0,}return stats# 應用示例
if __name__ == "__main__":processor = DataProcessor()# 加載數據文件if processor.load_file("sample.bag"):# 注冊處理插件processor.register_plugin(filter_low_frequency)processor.register_plugin(calculate_statistics)# 處理數據results = processor.process_data()# 輸出統計信息stats = results.get('calculate_statistics', {})for topic, stat in stats.items():print(f"Topic: {topic}")print(f" Messages: {stat['message_count']}")print(f" Duration: {stat['end_time'] - stat['start_time']:.2f}s")# 導出處理后的數據processor.export_to_mcap("processed_data.mcap")
應用開發建議
- 技術選型:
對于 Python 應用,推薦使用rosbag、mcap和rosbags庫
對于 C++ 應用,可使用rosbag、mcap-cpp和rclcpp
前端可視化可考慮Plotly、Three.js或WebGL - 性能優化:
大數據集處理時使用內存映射技術
實現多線程 / 異步處理
構建數據索引以支持快速隨機訪問 - 擴展功能:
添加數據轉換功能(如坐標系統轉換)
實現數據標注和標簽管理
開發機器學習模型訓練數據準備工具
這個框架可以根據具體需求進行擴展,添加更多的數據處理功能和可視化模塊。在實際開發中,還需要考慮用戶界面設計、錯誤處理和性能優化等方面。