FastDDS (SharedMemory)


=========================================================================================
=============================== SharedMemSegment ?Start ==========================
// Fast-DDS/src/cpp/utils/shared_memory/SharedMemSegment.hpp

class SharedSegmentBase
{
=============================== 內部類 ?start ==========================
class Id
{
public:
typedef UUID<8> type;
Id(); ? // 返回共享內存變量的ID
Id(const Id& other); ?// 設置共享內存變量的ID
void generate(); ?// 給當前Id隨機生成一個UUID
private:
type uuid_;
}
=============================== 內部類 ?stop ==========================
public:
using sharable_lock = boost::interprocess::sharable_lock<M>;
using sharable_mutex = boost::interprocess::interprocess_sharable_mutex;

? using condition_variable = RobustInterprocessCondition;
using mutex = boost::interprocess::interprocess_mutex;
using named_mutex = boost::interprocess::named_mutex;
using spin_wait = boost::interprocess::spin_wait;
static constexpr uint32_t EXTRA_SEGMENT_SIZE = 512; ?// 每個共享內存變量需要額外的內存來維護其信息
public:
static deleted_unique_ptr<SharedSegmentBase::named_mutex> open_or_create_and_lock_named_mutex(const std::string& mutex_name); // 創建帶名稱的進程間互斥變量
void* get_address_from_offset(SharedSegment::Offset offset); ?// 返回當前內存塊上創建的對象的句柄對應的進程內地址
SharedSegment::Offset get_offset_from_address(void* address); ?// 返回當前內存塊上創建的對象在進程內地址所對應的對象句柄
private:
std::string name_; ?// 共享內存名稱?? ?
}

template<typename T, typename U>
class SharedSegment : public SharedSegmentBase
{
public:
typedef T managed_shared_memory_type; ?// boost::interprocess::basic_managed_shared_memory<...>或者boost::interprocess::basic_managed_mapped_file<...>
typedef U managed_shared_object_type;?? ? // boost::interprocess::shared_memory_object或者boost::interprocess::file_mapping

?? ?SharedSegment(boost::interprocess::create_only_t, const std::string& name, size_t size); ?// 構造函數,創建共享內存塊
static void remove(const std::string& name); ?// 移除共享內存塊(感覺就是unmap操作)
void remove(); ?// 移除當前SharedSegment的共享內存塊
Offset mem_size() const; ?// 以字節為單位返回SharedSegment的大小

private:
std::unique_ptr<managed_shared_memory_type> segment_; ?// boost::interprocess::basic_managed_shared_memory對象
}

using SharedMemSegment = SharedSegment<boost::interprocess::basic_managed_shared_memory<...>, boost::interprocess::shared_memory_object>
=============================== SharedMemSegment ?Stop ==========================

// Fast-DDS/src/cpp/utils/shared_memory/RobustInterprocessCondition.hpp

// 雙向列表的節點
struct SemaphoreNode
{
bi::interprocess_semaphore sem {0}; ? // 跨進程信號量
uint32_t next; ? ?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?// 指向上一個節點的索引
uint32_t prev;?? ? ?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?// 指向下一個節點的索引
};

// 雙向列表
class SemaphoreList
{
private:
uint32_t head_; ? ?// 指向列表開頭SemaphoreNode元素的索引
uint32_t tail_;?? ??? ? // 指向列表結尾SemaphoreNode元素的索引

public:
static constexpr uint32_t LIST_NULL = static_cast<uint32_t>(-1);

void push(uint32_t sem_index, SemaphoreNode* sem_pool); ?// 添加元素(的索引)到列表中
uint32_t pop(SemaphoreNode* sem_pool);?? ?// 從列表中移除并且返回尾部的元素的索引
uint32_t tail();?? ??? ?// 返回列表尾部的元素的索引
uint32_t head();?? ? ?// 返回列表頭部的元素的索引
void remove(uint32_t sem_index, SemaphoreNode* sem_pool); ?// 將sem_index索引指定的元素從列表中移除
}

// 進程間共享鎖
// 原理,在當前主機上創建文件,并且通過flock函數來執行鎖定和解鎖等操作, 從而完成進程間的鎖
// 對于共享鎖綁定的文件來說,使用前首先嘗試flock(fd, LOCK_EX | LOCK_NB)看下文件是否已經被別的進程上了互斥鎖
// 如果當前進程可以上LOCK_EX鎖,說明該文件沒有被別的進程上互斥鎖
// 這樣的情況下再通過flock(fd, LOCK_UN | LOCK_NB)上共享鎖
class RobustSharedLock
{
public:
RobustSharedLock(std::string& name,...); // 構造函數,其中會直接嘗試對文件上鎖
~RobustSharedLock();?? ?// 析構函數,其中會對文件的
static bool is_locked(const std::string& name); ?// 查詢name對應的文件是否被別的進程上了互斥鎖
static bool remove(const std::string& name); ?// 刪除name對應的文件
private:
std::string name_;?? ?// 文件名稱
int fd_;?? ??? ??? ??? ??? ??? ?// 文件描述符

}

// 進程間排他鎖(讀寫鎖)
class RobustExclusiveLock
{
// 結構基本等同于 RobustSharedLock 共享鎖,只是上鎖的flock函數中用的是LOCK_EX,而不是LOCK_SH。
}

