完善博文 共享內存一寫多讀無鎖實現的代碼邏輯部分

使用共享內存(內存映射)實現發布訂閱模式

  • 多進程實現PubSub發布訂閱模式,從而實現進程間的通信。
  • 通信方式可以是TCP/UDP,管道Pipe/消息隊列,共享內存shared memory等等。其中TCP/UDP的方式是可以用作局域網以及跨平臺的通信,Pipe/消息隊列是進程間基于系統實現比較基礎的通信,這兩者有大量優秀的第三方庫支持,如ZeroMQ,只要加入我們自定義數據的轉換方式即可方便實現;而共享內存是實現進程間通信最快的方式,但因為共享內存的設計并不是用來做類似PubSub這種模式的實現的,并且共享內存實質上就是一段進程間共享的內存空間,使用自由度是極高的,所以也很少有第三方庫來實現共享內存方式的進程間通信。
  • 因此本文的重點是如何使用共享內存shared memory來實現高效的PubSub發布訂閱模式。

需求

  • 消息通過事先分配好的共享內存空間來傳遞
  • 需要有一定的機制來管理消息的發送(寫)和接收(讀)
  • 需要實現發布訂閱模式,也就是一個發布者(一寫)多個訂閱者(多讀)
  • 考慮到平臺的原因,最后采用了文件映射內存的這種方式,在各種系統中都有比較通用的實現

邏輯分析

  • 顯然,只要創建了一個文件并且設置好需要的大小,即可以使用mmap映射到進程的內存空間,并且在退出時可以用munmap將映射釋放掉。但是空間真正的釋放是要把文件刪掉的,因此我們需要一個計數器來記錄使用這塊共享內存的進程數,類似共享指針shared_ptr的實現,在計數為零時把文件刪掉。在修改這個計數的時候還需要一把進程間讀寫鎖:
  • 對于只有單個訂閱者,數據之后包含一個標志位,發布者寫完后置為true,訂閱者讀完之后置為false,可能再加上一個信號燈的控制,來避免頻繁讀寫;
  • 對于多個訂閱者,數據中的這個標志位變成一個計數,發布者寫完之后將計數器置為訂閱者的數量,訂閱者讀完之后將計數器減1,再加上一個進程條件變量的控制,來避免頻繁讀寫。
  • 這兩種方案都有一定的弊端,最大的問題在于,訂閱者還需要修改共享內存的內容,這樣就發揮不出讀寫鎖支持多讀的優勢了。我們需要一個更好的機制。
  • 一個簡單的實現是數據中帶有一個單調遞增的標簽,訂閱者讀到數據后本地保存一下這個標簽的值,如果下次讀到的這個值不比保存的值大,就認為讀到了舊數據,忽略之。這個標簽比較好的實現是用當前的系統時間而不是計數,因為發布者可能會重啟清零,就算重啟后可以從已經寫入的數據中讀取,但后面為了實現無鎖隊列會讓這個事情變得麻煩。這樣還有一個問題是,依然會頻繁地去讀取這個標簽。因此需要加入進程條件變量的控制來減少這種頻繁。接下來是2,實現消息發送(寫)和接收(讀)的管理。因為我們已經有了一把讀寫鎖,很自然地想到可以用它來管理讀寫啊。事實上并不是這樣,因為發布者寫完數據之后可能會有一段時間不會占有寫鎖,這時候就要一種機制來限制訂閱者不會重復來讀這個數據。對于這個實現,已有的方案有:
  • 對于每一個訂閱者都開辟一塊共享內存,可以按一對一的方式同時復制多份數據;
  • 使用生產消費模式,使用循環隊列來實現讀寫分離。
  • 第1種方案是解決了讀寫鎖爭搶的問題,但是增加了內存復制的開銷,反而沒有第2種方案好。但是我們要稍微修改一下傳統的生產消費模式的實現,只用一個指針來指向最新的數據。之所以這樣做是因為內存是事先分配好的,我們把它改造成環形的內存緩沖區,很難保證數據讀取的序列性;再者就是循環的尾指針應該由訂閱者自己來維護,因為每個訂閱者處理的速度是不一樣的。
  • 如此一來,所有數據的修改完全是由發布者來做的,也就是說對于訂閱者來說,這是個無鎖隊列:

代碼實現

