QT聊天項目DAY18

1.文件傳輸

1.1 客戶端

采用分塊傳輸(20MB/塊),以及MD5碼校驗并將讀出的二進制數據采用Base64編碼進行傳輸

1.1.0 通信協議

1.1.1 UI

采用垂直布局,該布局大小為570 * 160,間隔全是0,UI方面不詳細介紹了

1.1.2 MainWindow

頭文件

#ifndef MAINWINDOWS_H
#define MAINWINDOWS_H#include <QObject>
#include <QString>
#include <QtWidgets/QMainWindow>
#include "ui_mainWindows.h"class mainWindows : public QMainWindow
{Q_OBJECTpublic:mainWindows(QWidget *parent = nullptr);~mainWindows();private:void InitUI();void BindSlots();private slots:void OnFileBtnClicked();void OnConnectBtnClicked();void OnUploadBtnClicked();void OnTestBtnClicked();private:Ui::mainWindowsClass ui;QString _fileName;                                                                              // 點擊選擇文件時,保存選中的文件路徑QString _fileMD5;                                                                               // 文件的MD5碼QString _serverIP;int _serverPort;
};#endif // MAINWINDOWS_H

構造函數

設置按鈕的可用狀態

綁定按鈕的槽函數,實時更新進度條,當連接成功時設置按鈕的可用狀態,以及網絡錯誤給出提示

點擊選擇文件按鈕時,保存文件的路徑信息

連接按鈕點擊時,向服務器發起連接

測試通信

上傳文件,分段發送

1.打開文件

2.計算MD5值

3.獲取文件名以及文件大小

4.計算文件需要分幾段去發送,目前我定義一段為20M

5. 循環發送文件之前的一些準備

6.循環發送文件內容

首先讀取文件內容,第一次讀取,這段內容的序號就是1,然后將數據轉換成Base64,最后標記當前分段是否是要發送的文件末尾,然后TCP使用json序列化,將所有數據存儲到Json中進行發送

7.最后關閉文件

1.1.3 TcpMgr

頭文件

#ifndef TCPMGR_H
#define TCPMGR_H#include <QObject>
#include <QTcpSocket>
#include "Singletion.h"class TcpMgr  : public QObject, public Singletion<TcpMgr>
{Q_OBJECTfriend class Singletion<TcpMgr>;public:~TcpMgr();// 刪除拷貝和復制TcpMgr(const TcpMgr&) = delete;TcpMgr& operator=(const TcpMgr&) = delete;public:void ConnectToHost(const QString& hostName, quint16 port);void DisconnectFromHost();void SendData(const quint16 MsgID, QByteArray& data);bool IsConnected() const;private:TcpMgr(QObject* parent = 0);void ProcessData();void BindSlots();signals:void sigSendMsg(const quint16 MsgID, QByteArray& data);void sigConnected(bool isConnected);void sigLogicProcess(int MsgID, QJsonObject& jsonObj);void sigNetError(QString errorMsg);private slots:void slotConnected();																						// 當連接成功建立時void slotReadyRead();																						// TCP緩存有數據可讀時void slotDisconnected();																					// 當連接斷開時void slotErrorOccured(QAbstractSocket::SocketError socketError);											// 當發生錯誤時void slotSendMsg(const quint16 MsgID, QByteArray& data);													// 發送消息private:QTcpSocket* m_tcpSocket;QByteArray _buffer;
};#endif // TCPMGR_H

構造函數

QT會自己為槽函數設置隊列,不用自己在設置隊列和互斥,直接用信號,最方便

解析服務器返回的數據

1.將服務器發來的數據讀取出來并進行存儲,然后循環取出每一個消息,ID+DATA_LEN+DATA有效的避免了粘包問題

2.取出消息頭部(6字節)

3.取出消息ID(2字節)和消息長度(4字節)

4.判斷緩沖區的內容是否大于該消息總長度

5.讀取數據并將這段數據從緩沖區中移除,將二進制文件解析成Json

發送消息

ID + DATA_LEN + DATA

1.1.4 處理服務器響應報文 LogicSystem

TcpMgr將消息整合成Json格式,交給LogicSystem處理

構造函數

創建工作線程,該線程負責處理消息隊列里的任務

工作線程,當有信息添加進來會立即調用槽函數,該函數會在線程中執行,然后創建消息副本,處理上次發送的消息

class Worker : public QObject
{Q_OBJECTpublic:~Worker();Worker(QObject* parent = 0);friend void operator<<(Worker* worker, Message& message);private:void RegisterFunc(quint16 id, Func func);void DisTestMessage(QJsonObject& json);void DisUploadMessage(QJsonObject& json);void AddTask(Message& message);void ProcessTask(Message& message);public slots:void ConsumMessage();signals:void sigTransFile(int transSize);private:QMap<quint16, Func> m_funcMap;QQueue<Message> m_messageQueue;QMutex m_mutex;
};

添加任務

消費任務

處理測試信息

處理文件上傳響應,更新進度條

優雅退出

1.2 服務器

整個服務器框架,依舊采用事件循環池+監聽連接+將連接轉交給CSession(接受和發送數據全在線程中處理)+具體的處理放在LogicSystem中進行

LogicSystem處理消息

將消息添加到消息隊列中

void LogicSystem::PostMsgToQueue(LogicNode* logicNode)
{unique_lock<mutex> lock(_mutex);_msgQueue.push(logicNode);if (_msgQueue.size() >= 1){lock.unlock();																									// 解鎖_consumer.notify_one();}
}

1.2.1 單線程處理客戶端請求

LogicSystem構造函數

在該線程中循環處理消息,消息隊列為空就阻塞等待被喚醒

LogicSystem::LogicSystem():bStop(false)
{RegisterCallBack();_outPath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path"));											// 獲取文件存儲路徑_workThread = thread(&LogicSystem::DealMsg, this);																	// 后臺線程不停的處理消息
}

循環處理消息,沒有消息就阻塞,被喚醒就處理消息,如果析構了,就處理完剩余的消息

調用客戶端請求對應的處理函數

void LogicSystem::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicSystem::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);_funcMap[MSG_ID_UPLOAD_FILE_REQUEST] = bind(&LogicSystem::DisUploadFileMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}