// 進程間條件變量
class RobustInterprocessCondition
{
public:
void notify_one();?? ?// 喚醒list_listening_中尾部的listener
void notify_all(); ?// 喚醒list_listening_中所有的listener

private:
void init_sem_list(); ?// 初始化?? ?semaphores_pool_,將semaphores_pool_數組初始化為一個順序的雙向列表,每個Node的prev指向上一個Node, next指向下一個Node
uint32_t enqueue_listener(); ?// 從list_free_中pop一個Node加入到list_listening_中
void dequeue_listener(uint32_t sem_index);?? ?// 從list_listening_中移除下標索引為sem_index的元素,并且將該元素重新push到list_free_的尾部
void do_wait(bi::interprocess_mutex& mut);?? ?// 加入一個listener(SemaphoreNode)到list_listening_,并且在該listener上執行wait(infinite)操作
bool do_timed_wait(const boost::posix_time::ptime& abs_time, bi::interprocess_mutex& mut); ?// 加入一個listener(SemaphoreNode)到list_listening_,并且在該listener上執行timed_wait(abs_time)操作

private:
static constexpr uint32_t MAX_LISTENERS = 512;
SemaphoreNode semaphores_pool_[MAX_LISTENERS]; ? // 一個condition中有512個Listener,每個listener是一個SemaphoreNode
SemaphoreList list_listening_; ? // 初始化時list_listening_列表中沒有元素
SemaphoreList list_free_;?? ??? ??? ? ? // 初始化時list_free_擁有semaphores_pool_中所有元素的索引
boost::interprocess::interprocess_mutex semaphore_lists_mutex_; ?// 進程間互斥鎖,保護list_listening_和list_free_的操作
}

=============================== MultiProducerConsumerRingBuffer ?start ==========================
// 環形隊列(支持多生產者多消費者),其讀寫都是無鎖的
template <class T>
class MultiProducerConsumerRingBuffer
{
=============================== 內部類 ?start ==========================
// 環形隊列中的數據單元
class Cell
{
public:
void data(const T& data);?? ?// 填充數據
const T& data();?? ?// 返回數據
uint32_t ref_counter() const; ?// 返回該單元的引用計數

?? ??? ?std::atomic<uint32_t> ref_counter_;
T data_; ? // 單元中保存的數據,一般是BufferDescriptor
}

?? ?// 保存了所屬環形隊列的寫入指針和空余Cell數量
union PtrType
{
struct Pointer
{
uint32_t write_p; ? ?// write_pointer 對環形隊列的寫入指針
uint32_t free_cells; ? // 環形隊列中還未使用的Cell
}
ptr;
uint64_t u;
};

?? ?// 環形隊列的Listener
class Listener
{
public:
Listener(MultiProducerConsumerRingBuffer<T>& buffer, uint32_t write_p); ?// 構造函數
~Listener(); ??? ?// 析構函數,從RingBuffer中取消當前listener的注冊
Cell* head(); ??? ?// 返回該Listener從RingBuffer中可以讀取的第一個Cell單元,同時增加該Cell單元的引用計數
bool pop();?? ??? ? ?// 標識listener已經讀取完了read_p指向的Cell,并且將該Cell的引用計數-1,
// 如果發現該Cell沒有任何Listener需要訪問了,則將環形隊列的free_cells+1,并且將該listener的read_p_向后+1
private:
MultiProducerConsumerRingBuffer<T>& buffer_;?? ?// 對環形隊列的引用
uint32_t read_p_; ?// read_pointer 該對環形隊列的讀取指針,如果該listener的read_p和環形隊列的write_p相等,說明對于該listener來說,目前沒有可以讀取的Cell單元
}

?? ?// 保存了所屬環形隊列的所有屬性,包括寫入指針,空余Cell數量,Cell總數量以及listener數量
struct Node
{
alignas(8) std::atomic<PtrType> pointer_;
uint32_t total_cells_; ?// 標識Node所屬環形隊列有多少Cell單元

uint32_t registered_listeners_; ?// 標識Node所屬環形隊列有多少Listener
};
=============================== 內部類 ?stop ==========================
public:
MultiProducerConsumerRingBuffer(Cell* cells_base, uint32_t total_cells); ? ?// RingBuffer構造函數,可以看到所有的Cell單元都是外部傳入的,一般這些Cell單元都是在共享內存塊上分配的,第二個參數是Cell單元總數量
bool push(constT& data);?? ?// 向RingBuffer中加入數據,如果沒有listener,則添加失敗,如果當前RingBuffer中free_cells_=0,則添加失敗,返回buffer已滿,否則,找到空余的Cell,將Data拷貝到該Cell的data_成員中
bool is_buffer_full(); ? ?// 如果free_cells_=0,則表示當前環形隊列已經滿了
bool is_buffer_empty();?? ? ?// 如果free_cells_ = total_cells_,說明當前環形隊列是空的,內部的所有Cell都沒有填充數據
std::unique_ptr<Listener> register_listener();?? ?// 向該環形隊列注冊listener

private:
void unregister_listener(Listener& listener); ?// 從環形隊列中取消特定listener的注冊
static uint32_t get_pointer_value(uint32_t pointer); ?// 去掉pointer中最高位的loop flag,返回pointer對應的實際索引,用于在cells_數組中定位

private:
Node* node_; ? // Node對象,保存環形隊列的狀態 (write_p_, free_cells_, total_cells_, listeners_count_)
Cell* cells_;?? ? // 環形隊列所擁有的Cell
}
=============================== MultiProducerConsumerRingBuffer ?Stop ==========================


