=========================================================================================
=============================== SharedMemSegment ?Start ==========================
// Fast-DDS/src/cpp/utils/shared_memory/SharedMemSegment.hpp
class SharedSegmentBase
{
=============================== 內部類 ?start ==========================
class Id
{
public:
typedef UUID<8> type;
Id(); ? // 返回共享內存變量的ID
Id(const Id& other); ?// 設置共享內存變量的ID
void generate(); ?// 給當前Id隨機生成一個UUID
private:
type uuid_;
}
=============================== 內部類 ?stop ==========================
public:
using sharable_lock = boost::interprocess::sharable_lock<M>;
using sharable_mutex = boost::interprocess::interprocess_sharable_mutex;
? using condition_variable = RobustInterprocessCondition;
using mutex = boost::interprocess::interprocess_mutex;
using named_mutex = boost::interprocess::named_mutex;
using spin_wait = boost::interprocess::spin_wait;
static constexpr uint32_t EXTRA_SEGMENT_SIZE = 512; ?// 每個共享內存變量需要額外的內存來維護其信息
public:
static deleted_unique_ptr<SharedSegmentBase::named_mutex> open_or_create_and_lock_named_mutex(const std::string& mutex_name); // 創建帶名稱的進程間互斥變量
void* get_address_from_offset(SharedSegment::Offset offset); ?// 返回當前內存塊上創建的對象的句柄對應的進程內地址
SharedSegment::Offset get_offset_from_address(void* address); ?// 返回當前內存塊上創建的對象在進程內地址所對應的對象句柄
private:
std::string name_; ?// 共享內存名稱?? ?
}
template<typename T, typename U>
class SharedSegment : public SharedSegmentBase
{
public:
typedef T managed_shared_memory_type; ?// boost::interprocess::basic_managed_shared_memory<...>或者boost::interprocess::basic_managed_mapped_file<...>
typedef U managed_shared_object_type;?? ? // boost::interprocess::shared_memory_object或者boost::interprocess::file_mapping
?? ?SharedSegment(boost::interprocess::create_only_t, const std::string& name, size_t size); ?// 構造函數,創建共享內存塊
static void remove(const std::string& name); ?// 移除共享內存塊(感覺就是unmap操作)
void remove(); ?// 移除當前SharedSegment的共享內存塊
Offset mem_size() const; ?// 以字節為單位返回SharedSegment的大小
private:
std::unique_ptr<managed_shared_memory_type> segment_; ?// boost::interprocess::basic_managed_shared_memory對象
}
using SharedMemSegment = SharedSegment<boost::interprocess::basic_managed_shared_memory<...>, boost::interprocess::shared_memory_object>
=============================== SharedMemSegment ?Stop ==========================
// Fast-DDS/src/cpp/utils/shared_memory/RobustInterprocessCondition.hpp
// 雙向列表的節點
struct SemaphoreNode
{
bi::interprocess_semaphore sem {0}; ? // 跨進程信號量
uint32_t next; ? ?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?// 指向上一個節點的索引
uint32_t prev;?? ? ?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?// 指向下一個節點的索引
};
// 雙向列表
class SemaphoreList
{
private:
uint32_t head_; ? ?// 指向列表開頭SemaphoreNode元素的索引
uint32_t tail_;?? ??? ? // 指向列表結尾SemaphoreNode元素的索引
public:
static constexpr uint32_t LIST_NULL = static_cast<uint32_t>(-1);
void push(uint32_t sem_index, SemaphoreNode* sem_pool); ?// 添加元素(的索引)到列表中
uint32_t pop(SemaphoreNode* sem_pool);?? ?// 從列表中移除并且返回尾部的元素的索引
uint32_t tail();?? ??? ?// 返回列表尾部的元素的索引
uint32_t head();?? ? ?// 返回列表頭部的元素的索引
void remove(uint32_t sem_index, SemaphoreNode* sem_pool); ?// 將sem_index索引指定的元素從列表中移除
}
// 進程間共享鎖
// 原理,在當前主機上創建文件,并且通過flock函數來執行鎖定和解鎖等操作, 從而完成進程間的鎖
// 對于共享鎖綁定的文件來說,使用前首先嘗試flock(fd, LOCK_EX | LOCK_NB)看下文件是否已經被別的進程上了互斥鎖
// 如果當前進程可以上LOCK_EX鎖,說明該文件沒有被別的進程上互斥鎖
// 這樣的情況下再通過flock(fd, LOCK_UN | LOCK_NB)上共享鎖
class RobustSharedLock
{
public:
RobustSharedLock(std::string& name,...); // 構造函數,其中會直接嘗試對文件上鎖
~RobustSharedLock();?? ?// 析構函數,其中會對文件的
static bool is_locked(const std::string& name); ?// 查詢name對應的文件是否被別的進程上了互斥鎖
static bool remove(const std::string& name); ?// 刪除name對應的文件
private:
std::string name_;?? ?// 文件名稱
int fd_;?? ??? ??? ??? ??? ??? ?// 文件描述符
}
// 進程間排他鎖(讀寫鎖)
class RobustExclusiveLock
{
// 結構基本等同于 RobustSharedLock 共享鎖,只是上鎖的flock函數中用的是LOCK_EX,而不是LOCK_SH。
}
// 進程間條件變量
class RobustInterprocessCondition
{
public:
void notify_one();?? ?// 喚醒list_listening_中尾部的listener
void notify_all(); ?// 喚醒list_listening_中所有的listener
private:
void init_sem_list(); ?// 初始化?? ?semaphores_pool_,將semaphores_pool_數組初始化為一個順序的雙向列表,每個Node的prev指向上一個Node, next指向下一個Node
uint32_t enqueue_listener(); ?// 從list_free_中pop一個Node加入到list_listening_中
void dequeue_listener(uint32_t sem_index);?? ?// 從list_listening_中移除下標索引為sem_index的元素,并且將該元素重新push到list_free_的尾部
void do_wait(bi::interprocess_mutex& mut);?? ?// 加入一個listener(SemaphoreNode)到list_listening_,并且在該listener上執行wait(infinite)操作
bool do_timed_wait(const boost::posix_time::ptime& abs_time, bi::interprocess_mutex& mut); ?// 加入一個listener(SemaphoreNode)到list_listening_,并且在該listener上執行timed_wait(abs_time)操作
private:
static constexpr uint32_t MAX_LISTENERS = 512;
SemaphoreNode semaphores_pool_[MAX_LISTENERS]; ? // 一個condition中有512個Listener,每個listener是一個SemaphoreNode
SemaphoreList list_listening_; ? // 初始化時list_listening_列表中沒有元素
SemaphoreList list_free_;?? ??? ??? ? ? // 初始化時list_free_擁有semaphores_pool_中所有元素的索引
boost::interprocess::interprocess_mutex semaphore_lists_mutex_; ?// 進程間互斥鎖,保護list_listening_和list_free_的操作
}
=============================== MultiProducerConsumerRingBuffer ?start ==========================
// 環形隊列(支持多生產者多消費者),其讀寫都是無鎖的
template <class T>
class MultiProducerConsumerRingBuffer
{
=============================== 內部類 ?start ==========================
// 環形隊列中的數據單元
class Cell
{
public:
void data(const T& data);?? ?// 填充數據
const T& data();?? ?// 返回數據
uint32_t ref_counter() const; ?// 返回該單元的引用計數
?? ??? ?std::atomic<uint32_t> ref_counter_;
T data_; ? // 單元中保存的數據,一般是BufferDescriptor
}
?? ?// 保存了所屬環形隊列的寫入指針和空余Cell數量
union PtrType
{
struct Pointer
{
uint32_t write_p; ? ?// write_pointer 對環形隊列的寫入指針
uint32_t free_cells; ? // 環形隊列中還未使用的Cell
}
ptr;
uint64_t u;
};
?? ?// 環形隊列的Listener
class Listener
{
public:
Listener(MultiProducerConsumerRingBuffer<T>& buffer, uint32_t write_p); ?// 構造函數
~Listener(); ??? ?// 析構函數,從RingBuffer中取消當前listener的注冊
Cell* head(); ??? ?// 返回該Listener從RingBuffer中可以讀取的第一個Cell單元,同時增加該Cell單元的引用計數
bool pop();?? ??? ? ?// 標識listener已經讀取完了read_p指向的Cell,并且將該Cell的引用計數-1,
// 如果發現該Cell沒有任何Listener需要訪問了,則將環形隊列的free_cells+1,并且將該listener的read_p_向后+1
private:
MultiProducerConsumerRingBuffer<T>& buffer_;?? ?// 對環形隊列的引用
uint32_t read_p_; ?// read_pointer 該對環形隊列的讀取指針,如果該listener的read_p和環形隊列的write_p相等,說明對于該listener來說,目前沒有可以讀取的Cell單元
}
?? ?// 保存了所屬環形隊列的所有屬性,包括寫入指針,空余Cell數量,Cell總數量以及listener數量
struct Node
{
alignas(8) std::atomic<PtrType> pointer_;
uint32_t total_cells_; ?// 標識Node所屬環形隊列有多少Cell單元
uint32_t registered_listeners_; ?// 標識Node所屬環形隊列有多少Listener
};
=============================== 內部類 ?stop ==========================
public:
MultiProducerConsumerRingBuffer(Cell* cells_base, uint32_t total_cells); ? ?// RingBuffer構造函數,可以看到所有的Cell單元都是外部傳入的,一般這些Cell單元都是在共享內存塊上分配的,第二個參數是Cell單元總數量
bool push(constT& data);?? ?// 向RingBuffer中加入數據,如果沒有listener,則添加失敗,如果當前RingBuffer中free_cells_=0,則添加失敗,返回buffer已滿,否則,找到空余的Cell,將Data拷貝到該Cell的data_成員中
bool is_buffer_full(); ? ?// 如果free_cells_=0,則表示當前環形隊列已經滿了
bool is_buffer_empty();?? ? ?// 如果free_cells_ = total_cells_,說明當前環形隊列是空的,內部的所有Cell都沒有填充數據
std::unique_ptr<Listener> register_listener();?? ?// 向該環形隊列注冊listener
private:
void unregister_listener(Listener& listener); ?// 從環形隊列中取消特定listener的注冊
static uint32_t get_pointer_value(uint32_t pointer); ?// 去掉pointer中最高位的loop flag,返回pointer對應的實際索引,用于在cells_數組中定位
private:
Node* node_; ? // Node對象,保存環形隊列的狀態 (write_p_, free_cells_, total_cells_, listeners_count_)
Cell* cells_;?? ? // 環形隊列所擁有的Cell
}
=============================== MultiProducerConsumerRingBuffer ?Stop ==========================
=============================== SharedMemGlobal ?Start ==========================
// Fast-DDS/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
// 該文件中全局類是SharedMemGlobal
class SharedMemGlobal
{
=============================== 內部類 ?start ==========================
// BufferDescriptor內部類
struct BufferDescriptor
{
SharedMemSegment::Id source_segment_id; ? ? ? ? // BufferDescriptor所對應的共享對象所屬的共享內存的ID
SharedMemSegment::Offset buffer_node_offset;?? ??? ?// BufferDescriptor對對應的共享對象的句柄(可以通過boost的接口轉換為進程內的地址)
uint32_t validity_id;?? ??? ??? ??? ??? ??? ?// 有效標志
}
typedef MultiProducerConsumerRingBuffer<BufferDescriptor>::Listener Listener;
typedef MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell PortCell;
// PortNode內部類
// 注意,PortNode是構建在共享內存中的對象(SharedMemGlobal::init_port函數中創建,對象名為"port_node_abi5",其中5為ABI版本號
// PortNode* port_node = nullptr;
// port_node = segment->get().construct<PortNode>(("port_node_abi" + std::to_string(CURRENT_ABI_VERSION)).c_str())();
// PortNode用來保存Port的狀態信息
struct PortNode ??
{
SharedMemSegment::Offset buffer; ? ? ?// Port內的環形隊列中的Cell數組在共享內存塊中的地址
SharedMemSegment::Offset buffer_node;?? ?// Port內的環形隊列中的Node對象在共享內存塊中的地址
// 上面兩個都是共享內存中對象的句柄,分別代表了環形隊列中的Cell數組和Node節點對象,在SharedMemGlobal的init_port中分配
uint32_t port_id; ?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?// 端口ID
uint32_t num_listeners;?? ??? ??? ??? ??? ??? ??? ??? ?// 端口的監聽者數量
uint32_t healthy_check_timeout_ms;?? ? ?// ?
UUID<8> uuid;?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?// 端口的UUID
SharedMemSegment::condition_variable empty_cv; ?// 進程間條件變量 ?(類型為RobustInterprocessCondition)
SharedMemSegment::mutex empty_cv_mutex; ? // 進程間互斥鎖 ? (類型為boost::interprocess::interprocess_mutex),對Port有修改操作的時候需要使用該鎖進行互斥(例如Push,Pop,create_listener等)
static constexpr size_t LISTENERS_STATUS_SIZE = 1024; ? // 最大允許1024個Listener
struct ListenerStatus ? // 每一個Listener的狀態
{
uint8_t is_in_use ? ? ? ? ? ? ? : 1; ?// 標識listener目前是否活躍
uint8_t is_waiting ? ? ? ? ? ? ?: 1; ?// 標識listener目前是否在等待port上有新的消息
uint8_t is_processing ? ? ? ? ? : 1; ?// 標識listener目前是否正在處理port上的消息
BufferDescriptor descriptor; ? ? ? ? ?// 標識listener目前正在處理的消息的BufferDescriptor
};
ListenerStatus listeners_status[LISTENERS_STATUS_SIZE]; ? // 保存該Port的所有Listener
char domain_name[MAX_DOMAIN_NAME_LENGTH + 1]; ?// Port所屬的域名(?)
}
// Port內部類
class Port
{
private:
std::shared_ptr<SharedMemSegment> port_segment_; ?// Port使用到的共享內存
PortNode* node_; ? // PortNode成員,保存該Port的listeners和狀態
std::unique_ptr<MultiProducerConsumerRingBuffer<BufferDescriptor>> buffer_; ?// 保存Port上傳輸數據的環形隊列
std::unique_ptr<RobustExclusiveLock> read_exclusive_lock_; ? // 跨進程讀寫鎖
std::unique_ptr<RobustSharedLock> read_shared_lock_;?? ??? ??? ??? ? // 跨進程排他鎖
public:
enum class OpenMode ? // Port的打開模式
{
ReadShared, (port上可以有多個listener和多個writer,ReadShared和ReadExclusive兩種模式是互斥的)
ReadExclusive, ?(port上可以有多個Writer,但是只能有一個listener)
Write ?(標識該port隨時可以進行Write動作)
};
Port(SharedMemSegment* port_segment, PortNode* node, ...); ? // 構造函數,保存該Port的PortNode和RingBuffer的Node以及Cell(這些都是在共享內存中創建的對象)
// 增加該Port的引用計數(保存在PortNode中)
~Port(); ? ?// 析構函數,將Port的引用計數-1,?
bool try_push(const BufferDescriptor& buffer_descriptor, bool* listeners_active); ?// 向Port的環形隊列中增加BufferDescriptor,并且返回當前環形隊列是否有listener注冊了
void wait_pop(Listener& listener, const std::atomic<bool>& is_listener_closed, uint32_t listener_index); ?// 讓某個該Port上的listener等待直到Port的環形隊列中有數據,這個會修改該listener的ListenerStatus的狀態
// 并且在PortNode的empty_cv_mutex上wait知道Port上有push后喚醒
bool is_port_ok();?? ?// 標識該Port是否正常工作
uint32_t port_id(); ?// 返回Port ID
OpenMode open_mode(); ?// 返回Port的打開模式(讀互斥/讀共享/任意寫)
void close_listener(std::atomic<bool>* is_listener_closed); ? // 強制關閉當前Port上的所有listener
void pop(Listener& listener, bool& was_cell_freed); ? // 讓某個Port的listener彈出可以讀取的cell(如果該cell沒有listener需要讀取,則增加free_cells_),并且將Listener的讀取指針往下加一
std::unique_ptr<Listener> create_listener(uint32_t* listener_index); ? // 創建該Port的Listener,并且返回該Listener的index
void unregister_listener(std::unique_ptr<Listener>* listener, uint32_t listener_index); ?// 取消該Port的listener
bool get_and_remove_blocked_processing(BufferDescriptor& buffer_descriptor);?? ?// 將第一個還在處理bufferdescriptor的listener停止,并且返回其還在處理的BufferDescriptor
void listener_processing_start(uint32_t listener_index, const BufferDescriptor& buffer_descriptor); ?// 讓指定的listener開始處理某個BufferDescriptor
void listener_processing_stop(uint32_t listener_index); ?// 標識某個listener已經完成對BufferDescriptor的處理
void lock_read_exclusive(); ?// 對Port施加互斥鎖(只能有一個listener), RobustExclusiveLock
void lock_read_shared(); ? // 對Port施加共享鎖(可以有多個listener,多個Writer),RobustSharedLock
void unlock_read_locks(); ? // 釋放讀鎖
}?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?
=============================== 內部類 ?stop ==========================
static bool is_zombie(uint32_t port_id, const std::string& domain_name); ?// 判斷某個port_id所屬的進程是否已經是僵尸進程了,判斷方式:
// 1. 如果可以獲取該port_id的互斥鎖,并且該互斥鎖已經存在
// 2. 如果可以獲取該Port_id的共享鎖,并且該共享鎖已經存在
std::shared_ptr<Port> init_port(uint32_t port_id, std::unique_ptr<SharedMemSegment>& segment, uint32_t max_buffer_descriptors, Port::OpenMode open_mode, uint32_t healthy_check_timeout_ms);
// 1. 在共享內存中創建PortNode的共享對象并且初始化PortNode對象的狀態(port_node = segment->get().construct<PortNode>...)
// 2. 在共享內存中創建用于存放BufferDescriptor的RingBuffer的Node和Cell(segment->get().construct<MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell>,
segment->get().construct<MultiProducerConsumerRingBuffer<BufferDescriptor>::Node>)
// 3. 初始化RingBuffer的Node的狀態 (MultiProducerConsumerRingBuffer<BufferDescriptor>::init_node(buffer_node, max_buffer_descriptors);)
// 注意,這個open_port主要用來打開其他進程創建的Port
std::shared_ptr<Port> open_port(uint32_t port_id, uint32_t max_buffer_descriptors, uint32_t healthy_check_timeout_ms, Port::OpenMode open_mode = Port::OpenMode::ReadShared); ?// 打開端口, open_port_internal
// 1. 如果該端口之前未被正確關閉,則需要關閉(SharedMemSegment::remove(port_segment_name.c_str());)
// 2. 重新映射該端口的共享內存(new SharedMemSegment(boost::interprocess::open_only, port_segment_name.c_str()));)
// 3. 找到該端口的PortNode(port_node = port_segment->get().find<PortNode>)
// 4. 創建該端口的Port對象(port = std::make_shared<Port>(std::move(port_segment), port_node);)
// 5. 如果打開模式中配置了互斥鎖,則對端口加上互斥鎖
std::string domain_name_; ? // Port的domain名稱(FastRtps),會影響Port的共享內存變量的名稱(Fastrtps_port27481)
// auto port_segment_name = domain_name_ + "_port" + std::to_string(port_id);
}
=============================== SharedMemManager ?Start ==========================
// Fast-DDS/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp
class SharedMemManager
{
=============================== 內部類 ?start ==========================
// BufferNode 內部類
// BufferNode對象的構建是在共享內存塊上創建的,例如:?
// Alloc the buffer nodes
auto buffers_nodes = segment_->get().construct<BufferNode>(boost::interprocess::anonymous_instance)[max_allocations]();
struct BufferNode ?// 用于表示一個Buffer的狀態
{
struct Status ? // 總共占用8字節
{
uint64_t validity_id : 24;?? ??? ??? ?//?
uint64_t enqueued_count : 20;?? ??? ?// Push到Port時增加1,從Port中Pop時遞減1
uint64_t processing_count : 20; // 當listener處理該Buffer時對該成員加1,處理完成后-1
};
Status status;?? ??? ?// Buffer狀態
uint32_t data_size;?? ??? ?// 數據大小
SharedMemSegment::Offset data_offset;?? ??? ?// 共享業務數據的句柄(通過句柄可以獲取該業務數據所占用共享內存在當前進程中的地址)
bool invalidate_buffer();?? ??? ??? ??? ??? ??? ??? ??? ?// 將該Buffer的狀態改為Invalidate?? ?
bool invalidate_if_not_processing();?? ??? ?// 當該Buffer沒有被listener處理時,修改器狀態為Invalidate
bool dec_enqueued_inc_processing_counts(uint32_t listener_validity_id);?? ??? ?// enqueued_count-1, processing_count+1 ,在SharedMemManager::Listener對該Buffer做pop處理時會調用該函數
bool inc_processing_count(uint32_t listener_validity_id);?? ??? ?// 增加buffer的processing_count
bool inc_enqueued_count(uint32_t listener_validity_id);
bool dec_enqueued_count(uint32_t listener_validity_id);
bool is_not_referenced();
bool dec_processing_count(uint32_t listener_validity_id);
}
// Buffer 內部類
class Buffer
{
public:
virtual void* data() = 0;
virtual uint32_t size() = 0;
}
// SharedMemBuffer 內部類
class SharedMemBuffer : public Buffer
{
public:
SharedMemBuffer(std::shared_ptr<SharedMemSegment>& segment, SharedMemSegment::Id& segment_id, BufferNode* buffer_node, uint32_t original_validity_id); ?// 創建SharedMemBuffer
~SharedMemBuffer(); ?// 析構函數
void* data();?? ??? ?// 返回所包裝的bufferNode對應的共享內存中的業務數據在當前進程中的地址
uint32_t size();?? ?// 返回包裝的bufferNode對應的共享內存中的業務數據的大小
SharedMemSegment::Offset node_offset();?? ??? ?// 返回所包裝的bufferNode對應的數據的句柄(可以跨進程訪問,類似于內核句柄)
SharedMemSegment::Id segment_id();?? ??? ?// 返回所依賴的共享內存區間SharedMemSegment的ID
uint32_t validity_id();?? ??? ?// 返回包裝的bufferNode的validity_id
void inc_enqueued_count(uint32_t validity_id); ?// 遞增所包裝的bufferNode的enqueued_count
void dec_enqueued_count(uint32_t validity_id); ?// 遞減所包裝的bufferNode的enqueued_count
private:
std::shared_ptr<SharedMemSegment> segment_;?? ??? ?// 用于給業務數據分配共享內存的SharedMemSegment對象指針
void* data_;?? ?// 所包裝的bufferNode對應的共享內存中的業務數據在當前進程中的地址
BufferNode* buffer_node_;?? ??? ?// 保存Buffer狀態的BufferNode
uint32_t original_validity_id_; ? // 構建該Buffer時BufferNode中validity_id的值
}
// Segment 內部類,內部包含BufferNode的集合,代表一段共享內存區間,提供共享內存塊的分配和回收,下面對接SharedMemSegment
class Segment
{
public:
Segment(uint32_t size, uint32_t payload_size, uint32_t max_allocations, const std::string& domain_name);?? ?// 構造函數,內部創建SharedMemSegment對象
~Segment();?? ?// 析構函數
SharedMemSegment::Id id();?? ?// 返回依賴的SharedMemSegment對象的Id
std::shared_ptr<Buffer> alloc_buffer(uint32_t size, steady_clock::time_point& max_blocking_time_point);?? ??? ?// 從SharedMemSegment對象中分配共享內存SharedMemBuffer
uint64_t mem_size();?? ??? ?// 返回SharedMemSegment對象映射的共享內存的總大小
private:
std::string segment_name_; ?// 所依賴的SharedMemSegment對象的名稱
std::list<BufferNode*> free_buffers_; ?// 空閑的BufferNode列表
std::list<BufferNode*> allocated_buffers_; ?// 已被分配的BufferNode列表
std::mutex alloc_mutex_;?? ??? ?// 分配共享內存時使用的鎖
std::shared_ptr<SharedMemSegment> segment_;?? ? // 依賴的SharedMemSegment對象
SharedMemSegment::Id segment_id_;?? ? // 依賴的SharedMemSegment對象的ID
uint64_t overflows_count_; ?// 分配共享內存失敗的次數
uint32_t free_bytes_; ?// 可分配的剩余共享內存大小(單位byte)
private:
void generate_segment_id_and_name(std::string& domain_name); ?// 生成SharedMemSegment對象的Name和ID
BufferNode* pop_free_node();?? ?// 從free_buffers_中彈出最后一個可用的BufferNode并且返回
void release_buffer(BufferNode* buffer_node);?? ??? ?// 從SharedMemSegment對象中釋放參數中BufferNode申請的共享內存
bool recover_buffers(uint32_t required_data_size);?? ?// 從allocated_buffers_中釋放沒有被listener使用的BufferNode直到free_bytes_大于required_data_size
}
// Listener 內部類,內部創建SharedMemGlobal::Listener對象用于監聽SharedMemGlobal::Port,讀取其他進程Port上Push的BufferDescriptor,進而獲取其他進程發出的共享數據
class Listener
{
public:
Listener(SharedMemManager* shared_mem_manager, std::shared_ptr<SharedMemGlobal::Port> port);?? ?// 構造函數,創建SharedMemGlobal::Listener對象
~Listener();?? ??? ?// 析構函數,取消創建SharedMemGlobal::Listener對象對Port的注冊
std::shared_ptr<Buffer> pop();?? ?// 從Port中返回可以讀取的第一個Buffer
void stop_processing_buffer(); ?// 通知Port當前Listener的狀態改為停止狀態
void regenerate_port();?? ??? ?// 當Port狀態不OK的時候,重新創建該PortID的Port
void close();?? ??? ?// 關閉當前Listener,這個會接觸正在運行的pop調用
private:
std::shared_ptr<SharedMemGlobal::Port> global_port_; ?? ?// 要監聽的SharedMemGlobal::Port對象
std::unique_ptr<SharedMemGlobal::Listener> global_listener_; ? // 內部創建用于監聽Port的SharedMemGlobal::Listener對象
uint32_t listener_index_;?? ??? ?// 當前global_listener_在SharedMemGlobal::Port對象的PortNode的Listener容器中的索引
SharedMemManager* shared_mem_manager_;?? ??? ?// 外部傳入的SharedMemManager對象,用于根據BufferDescriptor中的segment_id定位到其他進程創建的SharedMemSegment對象,從而訪問到對方進程創建的共享Buffer的BufferNode
std::atomic<bool> is_closed_;?? ??? ?// 標識Listener的close狀態,這個標志會影響Listener的pop函數,導致其退出等待
}
// Port 內部類,內部包含了SharedMemGlobal::Port對象
// 通過Port類,可以向SharedMemGlobal::Port對象中Push buffer,也可以創建Listener監聽該Port
class Port
{
public:
Port(SharedMemManager* shared_mem_manager, std::shared_ptr<SharedMemGlobal::Port> port, SharedMemGlobal::Port::OpenMode open_mode);?? ??? ?// 構造函數
bool try_push(const std::shared_ptr<Buffer>& buffer);?? ??? ?// 將Buffer送入Port(調用SharedMemGlobal::Port的try_push接口,并且增加BufferNode的enqueue_count)
void recover_blocked_processing();?? ??? ?// 當內部SharedMemGlobal::Port對象變為僵尸Port時,將該Port中所有正在被listener處理的Buffer的processing_count做遞減處理(配合regenerate_port一起使用)
// 因為BufferNode是跨進程共享的,如果不講processing_count,會造成該Buffer始終無法被回收
std::shared_ptr<Listener> create_listener(); // 在內部SharedMemGlobal::Port對象上創建并且返回一個Listener
private:
void regenerate_port(); ?// 首先釋放當前端口中還在讀取的Buffer(BufferNode中的process_count -1,一遍讓push的一方可以回收buffer),然后重新創建SharedMemGlobal::Port
private:
SharedMemManager* shared_mem_manager_;?? ??? ?// 共享內存對象
std::shared_ptr<SharedMemGlobal::Port> global_port_;
SharedMemGlobal::Port::OpenMode open_mode_; ??? ?// 端口打開模式
}
// Port 內部類
class SegmentWrapper
{
// SegmentWrapper的內部類 WatchTask,這是個單例類,用于監控進程中所有的SegmentWrapper
class WatchTask : public SharedMemWatchdog::Task
{
public:
static std::shared_ptr<WatchTask>& get();?? ??? ?// 返回 WatchTask 單例對象
void add_segment(std::shared_ptr<SegmentWrapper> segment); ? ?// 添加要監控的SegmentWrapper
void remove_segment(std::shared_ptr<SegmentWrapper> segment);?? ??? ?// 移除已監控的SegmentWrapper
~WatchTask();?? ??? ??? ?// 析構函數
private:
WatchTask() : watched_it_(watched_segments_.end()), shared_mem_watchdog_(SharedMemWatchdog::get());?? ? ?? ?// 構造函數
void update_watched_segments();?? ??? ?// 使用to_add_和to_remove_來更新watched_segments_容器中的SegmentWrapper
void run();?? ??? ?// 定時運行的檢測函數,調用watched_segments_中每個SegmentWrapper的check_alive進行Alive檢測,并且移除非Alive的Segment
private:
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;?? ??? ??? ??? ?// 保存已監控的SegmentWrapper
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t>::iterator watched_it_;?? ??? ?// watched_segments_容器的迭代器
std::vector<std::shared_ptr<SegmentWrapper>> to_add_;?? ??? ?// add_segment調用會現將要添加的SegmentWrapper放入這個to_add_容器,后面才會進入watched_segments_容器
std::vector<std::shared_ptr<SegmentWrapper>> to_remove_;?? ??? ?// remove_segment調用會現將要移除的SegmentWrapper放入這個to_remove_容器,后面從watched_segments_容器中移除
std::mutex to_add_remove_mutex_;?? ??? ?
}
private:
std::weak_ptr<SharedMemManager> shared_mem_manager_;?? ??? ?// 指向外部類SharedMemManager對象的弱指針
std::shared_ptr<SharedMemSegment> segment_;?? ??? ?// 需要監控的SharedMemSegment指針
SharedMemSegment::Id segment_id_;?? ??? ?// 需要監控的SharedMemSegment的Id
std::string segment_name_;?? ??? ?// 需要監控的SharedMemSegment的名稱
std::string lock_file_name_;?? ??? ?// 文件名稱為segment名稱+"_el"
std::atomic<std::chrono::steady_clock::time_point::rep> last_alive_check_time_;?? ??? ?// 最后一次檢查該Wrapper的時間
static constexpr uint32_t ALIVE_CHECK_TIMEOUT_SECS {5}; ? ?// 要求是5秒檢測一次
private:
bool check_alive(); ? ?// 該函數會被WatchTask調用,檢查所包含的SharedMemSegment是否已經失效了,如果失效了則讓SharedMemManager移除對該SharedMemSegment的映射
bool alive_check_timeout(const std::chrono::steady_clock::time_point& now) const;?? ??? ?// 用于告知WatchTask該Wrapper是否挺長時間沒檢測,需要檢測了,返回true則說明需要檢測
void close_and_remove();?? ??? ?// 關閉該SegmentWrapper,并且讓SharedMemManager移除對該SharedMemSegment的映射
public:
SegmentWrapper(std::weak_ptr<SharedMemManager> shared_mem_manager, std::shared_ptr<SharedMemSegment> segment_, SharedMemSegment::Id segment_id, const std::string& segment_name); // 構造函數
std::shared_ptr<SharedMemSegment> segment();?? ??? ?// 返回所監控的SharedMemSegment對象的指針
void update_alive_time(const std::chrono::steady_clock::time_point& time);?? ??? ?// 更新該Wrapper最后一次被檢測的時間點
}
=============================== 內部類 ?stop ==========================
public:
std::shared_ptr<Segment> create_segment(uint32_t size, uint32_t max_allocations); ? ?// 創建Segment,內部創建SharedMemSegment,并且包含空閑的BufferNode列表和已分配的BufferNode列表
uint32_t segment_allocation_extra_size(uint32_t max_allocations); ? ?// 計算Segment的額外空間 (需要把BufferNode的占用也算進去)
std::shared_ptr<Port> open_port(uint32_t port_id, uint32_t max_descriptors, uint32_t healthy_check_timeout_ms, SharedMemGlobal::Port::OpenMode open_mode); ?// 打開Port(不存在的情況下會創建)
void remove_port(uint32_t port_id); ? ?// 移除Port共享內存在當前進程的映射(SharedMemSegment::remove)
SharedMemGlobal* global_segment(); ? ?// 返回SharemMemManager內部的SharedMemGlobal對象指針
uint64_t segments_mem();?? ??? ?// 返回內部Segment所包裝的SharedMemSegment映射的共享內存區域的大小
private:
std::shared_ptr<Port> regenerate_port(std::shared_ptr<SharedMemGlobal::Port> port,SharedMemGlobal::Port::OpenMode open_mode); ?// 根據參數port的port_id重新創建Port
std::shared_ptr<SharedMemSegment> find_segment(SharedMemSegment::Id id);?? ??? ?// 通過Id映射SharedMemSegment到當前進程并且使用SegmentWrapper包裝后進行管理(生命周期)
void release_segment(SharedMemSegment::Id id);?? ??? ?// 取消ids_segments_中已經映射的SharedMemSegment
void remove_segments_from_watch();?? ??? ?// 移除所有的SegmentWrapper的Watch
private:
std::mutex ids_segments_mutex_;?? ??? ?// 操作ids_segments_時需要獲取的鎖
uint64_t segments_mem_;?? ??? ?// 內部Segment所包裝的SharedMemSegment映射的共享內存區域的大小
SharedMemGlobal global_segment_;?? ??? ?// SharedMemGlobal對象
std::unordered_map<SharedMemSegment::Id::type, std::shared_ptr<SegmentWrapper>, std::hash<SharedMemSegment::Id::type>> ids_segments_; ?// 管理所有打開并且映射到當前進程的SharedMemSegment
}
=============================== SharedMemManager ?Stop ==========================