處理測試消息

處理文件上傳消息

1.Json解析

2.解碼

3.取出客戶端發送的內容

4.判斷是否是第一個包,來創建文件打開方式

5.文件打開與寫入

6.返回上傳文件響應

單線程LogicSystem

#ifndef LOGICSYSTEM_H
#define LOGICSYSTEM_H
#include "GlobalHead.h"
#include "Singletion.h"
#include "Struct.h"// 把函數地址當作變量類型來使用
class CSession;
class LogicNode;
using FunCallBack = function<void(CSession* session, const short& msg_id, const string& msg_data)>;class LogicSystem : public std::enable_shared_from_this<LogicSystem>, public Singletion<LogicSystem>
{friend class Singletion<LogicSystem>;																// 為了訪問該類的構造函數public:~LogicSystem();																						void PostMsgToQueue(LogicNode* logicNode);private:LogicSystem();void DealMsg();																						// 處理信息void RegisterCallBack();																			// 注冊回調函數void ConsumerFunc();private:void DisTestMessage(CSession* session, const short& msg_id, const string& msg_data);				// 處理測試消息void DisUploadFileMessage(CSession* session, const short& msg_id, const string& msg_data);			// 處理上傳文件消息private:thread _workThread;																					// 工作線程處理消息queue<LogicNode*> _msgQueue;mutex _mutex;condition_variable _consumer;																		// 消費者,條件變量bool bStop;map<short, FunCallBack> _funcMap;																	// 回調函數映射表unordered_map<int, UserInfo*> _userMaps;string _outPath;																					// 文件存儲路徑
};#endif // LOGICSYSTEM_H#include "LogicSystem.h"
#include <fstream>
#include "CSession.h"
#include "MsgNode.h"
#include "Enum.h"
#include "ServerStatic.h"
#include "UserMgr.h"
#include "base64.h"using namespace std;LogicSystem::LogicSystem():bStop(false)
{RegisterCallBack();_outPath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path"));											// 獲取文件存儲路徑_workThread = thread(&LogicSystem::DealMsg, this);																	// 后臺線程不停的處理消息
}LogicSystem::~LogicSystem()
{bStop = true;_consumer.notify_all();_workThread.join();
}void LogicSystem::PostMsgToQueue(LogicNode* logicNode)
{unique_lock<mutex> lock(_mutex);_msgQueue.push(logicNode);if (_msgQueue.size() >= 1){lock.unlock();																									// 解鎖_consumer.notify_one();}
}void LogicSystem::DealMsg()
{while (true){unique_lock<mutex> lock(_mutex);_consumer.wait(lock, [this] {return!_msgQueue.empty(); });														// 等待隊列不為空if (bStop){while (!_msgQueue.empty()){ConsumerFunc();}break;}ConsumerFunc();}
}void LogicSystem::ConsumerFunc()
{LogicNode* logicNode = _msgQueue.front();short id = logicNode->_rn->_msg_id;auto it = _funcMap.find(id);if (it != _funcMap.end()){it->second(logicNode->_cs.get(), id, string(logicNode->_rn->_data, logicNode->_rn->_curLen));}_msgQueue.pop();delete logicNode;
}void LogicSystem::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicSystem::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);_funcMap[MSG_ID_UPLOAD_FILE_REQUEST] = bind(&LogicSystem::DisUploadFileMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}void LogicSystem::DisTestMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();cout << "receive test message: " << data << endl;Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_TEST_RESPONSE);});response["code"] = ErrorCodes::SUCCESS;response["data"] = "recv test message success";
}void LogicSystem::DisUploadFileMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_UPLOAD_FILE_RESPONSE);});// 解碼string decoded = base64_decode(data);int seq = root["seq"].asInt();string name = root["name"].asString();int totalSize = root["totalSize"].asInt();int transSize = root["transSize"].asInt();cout << "receive upload file message: " << name << " seq: " << seq << " totalSize: " << totalSize << " transSize: " << transSize << endl;string filePath = _outPath + "/" + name;cout << "outPath: " << filePath << endl;ofstream ofs;// 第一個包if (seq == 1){ofs.open(filePath, ios::binary | ios::trunc);}else{ofs.open(filePath, ios::binary | ios::app);}// 檢查文件是否打開成功if (!ofs.is_open()){cerr << "open file error" << endl;response["code"] = ErrorCodes::FILE_OPEN_ERROR;return;}// 寫入文件ofs.write(decoded.data(), decoded.size());if (!ofs){cerr << "write file error" << endl;response["code"] = ErrorCodes::FILE_WRITE_ERROR;return;}// 關閉文件ofs.close();cout << "write file success" << endl;// 響應response["code"] = ErrorCodes::SUCCESS;response["seq"] = seq;response["name"] = name;response["transSize"] = transSize;response["totalSize"] = totalSize;
}

1.2.2 多線程處理客戶端請求

1.2.2.1?自己嘗試使用多線程來接受文件

創建線程池

class ThreadPool
{
public:~ThreadPool();ThreadPool(int threadNum);void Stop() { bStop = true; }friend void operator<<(ThreadPool& pool, shared_ptr<FileInfo> fileInfo);private:void PostFileToBuffer(shared_ptr<FileInfo> fileInfo);												// 將文件添加到緩存區void WriteFile(vector<shared_ptr<FileInfo>> fileInfos);												// 從緩沖區中讀取文件并寫入到內存中void DealFileBuffer();																				// 處理文件緩沖區private:vector<thread> _threads;int _threadNum;bool bStop = false;																					// 線程池停止標志mutex _waitFileBuffer_mutex;																		// 等待文件緩沖區互斥鎖condition_variable _waitFileBuffer_cv;																// 等待文件緩沖區非空vector<shared_ptr<FileInfo>> _fileBuffer;															// 文件緩沖區
};

構造函數

將待寫入文件的信息,先解析到數組里,然后多線程去寫入

寫入到文件中

添加待寫入字符串到數組中

存儲文件的結構體

LogicSystem構造函數

將文件寫入緩沖區