=============================== SharedMemGlobal ?Start ==========================
// Fast-DDS/src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp
// 該文件中全局類是SharedMemGlobal


class SharedMemGlobal
{
=============================== 內部類 ?start ==========================
// BufferDescriptor內部類
struct BufferDescriptor
{
SharedMemSegment::Id source_segment_id; ? ? ? ? // BufferDescriptor所對應的共享對象所屬的共享內存的ID
SharedMemSegment::Offset buffer_node_offset;?? ??? ?// BufferDescriptor對對應的共享對象的句柄(可以通過boost的接口轉換為進程內的地址)
uint32_t validity_id;?? ??? ??? ??? ??? ??? ?// 有效標志
}

typedef MultiProducerConsumerRingBuffer<BufferDescriptor>::Listener Listener;
typedef MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell PortCell;

// PortNode內部類
// 注意,PortNode是構建在共享內存中的對象(SharedMemGlobal::init_port函數中創建,對象名為"port_node_abi5",其中5為ABI版本號
// PortNode* port_node = nullptr;
// port_node = segment->get().construct<PortNode>(("port_node_abi" + std::to_string(CURRENT_ABI_VERSION)).c_str())();
// PortNode用來保存Port的狀態信息
struct PortNode ??
{
SharedMemSegment::Offset buffer; ? ? ?// Port內的環形隊列中的Cell數組在共享內存塊中的地址
SharedMemSegment::Offset buffer_node;?? ?// Port內的環形隊列中的Node對象在共享內存塊中的地址
// 上面兩個都是共享內存中對象的句柄,分別代表了環形隊列中的Cell數組和Node節點對象,在SharedMemGlobal的init_port中分配
uint32_t port_id; ?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?// 端口ID
uint32_t num_listeners;?? ??? ??? ??? ??? ??? ??? ??? ?// 端口的監聽者數量
uint32_t healthy_check_timeout_ms;?? ? ?// ?
UUID<8> uuid;?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?// 端口的UUID
SharedMemSegment::condition_variable empty_cv; ?// 進程間條件變量 ?(類型為RobustInterprocessCondition)
SharedMemSegment::mutex empty_cv_mutex; ? // 進程間互斥鎖 ? (類型為boost::interprocess::interprocess_mutex),對Port有修改操作的時候需要使用該鎖進行互斥(例如Push,Pop,create_listener等)
static constexpr size_t LISTENERS_STATUS_SIZE = 1024; ? // 最大允許1024個Listener

struct ListenerStatus ? // 每一個Listener的狀態
{
uint8_t is_in_use ? ? ? ? ? ? ? : 1; ?// 標識listener目前是否活躍
uint8_t is_waiting ? ? ? ? ? ? ?: 1; ?// 標識listener目前是否在等待port上有新的消息
uint8_t is_processing ? ? ? ? ? : 1; ?// 標識listener目前是否正在處理port上的消息
BufferDescriptor descriptor; ? ? ? ? ?// 標識listener目前正在處理的消息的BufferDescriptor
};
ListenerStatus listeners_status[LISTENERS_STATUS_SIZE]; ? // 保存該Port的所有Listener
char domain_name[MAX_DOMAIN_NAME_LENGTH + 1]; ?// Port所屬的域名(?)
}

// Port內部類
class Port
{
private:
std::shared_ptr<SharedMemSegment> port_segment_; ?// Port使用到的共享內存
PortNode* node_; ? // PortNode成員,保存該Port的listeners和狀態
std::unique_ptr<MultiProducerConsumerRingBuffer<BufferDescriptor>> buffer_; ?// 保存Port上傳輸數據的環形隊列
std::unique_ptr<RobustExclusiveLock> read_exclusive_lock_; ? // 跨進程讀寫鎖
std::unique_ptr<RobustSharedLock> read_shared_lock_;?? ??? ??? ??? ? // 跨進程排他鎖
public:
enum class OpenMode ? // Port的打開模式
{
ReadShared, (port上可以有多個listener和多個writer,ReadShared和ReadExclusive兩種模式是互斥的)
ReadExclusive, ?(port上可以有多個Writer,但是只能有一個listener)
Write ?(標識該port隨時可以進行Write動作)
};
Port(SharedMemSegment* port_segment, PortNode* node, ...); ? // 構造函數,保存該Port的PortNode和RingBuffer的Node以及Cell(這些都是在共享內存中創建的對象)
// 增加該Port的引用計數(保存在PortNode中)
~Port(); ? ?// 析構函數,將Port的引用計數-1,?
bool try_push(const BufferDescriptor& buffer_descriptor, bool* listeners_active); ?// 向Port的環形隊列中增加BufferDescriptor,并且返回當前環形隊列是否有listener注冊了
void wait_pop(Listener& listener, const std::atomic<bool>& is_listener_closed, uint32_t listener_index); ?// 讓某個該Port上的listener等待直到Port的環形隊列中有數據,這個會修改該listener的ListenerStatus的狀態
// 并且在PortNode的empty_cv_mutex上wait知道Port上有push后喚醒
bool is_port_ok();?? ?// 標識該Port是否正常工作
uint32_t port_id(); ?// 返回Port ID
OpenMode open_mode(); ?// 返回Port的打開模式(讀互斥/讀共享/任意寫)
void close_listener(std::atomic<bool>* is_listener_closed); ? // 強制關閉當前Port上的所有listener
void pop(Listener& listener, bool& was_cell_freed); ? // 讓某個Port的listener彈出可以讀取的cell(如果該cell沒有listener需要讀取,則增加free_cells_),并且將Listener的讀取指針往下加一
std::unique_ptr<Listener> create_listener(uint32_t* listener_index); ? // 創建該Port的Listener,并且返回該Listener的index
void unregister_listener(std::unique_ptr<Listener>* listener, uint32_t listener_index); ?// 取消該Port的listener
bool get_and_remove_blocked_processing(BufferDescriptor& buffer_descriptor);?? ?// 將第一個還在處理bufferdescriptor的listener停止,并且返回其還在處理的BufferDescriptor
void listener_processing_start(uint32_t listener_index, const BufferDescriptor& buffer_descriptor); ?// 讓指定的listener開始處理某個BufferDescriptor
void listener_processing_stop(uint32_t listener_index); ?// 標識某個listener已經完成對BufferDescriptor的處理
void lock_read_exclusive(); ?// 對Port施加互斥鎖(只能有一個listener), RobustExclusiveLock
void lock_read_shared(); ? // 對Port施加共享鎖(可以有多個listener,多個Writer),RobustSharedLock
void unlock_read_locks(); ? // 釋放讀鎖
}?? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ??? ?

=============================== 內部類 ?stop ==========================

static bool is_zombie(uint32_t port_id, const std::string& domain_name); ?// 判斷某個port_id所屬的進程是否已經是僵尸進程了,判斷方式:
// 1. 如果可以獲取該port_id的互斥鎖,并且該互斥鎖已經存在
// 2. 如果可以獲取該Port_id的共享鎖,并且該共享鎖已經存在
std::shared_ptr<Port> init_port(uint32_t port_id, std::unique_ptr<SharedMemSegment>& segment, uint32_t max_buffer_descriptors, Port::OpenMode open_mode, uint32_t healthy_check_timeout_ms);
// 1. 在共享內存中創建PortNode的共享對象并且初始化PortNode對象的狀態(port_node = segment->get().construct<PortNode>...)
// 2. 在共享內存中創建用于存放BufferDescriptor的RingBuffer的Node和Cell(segment->get().construct<MultiProducerConsumerRingBuffer<BufferDescriptor>::Cell>,
segment->get().construct<MultiProducerConsumerRingBuffer<BufferDescriptor>::Node>)
// 3. 初始化RingBuffer的Node的狀態 (MultiProducerConsumerRingBuffer<BufferDescriptor>::init_node(buffer_node, max_buffer_descriptors);)
// 注意,這個open_port主要用來打開其他進程創建的Port
std::shared_ptr<Port> open_port(uint32_t port_id, uint32_t max_buffer_descriptors, uint32_t healthy_check_timeout_ms, Port::OpenMode open_mode = Port::OpenMode::ReadShared); ?// 打開端口, open_port_internal
// 1. 如果該端口之前未被正確關閉,則需要關閉(SharedMemSegment::remove(port_segment_name.c_str());)
// 2. 重新映射該端口的共享內存(new SharedMemSegment(boost::interprocess::open_only, port_segment_name.c_str()));)
// 3. 找到該端口的PortNode(port_node = port_segment->get().find<PortNode>)
// 4. 創建該端口的Port對象(port = std::make_shared<Port>(std::move(port_segment), port_node);)
// 5. 如果打開模式中配置了互斥鎖,則對端口加上互斥鎖

