CyberRT共享內存類圖
共享內存消息發布
數據用共享內存發布時,首先會創建ShmTransmitter對象,包含兩個主要成員segment和notifier,Segment用于創建共享內存(上面綠色部分),Notifer 最終構建ReadableInfo通知給其他進程。
使用哪個ConditionNotifier-> notify或MulticastNotifier->notify,是在創建時根據配置文件決定的。
ConditionNotifier 在構建時會創建Indicator對象保存到共享內存中。
調ConditionNotifier-> notify,實際時將ReadableInfo保存到Indicator對象。
ConditionNotifier 共享內存數據接收
在接收數據時,也會創建同樣的共享內存。如果共享內存存在,則直接打開。
在接收端也有同樣的共享內存操作ConditionNotifier 。
ShmDispatcher會持有多個通道segment,用std::unordered_map<channelid, segment>表示。
同時啟動一個后臺線程ThreadFunc 線程輪詢處理消息回調。
void ShmDispatcher::ThreadFunc() {ReadableInfo readable_info;// 輪詢處理while (!is_shutdown_.load()) {// 100ms, Listen會轉換100000 ms,對比seq,如果不等處理消息。每次輪詢會等待遞減50ms。if (!notifier_->Listen(100, &readable_info)) {ADEBUG << "listen failed.";continue;}if (readable_info.host_id() != host_id_) {ADEBUG << "shm readable info from other host.";continue;}//從共享內存Indicator中讀出的數據uint64_t channel_id = readable_info.channel_id();uint32_t block_index = readable_info.block_index();{ReadLockGuard<AtomicRWLock> lock(segments_lock_);if (segments_.count(channel_id) == 0) {continue;}// check block index// std::unordered_map<uint64_t, uint32_t> previous_indexes_; // 保存key: channelID, value: block_indexif (previous_indexes_.count(channel_id) == 0) {previous_indexes_[channel_id] = UINT32_MAX;}uint32_t& previous_index = previous_indexes_[channel_id];if (block_index != 0 && previous_index != UINT32_MAX) {if (block_index == previous_index) {ADEBUG << "Receive SAME index " << block_index << " of channel "<< channel_id;} else if (block_index < previous_index) {ADEBUG << "Receive PREVIOUS message. last: " << previous_index<< ", now: " << block_index;} else if (block_index - previous_index > 1) {ADEBUG << "Receive JUMP message. last: " << previous_index<< ", now: " << block_index;}}previous_index = block_index;ReadMessage(channel_id, block_index);}}
}
MulticastNotifier共享內存數據接收
MulticastNotifier時采用多播socket實現的,默認
std::string mcast_ip("239.255.0.100");
uint16_t mcast_port = 8888;
創建兩個socket notify_fd_ 用于發生消息,listen_addr用于接收消息。
在發送端調用Notify時,時調的MulticastNotifier::Nofify(const ReadableInfo& info)
bool MulticastNotifier::Notify(const ReadableInfo& info) {if (is_shutdown_.load()) {return false;}std::string info_str;info.SerializeTo(&info_str);ssize_t nbytes =sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,(struct sockaddr*)¬ify_addr_, sizeof(notify_addr_));return nbytes > 0;
}
接收端用同樣的方式輪詢
bool MulticastNotifier::Listen(int timeout_ms, ReadableInfo* info) {if (is_shutdown_.load()) {return false;}if (info == nullptr) {AERROR << "info nullptr.";return false;}struct pollfd fds;fds.fd = listen_fd_;fds.events = POLLIN;int ready_num = poll(&fds, 1, timeout_ms);if (ready_num > 0) {char buf[32] = {0}; // larger than ReadableInfo::kSizessize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);if (nbytes == -1) {AERROR << "fail to recvfrom, " << strerror(errno);return false;}return info->DeserializeFrom(buf, nbytes);} else if (ready_num == 0) {ADEBUG << "timeout, no readableinfo.";} else {if (errno == EINTR) {AINFO << "poll was interrupted.";} else {AERROR << "fail to poll, " << strerror(errno);}}return false;
}
bool Block::TryLockForWrite() {int32_t rw_lock_free = kRWLockFree;//lock_num_ == rw_lock_free, kWriteExclusive賦值給lock_num_,返回true//lock_num_ != rw_lock_free, lock_num_賦值給rw_lock_free,返回falseif (!lock_num_.compare_exchange_weak(rw_lock_free, kWriteExclusive,std::memory_order_acq_rel,std::memory_order_relaxed)) {ADEBUG << "lock num: " << lock_num_.load();return false;}return true;
}
總結
1、CyberRT的共享內存讀寫都時需要加鎖的。
2、每次寫數據可以是不連續的block
3、每次當Block.lock_num_= 0:空閑,>0:有讀操作, -1 : 寫操作。
效率不是高。