完整代碼

頭文件

#ifndef LOGICSYSTEM_H
#define LOGICSYSTEM_H
#include "GlobalHead.h"
#include "Singletion.h"
#include "Struct.h"// 把函數地址當作變量類型來使用
class CSession;
class LogicNode;
using FunCallBack = function<void(CSession* session, const short& msg_id, const string& msg_data)>;class ThreadPool
{
public:~ThreadPool();ThreadPool(int threadNum);void Stop() { bStop = true; }friend void operator<<(ThreadPool& pool, shared_ptr<FileInfo> fileInfo);private:void PostFileToBuffer(shared_ptr<FileInfo> fileInfo);												// 將文件添加到緩存區void WriteFile(vector<shared_ptr<FileInfo>> fileInfos);												// 從緩沖區中讀取文件并寫入到內存中void DealFileBuffer();																				// 處理文件緩沖區private:vector<thread> _threads;int _threadNum;bool bStop = false;																					// 線程池停止標志mutex _waitFileBuffer_mutex;																		// 等待文件緩沖區互斥鎖condition_variable _waitFileBuffer_cv;																// 等待文件緩沖區非空vector<shared_ptr<FileInfo>> _fileBuffer;															// 文件緩沖區
};class LogicSystem : public std::enable_shared_from_this<LogicSystem>, public Singletion<LogicSystem>
{friend class Singletion<LogicSystem>;																// 為了訪問該類的構造函數friend class ThreadPool;public:~LogicSystem();																						void PostMsgToQueue(LogicNode* logicNode);LogicSystem(const LogicSystem&) = delete;															// 禁止拷貝構造函數LogicSystem& operator=(const LogicSystem&) = delete;												// 禁止拷貝賦值運算符private:LogicSystem();void DealMsg();																						// 處理信息void RegisterCallBack();																			// 注冊回調函數void ConsumerFunc();private:void DisTestMessage(CSession* session, const short& msg_id, const string& msg_data);				// 處理測試消息void DisUploadFileMessage(CSession* session, const short& msg_id, const string& msg_data);			// 處理上傳文件消息private:thread _workThread;																					// 工作線程處理消息queue<LogicNode*> _msgQueue;mutex _mutex;condition_variable _consumer;																		// 消費者,條件變量bool bStop;map<short, FunCallBack> _funcMap;																	// 回調函數映射表unordered_map<int, UserInfo*> _userMaps;string _outPath;																					// 文件存儲路徑ThreadPool* _threadPool;																			// 線程池
};#endif // LOGICSYSTEM_H

實現文件

#include "LogicSystem.h"
#include <fstream>
#include "CSession.h"
#include "MsgNode.h"
#include "Enum.h"
#include "ServerStatic.h"
#include "UserMgr.h"
#include "base64.h"using namespace std;#pragma region /* ThreadPool */ThreadPool::~ThreadPool()
{bStop = true;_waitFileBuffer_cv.notify_all();for (auto& t : _threads){t.join();}
}ThreadPool::ThreadPool(int threadNum): _threadNum(threadNum)
{for (int i = 0; i < threadNum; i++){_threads.push_back(thread(&ThreadPool::DealFileBuffer, this));}
}void operator<<(ThreadPool& pool, shared_ptr<FileInfo> fileInfo)
{pool.PostFileToBuffer(fileInfo);
}void ThreadPool::PostFileToBuffer(shared_ptr<FileInfo> fileInfo)
{unique_lock<mutex> lock(_waitFileBuffer_mutex);_fileBuffer.push_back(fileInfo);sort(_fileBuffer.begin(), _fileBuffer.end());											// 排序_fileBuffer.push_back(fileInfo);if (_fileBuffer.size() >= 1){lock.unlock();_waitFileBuffer_cv.notify_one();}
}void ThreadPool::DealFileBuffer()
{while (true){unique_lock<mutex> lock(_waitFileBuffer_mutex);_waitFileBuffer_cv.wait(lock, [this] {return!_fileBuffer.empty(); });vector<shared_ptr<FileInfo>> fileInfos = std::move(_fileBuffer);_fileBuffer.clear();lock.unlock();if (bStop){WriteFile(std::move(fileInfos));break;}WriteFile(std::move(fileInfos));}
}void ThreadPool::WriteFile(vector<shared_ptr<FileInfo>> fileInfos)
{for (shared_ptr<FileInfo> fileInfo : fileInfos){cout << "write file: " << fileInfo->_name << " seq: " << fileInfo->_curSeq << "writePath: " << fileInfo->_writePath << endl;ofstream ofs;// 第一個包if (fileInfo->_curSeq == 1){ofs.open(fileInfo->_writePath, ios::binary | ios::trunc);}else{ofs.open(fileInfo->_writePath, ios::binary | ios::app);}// 檢查文件是否打開成功if (!ofs.is_open()){cerr << "open file error" << endl;return;}// 寫入文件ofs.write(fileInfo->_data.data(), fileInfo->_data.size());if (!ofs){cerr << "write file error" << endl;return;}// 關閉文件ofs.close();cout << "write file success" << endl;}
}#pragma endregion#pragma region /* LogicSystem */
LogicSystem::LogicSystem():bStop(false)
{RegisterCallBack();_outPath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path"));											// 獲取文件存儲路徑_workThread = thread(&LogicSystem::DealMsg, this);																	// 后臺線程不停的處理消息_threadPool = new ThreadPool(5);																					// 創建線程池
}LogicSystem::~LogicSystem()
{bStop = true;_consumer.notify_all();_workThread.join();delete _threadPool;
}void LogicSystem::PostMsgToQueue(LogicNode* logicNode)
{unique_lock<mutex> lock(_mutex);_msgQueue.push(logicNode);if (_msgQueue.size() >= 1){lock.unlock();																									// 解鎖_consumer.notify_one();}
}void LogicSystem::DealMsg()
{while (true){unique_lock<mutex> lock(_mutex);_consumer.wait(lock, [this] {return!_msgQueue.empty(); });														// 等待隊列不為空if (bStop){while (!_msgQueue.empty()){ConsumerFunc();}break;}ConsumerFunc();}
}void LogicSystem::ConsumerFunc()
{LogicNode* logicNode = _msgQueue.front();short id = logicNode->_rn->_msg_id;auto it = _funcMap.find(id);if (it != _funcMap.end()){it->second(logicNode->_cs.get(), id, string(logicNode->_rn->_data, logicNode->_rn->_curLen));}_msgQueue.pop();delete logicNode;
}void LogicSystem::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicSystem::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);_funcMap[MSG_ID_UPLOAD_FILE_REQUEST] = bind(&LogicSystem::DisUploadFileMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}void LogicSystem::DisTestMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();cout << "receive test message: " << data << endl;Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_TEST_RESPONSE);});response["code"] = ErrorCodes::SUCCESS;response["data"] = "recv test message success";
}void LogicSystem::DisUploadFileMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_UPLOAD_FILE_RESPONSE);});// 解碼string decoded = base64_decode(data);int seq = root["seq"].asInt();string name = root["name"].asString();int totalSize = root["totalSize"].asInt();int transSize = root["transSize"].asInt();cout << "receive upload file message: " << name << " seq: " << seq << " totalSize: " << totalSize << " transSize: " << transSize << endl;string filePath = _outPath + "/" + name;cout << "outPath: " << filePath << endl;// 將文件寫入緩沖區shared_ptr<FileInfo> fileInfo_ptr = make_shared<FileInfo>(name, filePath, seq, decoded);*_threadPool << fileInfo_ptr;// 響應response["code"] = ErrorCodes::SUCCESS;response["seq"] = seq;response["name"] = name;response["transSize"] = transSize;response["totalSize"] = totalSize;
}#pragma endregion