std::string domain_name_; ? // Port的domain名稱(FastRtps),會影響Port的共享內存變量的名稱(Fastrtps_port27481)
// auto port_segment_name = domain_name_ + "_port" + std::to_string(port_id);

}


=============================== SharedMemManager ?Start ==========================
// Fast-DDS/src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp

class SharedMemManager
{
=============================== 內部類 ?start ==========================
// BufferNode 內部類
// BufferNode對象的構建是在共享內存塊上創建的,例如:?
// Alloc the buffer nodes
auto buffers_nodes = segment_->get().construct<BufferNode>(boost::interprocess::anonymous_instance)[max_allocations]();

struct BufferNode ?// 用于表示一個Buffer的狀態
{
struct Status ? // 總共占用8字節
{
uint64_t validity_id : 24;?? ??? ??? ?//?
uint64_t enqueued_count : 20;?? ??? ?// Push到Port時增加1,從Port中Pop時遞減1
uint64_t processing_count : 20; // 當listener處理該Buffer時對該成員加1,處理完成后-1
};
Status status;?? ??? ?// Buffer狀態
uint32_t data_size;?? ??? ?// 數據大小
SharedMemSegment::Offset data_offset;?? ??? ?// 共享業務數據的句柄(通過句柄可以獲取該業務數據所占用共享內存在當前進程中的地址)
bool invalidate_buffer();?? ??? ??? ??? ??? ??? ??? ??? ?// 將該Buffer的狀態改為Invalidate?? ?
bool invalidate_if_not_processing();?? ??? ?// 當該Buffer沒有被listener處理時,修改器狀態為Invalidate
bool dec_enqueued_inc_processing_counts(uint32_t listener_validity_id);?? ??? ?// enqueued_count-1, processing_count+1 ,在SharedMemManager::Listener對該Buffer做pop處理時會調用該函數
bool inc_processing_count(uint32_t listener_validity_id);?? ??? ?// 增加buffer的processing_count
bool inc_enqueued_count(uint32_t listener_validity_id);
bool dec_enqueued_count(uint32_t listener_validity_id);
bool is_not_referenced();
bool dec_processing_count(uint32_t listener_validity_id);
}

// Buffer 內部類
class Buffer
{
public:
virtual void* data() = 0;
virtual uint32_t size() = 0;
}

// SharedMemBuffer 內部類
class SharedMemBuffer : public Buffer
{
public:
SharedMemBuffer(std::shared_ptr<SharedMemSegment>& segment, SharedMemSegment::Id& segment_id, BufferNode* buffer_node, uint32_t original_validity_id); ?// 創建SharedMemBuffer
~SharedMemBuffer(); ?// 析構函數
void* data();?? ??? ?// 返回所包裝的bufferNode對應的共享內存中的業務數據在當前進程中的地址
uint32_t size();?? ?// 返回包裝的bufferNode對應的共享內存中的業務數據的大小
SharedMemSegment::Offset node_offset();?? ??? ?// 返回所包裝的bufferNode對應的數據的句柄(可以跨進程訪問,類似于內核句柄)
SharedMemSegment::Id segment_id();?? ??? ?// 返回所依賴的共享內存區間SharedMemSegment的ID
uint32_t validity_id();?? ??? ?// 返回包裝的bufferNode的validity_id
void inc_enqueued_count(uint32_t validity_id); ?// 遞增所包裝的bufferNode的enqueued_count
void dec_enqueued_count(uint32_t validity_id); ?// 遞減所包裝的bufferNode的enqueued_count

private:
std::shared_ptr<SharedMemSegment> segment_;?? ??? ?// 用于給業務數據分配共享內存的SharedMemSegment對象指針
void* data_;?? ?// 所包裝的bufferNode對應的共享內存中的業務數據在當前進程中的地址
BufferNode* buffer_node_;?? ??? ?// 保存Buffer狀態的BufferNode
uint32_t original_validity_id_; ? // 構建該Buffer時BufferNode中validity_id的值
}

// Segment 內部類,內部包含BufferNode的集合,代表一段共享內存區間,提供共享內存塊的分配和回收,下面對接SharedMemSegment
class Segment
{
public:
Segment(uint32_t size, uint32_t payload_size, uint32_t max_allocations, const std::string& domain_name);?? ?// 構造函數,內部創建SharedMemSegment對象
~Segment();?? ?// 析構函數
SharedMemSegment::Id id();?? ?// 返回依賴的SharedMemSegment對象的Id
std::shared_ptr<Buffer> alloc_buffer(uint32_t size, steady_clock::time_point& max_blocking_time_point);?? ??? ?// 從SharedMemSegment對象中分配共享內存SharedMemBuffer
uint64_t mem_size();?? ??? ?// 返回SharedMemSegment對象映射的共享內存的總大小

private:
std::string segment_name_; ?// 所依賴的SharedMemSegment對象的名稱
std::list<BufferNode*> free_buffers_; ?// 空閑的BufferNode列表
std::list<BufferNode*> allocated_buffers_; ?// 已被分配的BufferNode列表
std::mutex alloc_mutex_;?? ??? ?// 分配共享內存時使用的鎖
std::shared_ptr<SharedMemSegment> segment_;?? ? // 依賴的SharedMemSegment對象
SharedMemSegment::Id segment_id_;?? ? // 依賴的SharedMemSegment對象的ID
uint64_t overflows_count_; ?// 分配共享內存失敗的次數
uint32_t free_bytes_; ?// 可分配的剩余共享內存大小(單位byte)

private:
void generate_segment_id_and_name(std::string& domain_name); ?// 生成SharedMemSegment對象的Name和ID
BufferNode* pop_free_node();?? ?// 從free_buffers_中彈出最后一個可用的BufferNode并且返回
void release_buffer(BufferNode* buffer_node);?? ??? ?// 從SharedMemSegment對象中釋放參數中BufferNode申請的共享內存
bool recover_buffers(uint32_t required_data_size);?? ?// 從allocated_buffers_中釋放沒有被listener使用的BufferNode直到free_bytes_大于required_data_size

}

// Listener 內部類,內部創建SharedMemGlobal::Listener對象用于監聽SharedMemGlobal::Port,讀取其他進程Port上Push的BufferDescriptor,進而獲取其他進程發出的共享數據
class Listener
{
public:
Listener(SharedMemManager* shared_mem_manager, std::shared_ptr<SharedMemGlobal::Port> port);?? ?// 構造函數,創建SharedMemGlobal::Listener對象
~Listener();?? ??? ?// 析構函數,取消創建SharedMemGlobal::Listener對象對Port的注冊
std::shared_ptr<Buffer> pop();?? ?// 從Port中返回可以讀取的第一個Buffer
void stop_processing_buffer(); ?// 通知Port當前Listener的狀態改為停止狀態
void regenerate_port();?? ??? ?// 當Port狀態不OK的時候,重新創建該PortID的Port
void close();?? ??? ?// 關閉當前Listener,這個會接觸正在運行的pop調用
private:
std::shared_ptr<SharedMemGlobal::Port> global_port_; ?? ?// 要監聽的SharedMemGlobal::Port對象
std::unique_ptr<SharedMemGlobal::Listener> global_listener_; ? // 內部創建用于監聽Port的SharedMemGlobal::Listener對象
uint32_t listener_index_;?? ??? ?// 當前global_listener_在SharedMemGlobal::Port對象的PortNode的Listener容器中的索引
SharedMemManager* shared_mem_manager_;?? ??? ?// 外部傳入的SharedMemManager對象,用于根據BufferDescriptor中的segment_id定位到其他進程創建的SharedMemSegment對象,從而訪問到對方進程創建的共享Buffer的BufferNode
std::atomic<bool> is_closed_;?? ??? ?// 標識Listener的close狀態,這個標志會影響Listener的pop函數,導致其退出等待
}

// Port 內部類,內部包含了SharedMemGlobal::Port對象
// 通過Port類,可以向SharedMemGlobal::Port對象中Push buffer,也可以創建Listener監聽該Port
class Port
{
public:
Port(SharedMemManager* shared_mem_manager, std::shared_ptr<SharedMemGlobal::Port> port, SharedMemGlobal::Port::OpenMode open_mode);?? ??? ?// 構造函數
bool try_push(const std::shared_ptr<Buffer>& buffer);?? ??? ?// 將Buffer送入Port(調用SharedMemGlobal::Port的try_push接口,并且增加BufferNode的enqueue_count)
void recover_blocked_processing();?? ??? ?// 當內部SharedMemGlobal::Port對象變為僵尸Port時,將該Port中所有正在被listener處理的Buffer的processing_count做遞減處理(配合regenerate_port一起使用)
// 因為BufferNode是跨進程共享的,如果不講processing_count,會造成該Buffer始終無法被回收
std::shared_ptr<Listener> create_listener(); // 在內部SharedMemGlobal::Port對象上創建并且返回一個Listener

private:
void regenerate_port(); ?// 首先釋放當前端口中還在讀取的Buffer(BufferNode中的process_count -1,一遍讓push的一方可以回收buffer),然后重新創建SharedMemGlobal::Port

private:
SharedMemManager* shared_mem_manager_;?? ??? ?// 共享內存對象
std::shared_ptr<SharedMemGlobal::Port> global_port_;
SharedMemGlobal::Port::OpenMode open_mode_; ??? ?// 端口打開模式
}

// Port 內部類
class SegmentWrapper
{
// SegmentWrapper的內部類 WatchTask,這是個單例類,用于監控進程中所有的SegmentWrapper
class WatchTask : public SharedMemWatchdog::Task
{
public:
static std::shared_ptr<WatchTask>& get();?? ??? ?// 返回 WatchTask 單例對象
void add_segment(std::shared_ptr<SegmentWrapper> segment); ? ?// 添加要監控的SegmentWrapper
void remove_segment(std::shared_ptr<SegmentWrapper> segment);?? ??? ?// 移除已監控的SegmentWrapper
~WatchTask();?? ??? ??? ?// 析構函數

private:
WatchTask() : watched_it_(watched_segments_.end()), shared_mem_watchdog_(SharedMemWatchdog::get());?? ? ?? ?// 構造函數
void update_watched_segments();?? ??? ?// 使用to_add_和to_remove_來更新watched_segments_容器中的SegmentWrapper
void run();?? ??? ?// 定時運行的檢測函數,調用watched_segments_中每個SegmentWrapper的check_alive進行Alive檢測,并且移除非Alive的Segment
private:
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;?? ??? ??? ??? ?// 保存已監控的SegmentWrapper
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t>::iterator watched_it_;?? ??? ?// watched_segments_容器的迭代器

std::vector<std::shared_ptr<SegmentWrapper>> to_add_;?? ??? ?// add_segment調用會現將要添加的SegmentWrapper放入這個to_add_容器,后面才會進入watched_segments_容器
std::vector<std::shared_ptr<SegmentWrapper>> to_remove_;?? ??? ?// remove_segment調用會現將要移除的SegmentWrapper放入這個to_remove_容器,后面從watched_segments_容器中移除
std::mutex to_add_remove_mutex_;?? ??? ?
}

private:
std::weak_ptr<SharedMemManager> shared_mem_manager_;?? ??? ?// 指向外部類SharedMemManager對象的弱指針
std::shared_ptr<SharedMemSegment> segment_;?? ??? ?// 需要監控的SharedMemSegment指針
SharedMemSegment::Id segment_id_;?? ??? ?// 需要監控的SharedMemSegment的Id
std::string segment_name_;?? ??? ?// 需要監控的SharedMemSegment的名稱
std::string lock_file_name_;?? ??? ?// 文件名稱為segment名稱+"_el"
std::atomic<std::chrono::steady_clock::time_point::rep> last_alive_check_time_;?? ??? ?// 最后一次檢查該Wrapper的時間
static constexpr uint32_t ALIVE_CHECK_TIMEOUT_SECS {5}; ? ?// 要求是5秒檢測一次

private:
bool check_alive(); ? ?// 該函數會被WatchTask調用,檢查所包含的SharedMemSegment是否已經失效了,如果失效了則讓SharedMemManager移除對該SharedMemSegment的映射
bool alive_check_timeout(const std::chrono::steady_clock::time_point& now) const;?? ??? ?// 用于告知WatchTask該Wrapper是否挺長時間沒檢測,需要檢測了,返回true則說明需要檢測
void close_and_remove();?? ??? ?// 關閉該SegmentWrapper,并且讓SharedMemManager移除對該SharedMemSegment的映射

public:
SegmentWrapper(std::weak_ptr<SharedMemManager> shared_mem_manager, std::shared_ptr<SharedMemSegment> segment_, SharedMemSegment::Id segment_id, const std::string& segment_name); // 構造函數
std::shared_ptr<SharedMemSegment> segment();?? ??? ?// 返回所監控的SharedMemSegment對象的指針
void update_alive_time(const std::chrono::steady_clock::time_point& time);?? ??? ?// 更新該Wrapper最后一次被檢測的時間點
}
=============================== 內部類 ?stop ==========================

public:
std::shared_ptr<Segment> create_segment(uint32_t size, uint32_t max_allocations); ? ?// 創建Segment,內部創建SharedMemSegment,并且包含空閑的BufferNode列表和已分配的BufferNode列表
uint32_t segment_allocation_extra_size(uint32_t max_allocations); ? ?// 計算Segment的額外空間 (需要把BufferNode的占用也算進去)
std::shared_ptr<Port> open_port(uint32_t port_id, uint32_t max_descriptors, uint32_t healthy_check_timeout_ms, SharedMemGlobal::Port::OpenMode open_mode); ?// 打開Port(不存在的情況下會創建)
void remove_port(uint32_t port_id); ? ?// 移除Port共享內存在當前進程的映射(SharedMemSegment::remove)
SharedMemGlobal* global_segment(); ? ?// 返回SharemMemManager內部的SharedMemGlobal對象指針
uint64_t segments_mem();?? ??? ?// 返回內部Segment所包裝的SharedMemSegment映射的共享內存區域的大小

private:
std::shared_ptr<Port> regenerate_port(std::shared_ptr<SharedMemGlobal::Port> port,SharedMemGlobal::Port::OpenMode open_mode); ?// 根據參數port的port_id重新創建Port
std::shared_ptr<SharedMemSegment> find_segment(SharedMemSegment::Id id);?? ??? ?// 通過Id映射SharedMemSegment到當前進程并且使用SegmentWrapper包裝后進行管理(生命周期)
void release_segment(SharedMemSegment::Id id);?? ??? ?// 取消ids_segments_中已經映射的SharedMemSegment
void remove_segments_from_watch();?? ??? ?// 移除所有的SegmentWrapper的Watch
private:
std::mutex ids_segments_mutex_;?? ??? ?// 操作ids_segments_時需要獲取的鎖
uint64_t segments_mem_;?? ??? ?// 內部Segment所包裝的SharedMemSegment映射的共享內存區域的大小
SharedMemGlobal global_segment_;?? ??? ?// SharedMemGlobal對象
std::unordered_map<SharedMemSegment::Id::type, std::shared_ptr<SegmentWrapper>, std::hash<SharedMemSegment::Id::type>> ids_segments_; ?// 管理所有打開并且映射到當前進程的SharedMemSegment
}
=============================== SharedMemManager ?Stop ==========================