#include <iostream>
#include <cstring>
#include <vector>
#include <functional>
#include <memory>
#include <sys/mman.h>
#include <atomic>
#include <thread>
#include <sys/stat.h>
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>struct ShmData{bool written_;long timestamp_;size_t size_;char data_[1];ShmData():written_(false){}void Write(const char *data,const size_t len){written_ = false;memcpy(data_,data,len);size_ = len;timestamp_ = GetTimestamp();written_ = true;}bool Read(std::vector<char>* data,long* time = nullptr){if (!written_){return false;}if (time){*time = timestamp_;}data->resize(size_);memcpy(data->data(),data_,size_);return true;}static long GetTimestamp(){struct timespec ts;clock_gettime(CLOCK_REALTIME,&ts);return ts.tv_sec * 1000000 + ts.tv_nsec / 1000;}
};struct ShmQueue{size_t size_;int count_;int head_;char data_[1];ShmQueue(const size_t size,const int count):size_(sizeof(ShmData) + size),count_(count),head_(0){new(data_)ShmData;}void Write(const char* data,const size_t len){const int next = (head_ + 1) % count_;(reinterpret_cast<ShmData *>(data_ + next * size_))->Write(data,len);head_ = next;}bool Read(std::vector<char>*data,long* time){return (reinterpret_cast<ShmData *>(data_ + head_ * size_))->Read(data,time);}
};struct ShmSlice{int attached_;pthread_rwlock_t rwlock_;pthread_mutex_t mutex_;pthread_cond_t cond_;char data_[1];ShmSlice(const size_t size,const int count,const bool init = false){if (init){//init rwlockpthread_rwlockattr_t rwattr;pthread_rwlockattr_init(&rwattr);pthread_rwlockattr_setpshared(&rwattr,PTHREAD_PROCESS_SHARED);pthread_rwlock_init(&rwlock_,&rwattr);//init mutexpthread_mutexattr_t mattr;pthread_mutexattr_init(&mattr);pthread_mutexattr_setpshared(&mattr,PTHREAD_PROCESS_SHARED);pthread_mutex_init(&mutex_,&mattr);//init condition variablepthread_condattr_t cattr;pthread_condattr_init(&cattr);pthread_condattr_setpshared(&cattr,PTHREAD_PROCESS_SHARED);pthread_cond_init(&cond_,&cattr);//init shm queuenew(data_)ShmQueue(size,count);}LockWrite();if (init){attached_ = 1;} else{++attached_;}UnLockWrite();}~ShmSlice(){LockWrite();UnLockWrite();if (0 == attached_){pthread_cond_destroy(&cond_);pthread_mutex_destroy(&mutex_);pthread_rwlock_destroy(&rwlock_);}}int count(){LockRead();const int count = attached_;UnlockRead();return count;}void Write(const char* data,const size_t len){LockWrite();(reinterpret_cast<ShmQueue*>(data_))->Write(data,len);UnLockWrite();}bool Read(std::vector<char> *data,long* time){return (reinterpret_cast<ShmQueue *>(data_))->Read(data,time);}void LockWrite(){pthread_rwlock_wrlock(&rwlock_);}void UnLockWrite(){pthread_rwlock_unlock(&rwlock_);}void LockRead(){pthread_rwlock_rdlock(&rwlock_);}void UnlockRead(){pthread_rwlock_unlock(&rwlock_);}void LockMutex(){while (EOWNERDEAD == pthread_mutex_lock(&mutex_)){UnlockMutex();}}void UnlockMutex(){pthread_mutex_unlock(&mutex_);}void NotifyOne(){pthread_cond_signal(&cond_);}void NotifyAll(){pthread_cond_broadcast(&cond_);}void wait(){LockMutex();pthread_cond_wait(&cond_,&mutex_);UnlockMutex();}bool WaitFor(struct timespec *ts,const std::function<bool()>&cond){if (cond && cond()){return true;}LockMutex();pthread_cond_timedwait(&cond_,&mutex_,ts);UnlockMutex();bool ret;if (cond){ret = cond();} else{struct timespec now;clock_gettime(CLOCK_REALTIME,&now);ret = now.tv_sec < ts->tv_sec || (now.tv_sec == ts->tv_sec && now.tv_nsec <= ts->tv_nsec);}return ret;}
};class ShmManger{
public:ShmManger(std::string file_name,const int size): name_(std::move(file_name)),size_(sizeof(ShmSlice) + sizeof(ShmQueue) + 3 * (sizeof(ShmData) + size)){bool init = false;//open file descriptorint fd = open(name_.c_str(),O_RDWR | O_CREAT | O_EXCL,0600);if(fd < 0){fd = open(name_.c_str(),O_RDWR,0600);}else{//set file sizestruct stat fs;fstat(fd,&fs);if (fs.st_size < 1){ftruncate(fd,size_);}init = true;}//mmapvoid *shmaddr = mmap(NULL,size_,PROT_READ | PROT_WRITE,MAP_SHARED,fd,0);new (shmaddr) ShmSlice(size,3,init);auto deleter = [](ShmSlice *ptr){ptr->~ShmSlice();};slice_ = std::shared_ptr<ShmSlice>(reinterpret_cast<ShmSlice *>(shmaddr),deleter);close(fd);}~ShmManger(){running_ = false;slice_->NotifyAll();if (read_thread_.joinable()){read_thread_.join();}const int count = slice_->count();auto ptr = slice_.get();slice_.reset();if(count > 1){//unmapmunmap(ptr,size_);} else{//remove fileremove(name_.c_str());}}void Publish(const std::vector<char> &data){slice_->Write(data.data(),data.size());slice_->NotifyAll();}void Subscribe(std::function<void (const std::vector<char>&)>callback){callback = std::move(callback);running_ = true;read_thread_ = std::thread(&ShmManger::ReadThread,this);}private:void ReadThread(){long read_time = 0;while (running_){std::vector<char> data;long time;struct timespec ts;clock_gettime(CLOCK_REALTIME,&ts);ts.tv_sec += 5;if (!slice_->WaitFor(&ts,[&]{return slice_->Read(&data,&time) && time > read_time;})){continue;}read_time = time;//deal with datacallback_(data);}}std::string name_;int size_;std::shared_ptr<ShmSlice>slice_;std::function<void(const std::vector<char>&)>callback_;std::atomic_bool running_;std::thread read_thread_;
};
int main() {std::cout << "Hello, World!" << std::endl;return 0;
}

參考鏈接

  • 共享內存一寫多讀無鎖實現
  • 共享內存消息隊列
  • 【轉載】同步和互斥的POSIX支持(互斥鎖,條件變量,自旋鎖)
  • Linux線程-互斥鎖pthread_mutex_t
  • C語言open()函數:打開文件函數
  • C語言mmap()函數:建立內存映射

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/446803.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/446803.shtml
英文地址,請注明出處:http://en.pswp.cn/news/446803.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

想對你說的話,就在這里!

甜(Tu)言(Wei)蜜(Qing)語(Hua)最近在github上看到了一個朋友開發的 土味情話在線生成器 &#xff0c;感覺還不錯&#xff0c;在這里推薦一下。 github地址&#xff1a;在線生成土味情話

linux讀寫文件 簡單版

代碼 //write void write_file(const std::string file_name){FILE *fp nullptr;fp fopen(file_name.c_str(),"w");fprintf(fp,"This is testing for mutex\n");fclose(fp); } //read void read_file(const std::string file_name){std::ifstream fp(fi…

具有中國風的傳統顏色(炫酷)

一個小小的中國風的傳統顏色&#xff0c;你覺得應該是什么樣子的呢&#xff1f; 看了下面這個&#xff0c;我一個搞移動開發的都想去搞前端開發了。 廢話不多說了&#xff0c;直接看效果&#xff1a; 訪問地址&#xff1a;中國傳統顏色手冊 github地址&#xff1a;Chinese…

Android Studio安裝問題及填坑

安裝過程 安裝Android Studio 其他問題 1.Android Studio出現Error:Unable to tunnel through proxy. Proxy returns “HTTP/1.1 400 Bad Request” 2.Could not resolve all artifacts for configuration :classpath 3.!No cached version of com.android.tools.build:gr…

Linux strtol將十六進制轉化為十進制

代碼 #include <iostream> #include "crypto_util.h"int get_file(const std::string file_name){size_t get_file_id 0;std::cout << hsm::common::get_md5_digest_hex(file_name) << std::endl;get_file_id strtol(reinterpret_cast<const…

Android WebView使用攻略

目錄前言一、簡介二、作用三、使用介紹1、Webview類常用方法1.1、加載url1.2、WebView的狀態1.3、關于前進 / 后退網頁1.4、清除緩存數據2、常用工具類2.1、WebSettings類2.2、WebViewClient類2.3、WebChromeClient類3、注意事項&#xff1a;如何避免WebView內存泄露&#xff1…

C++If與Switch語句

IF if語句不加括號就只是一個語句 舉例: int a5,b2; if(a)//按邏輯值來理解,0為假,其他為真,這里等價于a!0—>a為真時 ab; else ba; 計算三角形面積代碼 #include<iostream> #include<cmath>//數學公式庫 #include<iomanip> //格式控制 using namesp…

linux fork多進程 demo

注釋 使用系統調用fork()創建三個子進程&#xff1b;各個子進程顯示和輸出一些提示信息和自己的進程標識符&#xff1b;父進程顯示自己的進程ID和一些提示信息&#xff0c;然后調用waitpid()等待多個子進程結束&#xff0c;并在子進程結束后顯示輸出提示信息表示程序結束。 代…

Android WebView 與 JS 交互

目錄二、具體分析2.1 Android通過WebView調用 JS 代碼方式1&#xff1a;通過WebView的loadUrl()方式2&#xff1a;通過WebView的evaluateJavascript()方法對比使用建議2.2、JS通過WebView調用 Android 代碼2.2.1、方法分析方式1&#xff1a;通過 WebView的addJavascriptInterfa…

關于鎖的注意事項

文件鎖 Linux 提供了 fcntl 系統調用&#xff0c;可以鎖定文件但是文件鎖是和進程相關聯的&#xff0c;一個進程中的多個線程/協程對同一個文件進行的鎖操作會互相覆蓋掉&#xff0c;從而無效。fcntl 創建的鎖是建議性鎖&#xff0c;只有寫入的進程和讀取的進程都遵循建議才有效…

安卓實現登錄與注冊界面

使用Intent與Bundle傳遞數據 登錄界面login.xml 1.使用Relativelayout相對布局 <?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"mat…

Android Button字母自動全部大寫的問題

兩種解決方案&#xff1a; 方法一&#xff1a; 在 xml 布局中設置屬性 android:textAllCaps"false" <Buttonandroid:layout_width"wrap_content"android:layout_height"match_parent"android:text"添加動作組"android:textAllCap…

安卓Activity與intent跳轉

Activity生命周期 Activity啟動模式 Intent跳轉 _________startActivity() 1.Intent intentnew Intent(A.this,B.class); startActivity(intent); 2.startActivity(new Intent(A.this,B.class)); _________startActivityForResult() Intent intentnew Intent(A.this,B.class…

將讀寫鎖放到共享內存中,實現進程之間對數據的讀寫訪問控制

代碼 #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> #include <assert.h> #include <pthread.h> #include <stdio.h> #include <stdlib.h> #include <fstream> #include <…

Android WebView 使用漏洞

目錄一、類型二、具體分析2.1、WebView任意代碼執行漏洞2.1.1、addJavascriptInterface 接口引起遠程代碼執行漏洞漏洞產生原因解決方案關于該方法的其他細節總結2.1.2、searchBoxJavaBridge_接口引起遠程代碼執行漏洞漏洞產生原因解決方案2.1.3、accessibility和 accessibilit…

將讀寫鎖放到共享內存,實現進程之間對于同一文件的讀寫操作

思路 將讀寫鎖和讀寫鎖的屬性以及一個用于存儲共享內存的地址的int型變量三者封裝成一個struct結構將這個結構體放到共享內存中&#xff0c;以及將讀寫鎖的屬性設置成全局性質&#xff0c;然后使用這個屬性初始化鎖&#xff0c;以及將鎖的地址關聯到結構體的內存地址這個變量定…

Android Studio 查看頁面布局層次結構

Android Studio有個可以查看手機上app頁面布局層次結構的工具。可以協助我們對布局進行優化&#xff0c;去掉沒有必要的節點等&#xff0c;通過這個工具可以清晰的看見頁面整個結構&#xff1b;廢話少說直接上圖&#xff0c;再說過程。 這就是我們想要看到的&#xff0c;每個節…

Java web后端 第一章框架搭建

Redis 通用Mapper 通用Mapper->MyBatis動態SQL封裝包,增刪改查 0 SQL語句 PageHelper PageHelper–>實現分頁操作,不需要limit,直接使用靜態方法 電商系統技術特點 分布式(數據很多,一臺電腦存儲一部分數據) 高并發,集群(并發量很高,后臺不只一個電腦) ,海量數據 主…

進程鎖 讀寫文件的小例子 C++代碼

代碼 #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> #include <cassert> #include <pthread.h> #include <cstdio> #include <cstdlib> #include <fstream> #include <io…

Java 中sleep()與wait()的區別

目錄一、原理不同二、鎖的處理機制不同三、使用的區域不同四、異常捕獲不同五、總結一、原理不同 sleep()是Thread類的靜態方法&#xff0c;是線程用來控制自身流程的&#xff0c;它會使此線程暫停執行指定的時間&#xff0c;而把執行機會讓給其他的線程&#xff0c;等到計時時…