但是速率并沒有增加多少,有問題

不采用多線程寫入文件,而采用多線程解析文件并存入緩沖區

線程池中包含一個互斥變量,邏輯處理類中包含一個互斥變量

線程池給消息隊列上鎖

喚醒阻塞的線程執行Tcp消息任務

將上傳文件任務按照序列seq解析到緩沖區中,此時會喚醒因為文件緩沖區為空而等待的線程

LogicSystem給文件緩沖區上鎖

喚醒因為文件緩沖區為空而阻塞等待執行寫入的線程

線程寫入內容到文件

void LogicSystem::DealFileBuffer()
{while (true){unique_lock<mutex> lock(_waitFileBuffer_mutex);_waitFileBuffer_cv.wait(lock, [this] {return!_fileBuffer.empty(); });vector<shared_ptr<FileInfo>> fileInfos = std::move(_fileBuffer);_fileBuffer.clear();lock.unlock();if (bStop){WriteFile(std::move(fileInfos));break;}WriteFile(std::move(fileInfos));}
}void LogicSystem::WriteFile(vector<shared_ptr<FileInfo>> fileInfos)
{ofstream ofs;for (shared_ptr<FileInfo> fileInfo : fileInfos){// 獲取響應Json::Value &response = fileInfo->_response;// 返回響應ConnectionRAII conn([&response, &fileInfo](){string response_str = response.toStyledString();fileInfo->_session->Send(response_str, MSG_ID_UPLOAD_FILE_RESPONSE);});cout << "write file: " << fileInfo->_name << " seq: " << fileInfo->_curSeq << "writePath: " << fileInfo->_writePath << endl;// 打開文件if (!ofs.is_open()){// 第一個包if (fileInfo->_curSeq == 1){ofs.open(fileInfo->_writePath, ios::binary | ios::trunc);ConnectionRAII conn([&ofs](){ofs.close();});}else{ofs.open(fileInfo->_writePath, ios::binary | ios::app);}}// 檢查文件是否打開成功if (!ofs.is_open()){cerr << "open file error" << endl;response["code"] = ErrorCodes::FILE_OPEN_ERROR;return;}// 寫入文件ofs.write(fileInfo->_data.data(), fileInfo->_data.size());if (!ofs){cerr << "write file error" << endl;response["code"] = ErrorCodes::FILE_WRITE_ERROR;return;}cout << "write file success" << endl;}
}

接收文件速率并沒有什么提高

頭文件

#ifndef LOGICSYSTEM_H
#define LOGICSYSTEM_H
#include "GlobalHead.h"
#include "Singletion.h"
#include "Struct.h"// 把函數地址當作變量類型來使用
class CSession;
class LogicNode;
using FunCallBack = function<void(CSession* session, const short& msg_id, const string& msg_data)>;class ThreadPool : public Singletion<ThreadPool>
{friend class Singletion<ThreadPool>;																// 為了訪問該類的構造函數
public:~ThreadPool();void Stop() { bStop = true; }friend void operator<<(ThreadPool& pool, shared_ptr<LogicNode> logicNode);protected:ThreadPool(int threadNum);private:void DealMsg();																						// 處理信息void ConsumerFunc();void PraseFileInfo(shared_ptr<LogicNode> logicNode);												// 解析文件信息private:vector<thread> _threads;int _threadNum;bool bStop = false;																					// 線程池停止標志mutex _waitTcpMsg_mutex;																			// 等待tcp消息互斥鎖condition_variable _waitTcpMsg_cv;																	// 等待tcp消息非空queue<shared_ptr<LogicNode>> _msgQueue;																// TCP消息隊列
};class LogicSystem : public std::enable_shared_from_this<LogicSystem>, public Singletion<LogicSystem>
{friend class Singletion<LogicSystem>;																// 為了訪問該類的構造函數friend class ThreadPool;public:~LogicSystem();																						void PostMsgToQueue(shared_ptr<LogicNode> logicNode);LogicSystem(const LogicSystem&) = delete;															// 禁止拷貝構造函數LogicSystem& operator=(const LogicSystem&) = delete;												// 禁止拷貝賦值運算符private:LogicSystem();private:void RegisterCallBack();																			// 注冊回調函數void DisTestMessage(CSession* session, const short& msg_id, const string& msg_data);				// 處理測試消息void PostFileToBuffer(shared_ptr<FileInfo> fileInfo);												// 將文件添加到緩存區void WriteFile(vector<shared_ptr<FileInfo>> fileInfos);												// 從緩沖區中讀取文件并寫入到內存中void DealFileBuffer();																				// 處理文件緩沖區private:map<short, FunCallBack> _funcMap;																	// 回調函數映射表string _outPath;																					// 文件存儲路徑thread _workThread;																					// 工作線程shared_ptr<ThreadPool> _threadPool;																	// 線程池bool bStop = false;																					// 線程池停止標志mutex _waitFileBuffer_mutex;																		// 等待文件緩沖區互斥鎖condition_variable _waitFileBuffer_cv;																// 等待文件緩沖區非空vector<shared_ptr<FileInfo>> _fileBuffer;															// 文件緩沖區
};#endif // LOGICSYSTEM_H

實現文件

#include "LogicSystem.h"
#include <fstream>
#include "CSession.h"
#include "MsgNode.h"
#include "Enum.h"
#include "ServerStatic.h"
#include "UserMgr.h"
#include "base64.h"using namespace std;#pragma region /* ThreadPool */ThreadPool::~ThreadPool()
{bStop = true;_waitTcpMsg_cv.notify_all();for (auto& t : _threads){t.join();}
}ThreadPool::ThreadPool(int threadNum): _threadNum(threadNum)
{for (int i = 0; i < threadNum; i++){_threads.push_back(thread(&ThreadPool::DealMsg, this));}
}void operator<<(ThreadPool& pool, shared_ptr<LogicNode> Message)
{unique_lock<mutex> lock(pool._waitTcpMsg_mutex);pool._msgQueue.push(Message);if (pool._msgQueue.size() >= 1){lock.unlock();																									// 解鎖pool._waitTcpMsg_cv.notify_all();}
}void ThreadPool::DealMsg()
{while (true){unique_lock<mutex> lock(_waitTcpMsg_mutex);_waitTcpMsg_cv.wait(lock, [this] {return!_msgQueue.empty(); });													// 等待隊列不為空cout << "_msgQueue size: " << _msgQueue.size() << endl;if (bStop){while (!_msgQueue.empty()){ConsumerFunc();}break;}ConsumerFunc();}
}void ThreadPool::ConsumerFunc()
{shared_ptr<LogicNode> logicNode = _msgQueue.front();_msgQueue.pop();uint16_t id = logicNode->_rn->_msg_id;if (id == MSG_ID_UPLOAD_FILE_REQUEST){PraseFileInfo(logicNode);}else if (id == MSG_ID_TEST_REQUEST){LogicSystem::GetInstance()->DisTestMessage(logicNode->_cs.get(), id, string(logicNode->_rn->_data, logicNode->_rn->_curLen));}
}void ThreadPool::PraseFileInfo(shared_ptr<LogicNode> logicNode)
{const string& msg_data = string(logicNode->_rn->_data, logicNode->_rn->_curLen);Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();// 解碼string decoded = base64_decode(data);int seq = root["seq"].asInt();string name = root["name"].asString();int totalSize = root["totalSize"].asInt();int transSize = root["transSize"].asInt();cout << "receive upload file message: " << name << " seq: " << seq << " totalSize: " << totalSize << " transSize: " << transSize << endl;string filePath = LogicSystem::GetInstance()->_outPath + "/" + name;// 響應Json::Value response;response["code"] = ErrorCodes::SUCCESS;response["seq"] = seq;response["name"] = name;response["transSize"] = transSize;response["totalSize"] = totalSize;// 將文件寫入緩沖區shared_ptr<FileInfo> fileInfo_ptr = make_shared<FileInfo>(name, filePath, seq, decoded, response, logicNode->_cs);LogicSystem::GetInstance()->PostFileToBuffer(fileInfo_ptr);
}#pragma endregion#pragma region /* LogicSystem */
LogicSystem::LogicSystem():bStop(false)
{RegisterCallBack();_outPath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path"));											// 獲取文件存儲路徑_workThread = thread(&LogicSystem::DealFileBuffer, this);															// 后臺線程不停的處理消息_threadPool = ThreadPool::GetInstance(5);																			// 獲取線程池
}LogicSystem::~LogicSystem()
{bStop = true;_waitFileBuffer_cv.notify_all();_workThread.join();
}void LogicSystem::PostMsgToQueue(shared_ptr<LogicNode> logicNode)
{*_threadPool << logicNode;cout << "post message to queue" << endl;
}void LogicSystem::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicSystem::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}void LogicSystem::DisTestMessage(CSession* session, const short& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();cout << "receive test message: " << data << endl;Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_TEST_RESPONSE);});response["code"] = ErrorCodes::SUCCESS;response["data"] = "recv test message success";
}void LogicSystem::PostFileToBuffer(shared_ptr<FileInfo> fileInfo)
{unique_lock<mutex> lock(_waitFileBuffer_mutex);_fileBuffer.push_back(fileInfo);sort(_fileBuffer.begin(), _fileBuffer.end());											// 排序if (_fileBuffer.size() >= 1){lock.unlock();_waitFileBuffer_cv.notify_one();}
}void LogicSystem::DealFileBuffer()
{while (true){unique_lock<mutex> lock(_waitFileBuffer_mutex);_waitFileBuffer_cv.wait(lock, [this] {return!_fileBuffer.empty(); });vector<shared_ptr<FileInfo>> fileInfos = std::move(_fileBuffer);_fileBuffer.clear();lock.unlock();if (bStop){WriteFile(std::move(fileInfos));break;}WriteFile(std::move(fileInfos));}
}void LogicSystem::WriteFile(vector<shared_ptr<FileInfo>> fileInfos)
{ofstream ofs;for (shared_ptr<FileInfo> fileInfo : fileInfos){// 獲取響應Json::Value &response = fileInfo->_response;// 返回響應ConnectionRAII conn([&response, &fileInfo](){string response_str = response.toStyledString();fileInfo->_session->Send(response_str, MSG_ID_UPLOAD_FILE_RESPONSE);});cout << "write file: " << fileInfo->_name << " seq: " << fileInfo->_curSeq << "writePath: " << fileInfo->_writePath << endl;// 打開文件if (!ofs.is_open()){// 第一個包if (fileInfo->_curSeq == 1){ofs.open(fileInfo->_writePath, ios::binary | ios::trunc);ConnectionRAII conn([&ofs](){ofs.close();});}else{ofs.open(fileInfo->_writePath, ios::binary | ios::app);}}// 檢查文件是否打開成功if (!ofs.is_open()){cerr << "open file error" << endl;response["code"] = ErrorCodes::FILE_OPEN_ERROR;return;}// 寫入文件ofs.write(fileInfo->_data.data(), fileInfo->_data.size());if (!ofs){cerr << "write file error" << endl;response["code"] = ErrorCodes::FILE_WRITE_ERROR;return;}cout << "write file success" << endl;}
}#pragma endregion

1.2.2.2 老師的多線程思路

將處理客戶端消息包的具體邏輯挪移到LogicWork類中,該類是一個線程類

頭文件

#include <string>
#include <queue>
#include <map>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
#include <atomic>using namespace std;
class CSession;
class LogicNode;using FunCallBack = function<void(shared_ptr<CSession> session, const short& msg_id, const string& msg_data)>;class LogicWorker
{
public:LogicWorker();~LogicWorker();friend void operator<<(shared_ptr<LogicWorker> worker , const shared_ptr<LogicNode> node);			// 向消息隊列中添加消息private:void RegisterCallBack();																			// 注冊回調函數void DisTestMessage(shared_ptr<CSession> session, const uint16_t& msg_id, const string& msg_data);	// 處理測試消息void DisUplaodFile(shared_ptr<CSession> session, const uint16_t& msg_id, const string& msg_data);	// 處理文件上傳消息void DealMsg();																						// 處理信息void ConsumerFunc(shared_ptr<LogicNode> logicNode);private:map<short, FunCallBack> _funcMap;																	// 回調函數映射表atomic<bool> _b_stop;																				// 線程池停止標志thread _woker_thread;																				// 工作線程mutex _wait_tcpMsg_mutex;																			// 等待tcp消息互斥鎖condition_variable _wait_tcpMsg_cv;																	// 等待tcp消息非空queue<shared_ptr<LogicNode>> _msgQueue;																// TCP消息隊列
};#endif // LOGICWORKER_H

實現文件

#include "LogicWorker.h"
#include "CSession.h"
#include "MsgNode.h"
#include "Enum.h"
#include "Struct.h"
#include "FileSystem.h"LogicWorker::LogicWorker():_b_stop(false)
{RegisterCallBack();_woker_thread = thread(&LogicWorker::DealMsg, this);
}LogicWorker::~LogicWorker()
{_b_stop = true;_wait_tcpMsg_cv.notify_one();_woker_thread.join();
}void LogicWorker::RegisterCallBack()
{_funcMap[MSG_ID_TEST_REQUEST] = bind(&LogicWorker::DisTestMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);_funcMap[MSG_ID_UPLOAD_FILE_REQUEST] = bind(&LogicWorker::DisUplaodFile, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
}void LogicWorker::DisTestMessage(shared_ptr<CSession> session, const uint16_t& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();cout << "receive test message: " << data << endl;Json::Value response;ConnectionRAII conn([&response, &session](){string response_str = response.toStyledString();session->Send(response_str, MSG_ID_TEST_RESPONSE);});response["code"] = ErrorCodes::SUCCESS;response["data"] = "recv test message success";
}void LogicWorker::DisUplaodFile(shared_ptr<CSession> session, const uint16_t& msg_id, const string& msg_data)
{Json::Reader reader;Json::Value root;if (!reader.parse(msg_data, root)){cout << "parse json error" << endl;return;}string data = root["data"].asString();int seq = root["seq"].asInt();string name = root["name"].asString();int totalSize = root["totalSize"].asInt();int transSize = root["transSize"].asInt();int totalSeq = root["totalSeq"].asInt();hash<string> hashFunc;size_t fileHash = hashFunc(name);int index = fileHash % THREAD_COUNT;shared_ptr<FileTask> fileTask = make_shared<FileTask>(session, name, seq, totalSize, transSize, totalSeq, data);FileSystem::GetInstance()->PostMsgToQueue(fileTask, index);
}void LogicWorker::DealMsg()
{while (!_b_stop){unique_lock<mutex> lock(_wait_tcpMsg_mutex);_wait_tcpMsg_cv.wait(lock, [this] {return!_msgQueue.empty(); });													// 等待隊列不為空cout << "_msgQueue size: " << _msgQueue.size() << endl;if(_b_stop)break;queue<shared_ptr<LogicNode>> msgQueue = move(_msgQueue);															// 移動隊列lock.unlock();while (!msgQueue.empty()){ConsumerFunc(msgQueue.front());msgQueue.pop();}}
}void LogicWorker::ConsumerFunc(shared_ptr<LogicNode> logicNode)
{uint16_t id = logicNode->_rn->_msg_id;auto it = _funcMap.find(id);if (it != _funcMap.end()){it->second(logicNode->_cs, id, string(logicNode->_rn->_data, logicNode->_rn->_msgLen));}
}void operator<<(shared_ptr<LogicWorker> worker, const shared_ptr<LogicNode> node)
{{unique_lock<mutex> lock(worker->_wait_tcpMsg_mutex);worker->_msgQueue.push(node);}worker->_wait_tcpMsg_cv.notify_one();
}

執行文件寫入的線程類FileWorker

頭文件

#ifndef FILEWORKER_H
#define FILEWORKER_H#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <string>
using namespace std;class CSession;struct 	FileTask
{FileTask(shared_ptr<CSession> session, string file_name, int seq, int total_size, int trans_size, int last_seq, string file_data):_session(session), _file_name(file_name), _seq(seq), _total_size(total_size), _trans_size(trans_size), _last_seq(last_seq), _file_data(file_data){}~FileTask() {}shared_ptr<CSession> _session;string _file_name;int _seq;int _total_size;int _trans_size;int _last_seq;string _file_data;
};class FileWorker
{
public:FileWorker();~FileWorker();friend void operator<<(shared_ptr<FileWorker> worker, shared_ptr<FileTask> task);private:void ExecuteTask(shared_ptr<FileTask> task);void Run();private:thread _worker_thread;queue<shared_ptr<FileTask>> _task_queue;atomic<bool> _b_stop;mutex _wait_file_task_mutex;condition_variable _wait_file_task_cv;
};#endif // FILEWORKER_H

實現文件

#include "FileWorker.h"
#include <fstream>
#include "base64.h"
#include "ServerStatic.h"
#include "CSession.h"
#include "Enum.h"FileWorker::FileWorker():_b_stop(false)
{_worker_thread = thread(&FileWorker::Run, this);
}FileWorker::~FileWorker()
{while (!_task_queue.empty()){_task_queue.pop();}_b_stop = true;_wait_file_task_cv.notify_one();_worker_thread.join();
}void FileWorker::Run()
{while (!_b_stop){unique_lock<mutex> lock(_wait_file_task_mutex);_wait_file_task_cv.wait(lock, [this]() { return !_task_queue.empty() || _b_stop; });if (_b_stop)break;queue<shared_ptr<FileTask>> task_queue = move(_task_queue);lock.unlock();while (!task_queue.empty()){shared_ptr<FileTask> task = task_queue.front();task_queue.pop();ExecuteTask(task);}}
}void operator<<(shared_ptr<FileWorker> worker, shared_ptr<FileTask> task)
{{unique_lock<mutex> lock(worker->_wait_file_task_mutex);worker->_task_queue.push(task);}worker->_wait_file_task_cv.notify_one();
}void FileWorker::ExecuteTask(shared_ptr<FileTask> task)
{// 響應Json::Value response;response["seq"] = task->_seq;response["name"] = task->_file_name;response["transSize"] = task->_trans_size;response["totalSize"] = task->_total_size;ConnectionRAII RAII([&response, task](){string response_str = response.toStyledString();task->_session->Send(response_str, MSG_IDS::MSG_ID_UPLOAD_FILE_RESPONSE);});// 解碼string decode = base64_decode(task->_file_data);string filePath = get<string>(ServerStatic::ParseConfig("FileOutPath", "Path")) + "/" + task->_file_name;int last_seq = task->_last_seq;// 寫入文件ofstream ofs;if (task->_seq == 1){ofs.open(filePath, ios::binary | ios::trunc);}else{ofs.open(filePath, ios::binary | ios::app);}if (!ofs.is_open()){response["code"] = ErrorCodes::FILE_OPEN_ERROR;return;}ofs.write(decode.c_str(), decode.size());if (!ofs){response["code"] = ErrorCodes::FILE_WRITE_ERROR;return;}ofs.close();response["code"] = ErrorCodes::SUCCESS;cout << "FileWorker::ExecuteTask: " << filePath << " write curr_seq : " << task->_seq << "	success!!!" << endl;
}

該多線程是基于多個客戶端的基礎上來實現的,為每一個會話單獨的分配線程去負責對應的網絡傳輸,而不是我理解的多個線程去處理某一個文件傳輸任務,此外處理Tcp請求信息的是一個專屬線程,專門處理文件傳輸的又是一個線程,在添加Tcp消息和任務時都是添加到對應的線程類所在的隊列中

針對不同的會話ID來創建對應的hash_value,但是還是有很大概率會分配到同一個處理TCP消息的線程中;

如下

針對不同的文件名創建對應的hash_value,來分配對應的線程處理文件傳輸

同樣的也會有這種問題

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/918442.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/918442.shtml
英文地址,請注明出處:http://en.pswp.cn/news/918442.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

centos系統sglang單節點本地部署大模型

前置工作 本地部署大模型的基本概念和前置工作-CSDN博客 模型部署 這里通過docker容器進行部署。我這里是h20*8,部署deepseek-v3-0324,這個配置和模型都比較大,大家根據自己的硬件對應調整 步驟一 我們要通過sglang部署模型,先拉取sglang的docker鏡像,這里下載失敗的…

【dij算法/最短路/分層圖】P4568 [JLOI2011] 飛行路線

題目描述 Alice 和 Bob 現在要乘飛機旅行&#xff0c;他們選擇了一家相對便宜的航空公司。該航空公司一共在 nnn 個城市設有業務&#xff0c;設這些城市分別標記為 000 到 n?1n-1n?1&#xff0c;一共有 mmm 種航線&#xff0c;每種航線連接兩個城市&#xff0c;并且航線有一定…

告別傳統,CVPR三論文用GNN動態圖重塑視覺AI

本文選自gongzhonghao【圖靈學術SCI論文輔導】關注我們&#xff0c;掌握更多頂會頂刊發文資訊今天&#xff0c;為大家推薦一個極具前沿價值與實用潛力的研究方向&#xff1a;圖神經網絡&#xff08;GNN&#xff09;。作為深度學習領域的新興力量&#xff0c;圖神經網絡在近年頂…

HTTP/HTTPS代理,支持RSA和SM2算法

在日常工作和學習中&#xff0c;我們經常遇到HTTP和HTTPS的相關問題&#xff0c;要解決這些問題&#xff0c;有時就需要搭建各種實驗環境&#xff0c;重現業務場景&#xff0c;比如&#xff1a; 將HTTP轉為HTTPS。本地只能發送HTTP請求&#xff0c;但是遠程服務器卻只能接收HT…

如何提高AI寫作論文的查重率?推薦七個AI寫作論文工具

隨著AI技術在學術領域的廣泛應用&#xff0c;越來越多的學生和研究人員開始使用AI寫作工具來提高寫作效率&#xff0c;幫助完成畢業論文、科研論文等。然而&#xff0c;AI生成的內容是否會提高論文的查重率&#xff1f;是否能有效避免重復和提高通過率&#xff1f;這些問題成為…

跨平臺、低延遲、可嵌入:實時音視頻技術在 AI 控制系統中的進化之路

引言&#xff1a;面向未來的實時音視頻基座 在萬物互聯與智能化加速落地的時代&#xff0c;實時音視頻技術早已不再只是社交娛樂的附屬功能&#xff0c;而是智慧城市、應急指揮、遠程操控、工業智造、教育培訓、安防監控等系統的“神經中樞”。一條高性能、可控、低延遲的視頻…

Spring WebFlux開發指導

Spring WebFlux是一個響應式的web服務器端應用開發框架&#xff0c;響應式是指&#xff0c;當前端組件的狀態發生變化&#xff0c;則生成事件通知&#xff0c;根據需求可異步或者同步地向服務器端接口發送請求&#xff0c;當服務器端網絡IO組件的狀態發生變化&#xff0c;則生成…

09-docker鏡像手動制作

文章目錄一.手動制作單服務的nginx鏡像1.啟動一個基礎容器&#xff0c;此處我使用的是centos7鏡像。2.修改容器中的軟件源3.安裝nginx服務并啟動nginx服務4.修復nginx的首頁文件5.退出容器6.將退出的容器提交為鏡像7.測試鏡像的可用性二.手動制作多服務的nginx sshd鏡像1.啟用…

Android.mk教程

語法 Android.mk 的必備三行 LOCAL_PATH : $(call my-dir) # Android.mk的目錄&#xff0c;call調用函數include $(CLEAR_VARS) # 除了LOCAL_PATH清除所有LOCAL_XXXinclude $(BUILD_SHARED_LIBRARY) # BUILD_XXX, 指定構建類型 # BUILD_SHARED_LIBRARY → .so動態庫 # BUILD…

稠密檢索:基于神經嵌入的高效語義搜索范式

本文由「大千AI助手」原創發布&#xff0c;專注用真話講AI&#xff0c;回歸技術本質。拒絕神話或妖魔化。搜索「大千AI助手」關注我&#xff0c;一起撕掉過度包裝&#xff0c;學習真實的AI技術&#xff01; 1. 背景與定義 稠密檢索&#xff08;Dense Retrieval&#xff09;是一…

AI日報0807 | GPT-5或今晚1點來襲:四大版本全曝光

關注&#xff1a;未來世界2099每日分享&#xff1a;全球最新AI資訊【應用商業技術其他】服務&#xff1a;【學習Q】【資源Q】【學習資料】【行業報告】&#xff08;無限免費下載&#xff09;應用 1、訊飛星火代碼畫布震撼上線&#xff1a;動嘴就能開發&#xff0c;工作效率翻倍…

認識爬蟲 —— 正則表達式提取

本質是對字符串的處理&#xff0c;正則表達式描述的是一種字符串匹配的模式。簡而言之&#xff0c;用具備一定特征意義的表達式對字符串進行檢查&#xff0c;將符合條件的子字符串提取出來。導入模塊import re一、單字符匹配match(表達式&#xff0c;匹配對象)&#xff1a;匹配…

單鏈表專題---暴力算法美學(1)(有視頻演示)

1.1 移除鏈表元素 題目要求&#xff1a;給你一個鏈表的頭節點head 和一個整數val,請你刪除鏈表中所有滿足Node.val val 的節點&#xff0c;并返回新的頭節點。 思路一&#xff1a;遍歷鏈表&#xff0c;遇到val就刪除&#xff0c;pcur指向val的下一個節點&#xff0c;最后只剩…

機器學習-決策樹(DecisionTree)

0 回歸決策樹展示 import pandas as pd import numpy as np from sklearn.tree import DecisionTreeRegressor from sklearn.metrics import root_mean_squared_error, r2_score from sklearn.model_selection import GridSearchCV,KFold from sklearn.model_selection import…

【Java Web】JDBC 連接 MySQL 實現數據庫 CRUD(增刪改查)詳解

在 Java Web 開發中&#xff0c;與數據庫交互是不可避免的&#xff0c;而 JDBC&#xff08;Java Database Connectivity&#xff09; 是 Java 官方提供的標準數據庫連接接口&#xff0c;幾乎所有 Java 項目中都用過它。 本文通過一個完整示例&#xff0c;帶你從零實現 增&#…

HTTP 請求返回狀態碼和具體含義?200、400、403、404、502、503、504等

HTTP 狀態碼是服務器對客戶端請求的響應狀態標識&#xff0c;分為五大類&#xff08;以第一位數字區分&#xff09;&#xff0c;常用狀態碼如下&#xff1a; 1. 信息類&#xff08;1xx&#xff09;&#xff1a;請求已接收&#xff0c;繼續處理 100 Continue&#xff1a;服務器已…

13-netty基礎-手寫rpc-消費方生成代理-05

netty系列文章&#xff1a; 01-netty基礎-socket02-netty基礎-java四種IO模型03-netty基礎-多路復用select、poll、epoll04-netty基礎-Reactor三種模型05-netty基礎-ByteBuf數據結構06-netty基礎-編碼解碼07-netty基礎-自定義編解碼器08-netty基礎-自定義序列化和反序列化09-n…

ThreadLocal有哪些內存泄露問題,如何避免?

每個Thread都有一個ThreadLocal.ThreadLocalMap的map&#xff0c;該map的key為ThreadLocal實例&#xff0c;它為一個弱引 用&#xff0c;我們知道弱引用有利于GC回收。當ThreadLocal的key null時&#xff0c;GC就會回收這部分空間&#xff0c;但是value卻不一 定能夠被回收&am…

從0到1學LangChain之Agent代理:解鎖大模型應用新姿勢

從0到1學LangChain之Agent代理&#xff1a;解鎖大模型應用新姿勢 本文較長&#xff0c;建議點贊收藏&#xff0c;以免遺失。更多AI大模型開發 學習視頻/籽料/面試題 都在這>>Github<< 什么是 LangChain Agent 代理 如果把大模型比作一個超級大腦&#xff0c;那么…

Spring Boot 2.6.0+ 循環依賴問題及解決方案

Spring Boot 2.6.0 循環依賴問題及解決方案 目錄 背景解決方案 1. 配置文件開啟循環依賴&#xff08;侵入性最低&#xff0c;臨時方案&#xff09;2. Lazy 延遲注入&#xff08;侵入性低&#xff0c;推薦優先嘗試&#xff09;3. 手動從容器獲取&#xff08;ApplicationContex…