本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/91610.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/91610.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/91610.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

sqli-labs:Less-5關卡詳細解析

1. 思路&#x1f680; 本關的SQL語句為&#xff1a; $sql"SELECT * FROM users WHERE id$id LIMIT 0,1";注入類型&#xff1a;字符串型&#xff08;單引號包裹&#xff09;提示&#xff1a;參數id需以閉合 但有意思的是&#xff0c;php代碼的輸出語句不是如下這種…

標準項目-----網頁五子棋(4)-----游戲大廳+匹配+房間代碼

頁面實現 hall.html <!DOCTYPE html> <html lang"ch"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>游戲大廳</title><l…

MySQL分析步

MySQL分析 -- 庫名 set dbName bsa_crmeb_bak; -- 表名 set tableName bsa_crmeb_bak;-- 查看bsa_crmeb_bak數據庫基本信息 SELECTSCHEMA_NAME AS 數據庫名,DEFAULT_CHARACTER_SET_NAME AS 字符集,DEFAULT_COLLATION_NAME AS 排序規則 FROM information_schema.SCHEMATA WHER…

工程化(二):為什么你的下一個項目應該使用Monorepo?(pnpm / Lerna實戰)

工程化(二)&#xff1a;為什么你的下一個項目應該使用Monorepo&#xff1f;&#xff08;pnpm / Lerna實戰&#xff09; 引子&#xff1a;前端項目的“孤島困境” 隨著你的項目或團隊不斷成長&#xff0c;一個棘手的問題會逐漸浮現&#xff1a;代碼該如何組織&#xff1f; 最…

應用藥品注冊證識別技術,為醫藥行業的合規、高效與創新發展提供核心驅動力

在醫藥行業的龐雜數據海洋中&#xff0c;藥品注冊證&#xff08;如中國的“國藥準字”、美國的NDA/ANDA批號&#xff09;是藥品合法上市流通的“身份證”。面對海量的證書審核、錄入與驗證需求&#xff0c;傳統人工處理方式不僅效率低下、成本高昂&#xff0c;更易因疲勞導致差…

Spring Boot 2.1.18 集成 Elasticsearch 6.6.2 實戰指南

Spring Boot 2.1.18 集成 Elasticsearch 6.6.2 實戰指南前言&#xff1a;一. JAVA客戶端對比二. 導入數據2.1 分析創建索引2.2 代碼實現三. ElasticSearch 查詢3.1 matchAll 查詢3.2 term查詢3.3 match查詢3.4 模糊查詢3.5 范圍查詢3.6 字符串查詢3.7 布爾查詢3.8 分頁與排序3.…

向量投影計算,舉例說明

向量投影計算,舉例說明 向量投影是指將一個向量(設為向量b\mathbf{b}b)投射到另一個向量(設為向量a\mathbf{a}a)所在直線上,得到一個與a\mathbf{a}

如何在技術世界中保持清醒和高效

“抽象泄露&#xff0c;是存在的&#xff0c;但你需要了解多少&#xff0c;需要理解多深&#xff0c;這一點是因人而異的&#xff0c;絕對不是別人能夠建議的。每個人只會站在自己的立場上去建議別人怎么做。”在寫下這句話時&#xff0c;身為一個技術開發者&#xff0c;我似乎…

服裝公司數字化轉型如何做?

WL貿易集團公司&#xff08;以下簡稱WL&#xff09;自2012年成立以來&#xff0c;在十余年的發展歷程中不斷蛻變與升級。公司始終秉持“時尚與品質優先”的核心經營理念&#xff0c;通過嚴格執行高標準、嚴要求&#xff0c;牢牢把握產品品質與交貨周期兩大關鍵&#xff0c;贏得…

GM DC Monitor 之 銀河麒麟 Docker 部署安裝手冊

官方網站&#xff1a;www.gm-monitor.com 本手冊以銀河麒麟為例&#xff0c;介紹在 Linux 系統上安裝和配置DOCKER服務的詳細步驟 一、以root用戶執行以下操作命令 1、環境優化 modprobe br_netfilter cat <<EOF > /etc/sysctl.d/docker.conf net.bridge.bridge-n…

網絡編程接口bind學習

1、概述下面2個問題你會怎么回答呢?1、bind如果綁定0號端口&#xff0c;可以工作么&#xff0c;如果能正常工作&#xff0c;綁定的什么端口 2、客戶端可以調用bind么2、解析2.1、bind如果綁定0號端口&#xff0c;可以工作么&#xff0c;如果能正常工作&#xff0c;綁定的什么端…

FinOps X 2025 核心發布:AI 時代下的 FinOps 轉型

2025年&#xff0c;人工智能技術的突破性發展正深刻重塑商業與技術格局&#xff0c;智能技術已成為各領域創新的核心驅動力。在此背景下&#xff0c;FinOps X 2025 圍繞 AI 技術對財務運營&#xff08;FinOps&#xff09;的革新作用展開深度探討&#xff0c;重點呈現了以下關鍵…

使用Min-Max進行數據特征標準化

在數據處理過程中&#xff0c;標準化是非常重要的步驟之一&#xff0c;特別是在機器學習和數據分析中。Min-Max標準化&#xff08;也稱為歸一化&#xff09;是一種常用的數據標準化方法&#xff0c;它通過將數據縮放到一個指定的范圍&#xff08;通常是0到1之間&#xff09;&am…

【Dart 教程系列第 51 篇】Iterable 中 reduce 函數的用法

這是【Dart 教程系列第 51 篇】,如果覺得有用的話,歡迎關注專欄。 博文當前所用 Dart SDK:3.5.4 文章目錄 一:reduce 作用 二:舉例說明 1:求和 2:查找最大/最小值 3:字符串拼接 4:自定義對象合并 三:注意事項 一:reduce 作用 reduce 是 Iterable 的一個方法,用于…

使用VSCode配置Flutter

本周&#xff08;學期第四周&#xff09;任務&#xff1a; 1.簡單學習Flutter&#xff0c;完成環境安裝與配置 2.探索Flutter與Unity集成方案 一、Flutter環境配置 根據Flutter官方文檔進行環境配置&#xff1a;開發 Android 應用 | Flutter 中文文檔 - Flutter 中文開發者網…

React 開發中遇見的低級錯誤

1.useState不起效果 異步 改用 useRef2.map循環{ WechatQuestionnaireData && WechatQuestionnaireData?.questions?.map((item: any) > (<div className{styles[title]}>{item.questionTitle}</div>))}注意這里的 》 后面是括號 我開始寫成{} 好久…

iphone手機使用charles代理,chls.pro/ssl 后回車 提示瀏覽器打不開該網頁

iphone手機使用charles代理,chls.pro/ssl 后回車 提示瀏覽器打不開該網頁) 1、問題現狀&#xff1a; Charles安裝證書異常問題&#xff0c;網頁訪問chls.pro/ssl提示網頁打不開&#xff0c;在charles頁面有鏈接&#xff0c;可以看到http請求和https就是看不到詳細內容 2、解決方…

第11屆藍橋杯Python青少組_國賽_高級組_2020年10月真題

第11屆藍橋杯Python青少組_國賽_高級組_2020年10月真題 更多內容請查看網站&#xff1a;【試卷中心 -----> 藍橋杯----> Python ----> 國賽】 網站鏈接 青少年軟件編程歷年真題模擬題實時更新 一、選擇題 第 1 題 執行以下程序,輸出的結果是 ( )。 print( 0.1 …

如何處理Y2K38問題

一、什么是Y2K38問題Y2K38 問題&#xff0c;也稱為 2038年問題&#xff0c;是一個類似于Y2K問題的計算機日期處理問題。1、什么是Y2K38 問題&#xff1f;Y2K38 問題是指在計算機系統中&#xff0c;某些使用 32位有符號整數 來存儲時間的程序&#xff0c;將在 2038年1月19日03時…

LeetCode熱題100——146. LRU 緩存

https://leetcode.cn/problems/lru-cache/description/?envTypestudy-plan-v2&envIdtop-100-liked 請你設計并實現一個滿足 LRU (最近最少使用) 緩存 約束的數據結構。 實現 LRUCache 類&#xff1a; LRUCache(int capacity) 以 正整數 作為容量 capacity 初始化 LRU 緩…