線程同步與互斥
- 一.線程互斥
- 1.互斥相關概念
- 2.互斥鎖 Mutex
- 3.互斥鎖接口
- 4.互斥鎖實現原理
- 5.互斥鎖封裝
- 二.線程同步
- 1.同步相關概念
- 2.條件變量 Condition Variable
- 3.條件變量接口
- 4.條件變量封裝
- 5.信號量 Semaphore
- 6.信號量接口
- 7.信號量封裝
- 8.生產者 - 消費者模型
- 1.基于 Blocking Queue 的生產者 - 消費者模型
- 2.基于 Ring Queue 的生產者 - 消費者模型
- 三.線程池
- 1.日志和策略模式
本節重點:
- 深刻理解線程互斥的原理和操作。
- 深刻理解線程同步。
- 掌握生產消費模型。
- 設計日志和線程池。
- 理解線程安全和可重入,掌握鎖相關概念。
一.線程互斥
1.互斥相關概念
- 共享資源:可以被多個進程或線程可以共同訪問和使用的資源。
- 臨界資源:被保護的共享資源,它在同一時刻只允許一個進程或線程訪問該資源。
- 臨界區:每個線程內部,訪問臨界資源的代碼,就叫做臨界區。
- 原子性:不會被任何調度機制打斷的操作,該操作只有兩態,要么完成,要么未完成。
- 互斥:在同一時刻,只允許一個線程或進程訪問共享資源。確保對共享資源的操作具有原子性,避免多個線程或進程同時對共享資源進行讀寫操作而導致的數據競爭和不一致問題。
2.互斥鎖 Mutex
- 大部分情況,線程使用的數據都是局部變量,變量的地址空間在線程棧空間內,這種情況,變量歸屬單個線程,其它線程無法獲得這種變量。
- 但有時候,很多變量都需要在線程間共享,這樣的變量稱為共享變量,可以通過數據的共享,完成線程之間的交互。但是多個線程并發的操作共享變量,存在數據安全問題。
#include <iostream>
#include <vector>
#include "Pthread.hpp"
using namespace ThreadModule;#define NUM 4int ticketnum = 10000; // 共享資源void Ticket()
{while(true){if(ticketnum > 0){usleep(1000);// 1.搶票std::cout << "get a new ticket, id: " << ticketnum << std::endl;ticketnum--;// 2.入庫模擬// usleep(1000);}else{break;}}
}int main()
{std::vector<Thread> threads;// 1.構建線程對象for(int i = 0; i < NUM; i++){threads.emplace_back(Ticket);}// 2.啟動線程for(auto& thread : threads){thread.Start();}// 3.等待線程for(auto& thread : threads){thread.Join();}return 0;
}
xzy@hcss-ecs-b3aa:~$ ./ticket
get a new ticket, id: 10000
get a new ticket, id: 9999
...
get a new ticket, id: 2
get a new ticket, id: 1
get a new ticket, id: 0
get a new ticket, id: -1
get a new ticket, id: -2
為什么可能無法獲得正確結果?
- if 語句判斷條件為真以后,代碼可以并發的切換到其它線程。
- usleep(1000); 這個模擬漫長業務的過程,在這個漫長的業務過程中,可能有很多個線程會進入該代碼段。
- ticketnum-- 操作本身就不是一個原子操作。
# 取出ticket--部分的匯編代碼
xzy@hcss-ecs-b3aa:~$ objdump -d ticket > ticket.s
xzy@hcss-ecs-b3aa:~$ vim ticket.s
25a3: 8b 05 6b 5a 00 00 mov 0x5a6b(%rip),%eax
25a9: 83 e8 01 sub $0x1,%eax
25ac: 89 05 62 5a 00 00 mov %eax,0x5a62(%rip)
ticketnum-- 操作并不是原子操作,而是對應三條匯編指令:
- load:將共享變量 ticket 從內存加載到寄存器中。
- update:更新寄存器里面的值,執行-1操作。
- store:將新值,從寄存器寫回共享變量 ticket 的內存地址。
要解決以上問題,需要做到三點:
- 代碼必須要有互斥行為:當代碼進入臨界區執行時,不允許其它線程進入該臨界區。
- 如果多個線程同時要求執行臨界區的代碼,并且臨界區沒有線程在執行,那么只能允許一個線程進入該臨界區。
- 如果線程不在臨界區中執行,那么該線程不能阻止其它線程進入臨界區。
要做到這三點,本質上就是需要一把鎖。Linux上提供的這把鎖叫互斥鎖(也叫互斥量)
- 所有對資源的保護,都是對臨界區代碼的保護,因為資源是通過代碼訪問的!
- 加鎖一定不能大塊代碼進行加鎖,要保證細粒度!
- 鎖本身是全局的,也是共享資源。鎖保護共享資源,那么誰保證鎖?加鎖和解鎖被設計稱為原子!要么執行完,要么未被執行,不需要被保護!
- 二元信號量就是鎖,加鎖的本質就是對資源展開預定,整體使用資源!
- 如果申請鎖的時候,被其它線程拿走了?其它線程要進行阻塞等待,保證只能有一個線程訪問資源!
- 線程在訪問臨界區代碼時,線程可以切換?可以切換,但是鎖被線程拿走了,其它線程無法進入臨界區!串行!原子性!效率低的原因!
3.互斥鎖接口
功能: 初始化互斥鎖// 靜態分配(編譯確定內存的大小和位置): 不需要銷毀
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;// 動態分配(運行確定內存的大小和位置): 需要銷毀
原型: int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
參數mutex: 要初始化的互斥鎖指針
參數attr: nullptr功能: 銷毀互斥鎖
原型: int pthread_mutex_destroy(pthread_mutex_t *mutex);
參數mutex: 要銷毀的互斥鎖指針
注意:
- 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥鎖不需要銷毀。
- 不要銷毀一個已經加鎖的互斥鎖。
- 已經銷毀的互斥鎖,要確保后面不會有線程再嘗試加鎖。
功能: 加鎖, 其它線程不可以訪問共享資源
原型: int pthread_mutex_lock(pthread_mutex_t *mutex);功能: 解鎖, ,其它線程可以訪問共享資源
原型: int pthread_mutex_unlock(pthread_mutex_t *mutex);
調用 pthread_mutex_lock 時,可能會遇到以下情況:
- 互斥鎖處于未鎖狀態,該函數會將互斥鎖鎖定,同時返回成功。
- 發起函數調用時,其它線程已經鎖定互斥鎖,或者存在其它線程同時申請互斥量,但沒有競爭到互斥鎖,那么 pthread_mutex_lock 調用會陷入阻塞(執行流被掛起,調度其它線程),等待互斥鎖解鎖。
// Pthread.hpp 在上一篇博客<線程概念與控制>, 模版封裝線程庫中// ticket.cc
#include <iostream>
#include <vector>
#include <string>
#include "Pthread.hpp"
using namespace ThreadModule;#define NUM 4// pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; // 鎖也是共享資源
int ticketnum = 1000; // 共享資源class ThreadData
{
public:std::string name;pthread_mutex_t *lock_ptr;
};void Ticket(ThreadData& td)
{while (true){// pthread_mutex_lock(&lock)pthread_mutex_lock(td.lock_ptr); // 加鎖if (ticketnum > 0){usleep(1000);// 1.搶票std::cout << td.name << " get a new ticket, id: "<< ticketnum <<std::endl;ticketnum--;// pthread_mutex_unlock(&lock)pthread_mutex_unlock(td.lock_ptr); // 解鎖// 2.入庫模擬: 耗時, 防止該線程再次搶鎖(訪問資源)usleep(50);}else{// pthread_mutex_unlock(&lock)pthread_mutex_unlock(td.lock_ptr); // 解鎖break;}}
}int main()
{pthread_mutex_t lock;pthread_mutex_init(&lock, nullptr);std::vector<Thread<ThreadData>> threads;// 1.構建線程對象for (int i = 0; i < NUM; i++){ThreadData* td = new ThreadData();td->lock_ptr = &lock;threads.emplace_back(Ticket, *td);td->name = threads[i].Name();}// 2.啟動線程for (auto &thread : threads){thread.Start();}// 3.等待線程for (auto &thread : threads){thread.Join();}pthread_mutex_destroy(&lock);return 0;
}
xzy@hcss-ecs-b3aa:~$ ./ticket
Thread-1 get a new ticket, id: 1000
Thread-2 get a new ticket, id: 999
Thread-3 get a new ticket, id: 998
...
Thread-2 get a new ticket, id: 3
Thread-3 get a new ticket, id: 2
Thread-4 get a new ticket, id: 1
4.互斥鎖實現原理
- 單純的 i++ 或者 ++i 都不是原子的,有可能會有數據安全問題。
- 為了實現互斥鎖操作,大多數體系結構都提供了 swap 或 exchange 指令,該指令的作用是把寄存器和內存單元的數據相交換,由于只有一條指令,保證了原子性,即使是多處理器平臺,訪問內存的總線周期也有先后,一個處理器上的交換指令執行時另一個處理器的交換指令只能等待總線周期。現在我們把 lock 和 unlock 的偽代碼改一下。
5.互斥鎖封裝
- RAII 的核心思想是將資源的獲取和初始化放在對象的構造函數中,將資源的釋放放在對象的析構函數中。
- 實現RAII的加鎖方式:構造函數實現加鎖,析構函數實現解鎖。
// Mutex.hpp
#pragma once#include <pthread.h>namespace MutexModule
{class Mutex{// 互斥鎖: 不支持拷貝構造、拷貝賦值Mutex(const Mutex &m) = delete;Mutex &operator=(const Mutex &m) = delete;public:Mutex(){::pthread_mutex_init(&_mutex, nullptr);}~Mutex(){::pthread_mutex_destroy(&_mutex);}pthread_mutex_t *LockAddr() { return &_mutex; }void Lock(){::pthread_mutex_lock(&_mutex);}void Unlock(){::pthread_mutex_unlock(&_mutex);}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex): _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex; // 使用引用: 互斥鎖不支持拷貝};
}// Main.cc
#include <string>
#include <unistd.h>
#include "Mutex.hpp"
using namespace MutexModule;int ticket = 1000;
Mutex mtx;void *Ticket(void* args)
{std::string name = static_cast<char*>(args);while(true){// mtx.Lock();LockGuard lg(mtx); // 臨時對象: 初始化時自動加鎖, 出while循環時自動解鎖(RAII風格的加鎖方式)if(ticket > 0){usleep(1000);std::cout << name << " buys a ticket: " << ticket << std::endl;ticket--;// mtx.Unlock();}else{// mtx.Unlock();break;}}return nullptr;
}int main()
{pthread_t t1, t2, t3;pthread_create(&t1, nullptr, Ticket, (void*)"thread-1");pthread_create(&t2, nullptr, Ticket, (void*)"thread-2");pthread_create(&t3, nullptr, Ticket, (void*)"thread-3");pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}
xzy@hcss-ecs-b3aa:~$ ./ticket
thread-3 buys a ticket: 1000
thread-3 buys a ticket: 999
thread-3 buys a ticket: 998
...
thread-3 buys a ticket: 3
thread-3 buys a ticket: 2
thread-3 buys a ticket: 1
二.線程同步
1.同步相關概念
在互斥的代碼中,發現同一個線程多次訪問資源,導致其它進程遲遲訪問不到資源,導致進程饑餓問題。
- 同步:在保證數據安全的前提下,讓線程能夠按照某種特定的順序訪問臨界資源,從而有效避免線程饑餓問題,叫做同步。
- 競態條件:多個線程或進程對共享資源的訪問順序和時間不確定,而導致程序異常。
- 互斥保證在同一時間,只有一個線程/進程訪問共享資源,進而保證數據安全性,但是安全不一定合理/高效。同步是在互斥的前提下,讓系統變得更加合理/高效。
- 如何做到線程同步?條件變量!
2.條件變量 Condition Variable
- 例如:互斥的三個線程買票時,第一個搶到鎖的線程,發現沒有票時,它會不斷地加鎖、什么都做不了、解鎖(解鎖后最接近加鎖條件),導致其它線程處于饑餓狀態。當主線程發票之后,也是該線程搶到票。
#include <iostream>
#include <unistd.h>
#include <pthread.h>int ticket = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;void *route(void *args)
{std::string name = static_cast<char *>(args);while (1){pthread_mutex_lock(&mutex);if (ticket > 0){usleep(1000); // 微秒std::cout << name << " buys a new ticket: " << ticket << std::endl;ticket--;pthread_mutex_unlock(&mutex);}else{std::cout << name << " do nothing" << std::endl;sleep(1);pthread_mutex_unlock(&mutex);}}return nullptr;
}int main()
{pthread_t t1, t2, t3;pthread_create(&t1, nullptr, route, (void *)"thread 1");pthread_create(&t2, nullptr, route, (void *)"thread 2");pthread_create(&t3, nullptr, route, (void *)"thread 3");int cnt = 3;while(true){sleep(5);ticket += cnt;std::cout << "主線程發票啦, ticket: " << ticket << std::endl;}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}
xzy@hcss-ecs-b3aa:~$ ./testCond
thread 1 do nothing
thread 1 do nothing
thread 1 do nothing
thread 1 do nothing
thread 1 do nothing
主線程發票啦, ticket: 3
thread 1 buys a new ticket: 3
thread 1 buys a new ticket: 2
thread 1 buys a new ticket: 1
條件變量:通常與互斥鎖一起使用。互斥鎖用于保護共享資源,防止多個線程同時訪問和修改這些資源而導致數據不一致;而條件變量則用于在線程之間傳遞狀態信息,使得線程可以根據特定條件的滿足與否來決定是繼續執行還是等待。條件變量內部維護的是線程隊列,實現線程同步!
3.條件變量接口
功能: 初始化條件變量// 靜態分配(編譯確定內存的大小和位置): 不需要銷毀
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;// 動態分配(運行確定內存的大小和位置): 需要銷毀
原型: int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
參數cond: 要初始化的條件變量指針
參數attr: nullptr功能: 銷毀條件變量
原型: int pthread_cond_destroy(pthread_cond_t *cond);
參數cond: 要銷毀的條件變量指針
功能: 讓當前線程在指定的條件變量上阻塞等待, 直到其它線程通過線程發送信號/廣播, 解除線程阻塞, 再在鎖上等待, 直到申請鎖成功
原型: int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
工作原理:
- 當線程調用 pthread_cond_wait() 時,它會自動釋放傳入的互斥鎖 mutex,這是為了讓其它線程有機會獲取該互斥鎖,進而修改共享資源,使得等待的條件有可能得到滿足。
- 線程進入阻塞狀態,等待在條件變量 cond 上,此時線程不會占用 CPU 資源。
- 當其它線程調用 pthread_cond_signal() 或 pthread_cond_broadcast() 對同一個條件變量 cond 發出信號時,該線程被喚醒。
- 線程被喚醒后,會嘗試重新獲取之前釋放的互斥鎖 mutex。若互斥鎖當前被其它線程占用,該線程會繼續阻塞,直至成功獲取互斥鎖。一旦獲取到鎖,線程就會從 pthread_cond_wait() 函數返回,繼續執行后續代碼。
功能: 該函數用于向指定的條件變量cond發出信號, 喚醒一個正在該條件變量上等待的線程
原型: int pthread_cond_signal(pthread_cond_t *cond);功能: 該函數用于向指定的條件變量cond發出廣播信號, 喚醒所有正在該條件變量上等待的線程
原型: int pthread_cond_broadcast(pthread_cond_t *cond);
注意:pthread_cond_signal() 和 pthread_cond_broadcast() 不會自動釋放互斥鎖,調用該函數的線程仍然持有互斥鎖。調用后通常需要手動釋放互斥鎖,被喚醒的多個線程會競爭獲取互斥鎖,獲取到鎖的線程才能繼續執行。
#include <iostream>
#include <unistd.h>
#include <pthread.h>pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *active(void *args)
{std::string name = static_cast<char *>(args);while (true){pthread_mutex_lock(&mutex);// 沒有對資源釋放就緒的判定// std::cout << name << " is waiting" << std::endl;pthread_cond_wait(&cond, &mutex); // mutex???std::cout << name << " is active" << std::endl;pthread_mutex_unlock(&mutex);}
}int main()
{pthread_t tid1, tid2, tid3;pthread_create(&tid1, nullptr, active, (void *)"thread-1");pthread_create(&tid2, nullptr, active, (void *)"thread-2");pthread_create(&tid3, nullptr, active, (void *)"thread-3");sleep(1);std::cout << "main thread ctrl begin..." << std::endl;while (true){std::cout << "main wakeup thread..." << std::endl;pthread_cond_signal(&cond);// pthread_cond_broadcast(&cond);sleep(1);}pthread_join(tid1, nullptr);pthread_join(tid2, nullptr);pthread_join(tid3, nullptr);return 0;
}
# 按照線程1、2、3的順序, 實現同步
xzy@hcss-ecs-b3aa:~$ ./testCond
main thread ctrl begin...
main wakeup thread...
thread-1 is active
main wakeup thread...
thread-2 is active
main wakeup thread...
thread-3 is active
main wakeup thread...
thread-1 is active
main wakeup thread...
thread-2 is active
main wakeup thread...
thread-3 is active
#include <iostream>
#include <unistd.h>
#include <pthread.h>int ticket = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *route(void *args)
{std::string name = static_cast<char *>(args);while (1){pthread_mutex_lock(&mutex);if (ticket > 0){usleep(1000);std::cout << name << " buys a new ticket: " << ticket << std::endl;ticket--;pthread_mutex_unlock(&mutex);}else{pthread_cond_wait(&cond, &mutex);std::cout << "主線程出票完成, " << name << " 醒來" << std::endl;pthread_mutex_unlock(&mutex);}// usleep(50);}return nullptr;
}int main()
{pthread_t t1, t2, t3;pthread_create(&t1, nullptr, route, (void *)"thread 1");pthread_create(&t2, nullptr, route, (void *)"thread 2");pthread_create(&t3, nullptr, route, (void *)"thread 3");int cnt = 10;while(true){sleep(5);ticket += cnt;std::cout << "主線程發票啦, ticket: " << ticket << std::endl;pthread_cond_signal(&cond);// pthread_cond_broadcast(&cond);}pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);return 0;
}
# 按照線程1、2、3的順序依次搶票, 實現同步
xzy@hcss-ecs-b3aa:~$ ./testCond
主線程發票啦, ticket: 3
主線程出票完成, thread 1 醒來
thread 1 buys a new ticket: 3
thread 1 buys a new ticket: 2
thread 1 buys a new ticket: 1
主線程發票啦, ticket: 3
主線程出票完成, thread 2 醒來
thread 2 buys a new ticket: 3
thread 2 buys a new ticket: 2
thread 2 buys a new ticket: 1
主線程發票啦, ticket: 3
主線程出票完成, thread 3 醒來
thread 3 buys a new ticket: 3
thread 3 buys a new ticket: 2
thread 3 buys a new ticket: 1
4.條件變量封裝
// Cond.hpp
#pragma once#include <pthread.h>
#include "Mutex.hpp"
using namespace MutexModule;namespace CondModule
{class Cond{public:Cond() {::pthread_cond_init(&_cond, nullptr);}~Cond() {::pthread_cond_destroy(&_cond);}void Wait(Mutex &mutex) // 線程釋放曾經持有的鎖, 不能拷貝{::pthread_cond_wait(&_cond, mutex.LockAddr());}void Signal(){::pthread_cond_signal(&_cond);}void Broadcast(){::pthread_cond_broadcast(&_cond);}private:pthread_cond_t _cond;};
}
5.信號量 Semaphore
信號量:一種用于多進程或多線程環境下實現同步與互斥的機制。避免多個進程或線程同時訪問共享資源而引發的數據不一致或其他錯誤。
- SystemV 信號量:涉及內核,系統調用的開銷較大,性能可能會受到影響,并且操作復雜。
- POSIX信號量:接口簡潔,設計上更注重性能,尤其是對于線程間的同步和互斥操作。
信號量本質上是一個計數器,用于記錄系統中某種資源的可用數量,也就是最多允許線程進入共享資源的數量。配合PV兩個原子操作來控制對共享資源的訪問。PV 原子操作:
- P 操做:如果信號量的值大于 0,將信號量的值減 1,然后繼續執行;如果信號量的值為 0,則調用線程會被阻塞,直到信號量的值大于 0。
- V 操作:將信號量的值加 1。如果有其它線程正在等待該信號量,那么會喚醒其中一個等待的線程。
6.信號量接口
功能: 初始化信號量
原型: int sem_init(sem_t *sem, int pshared, unsigned int value);
參數:sem: 信號量對象的指針pshared: 0表示線程間共享, 非0表示進程間共享value: 信號量的初始值功能: 銷毀化信號量
原型: int sem_destroy(sem_t *sem);功能: P操作
原型: int sem_wait(sem_t *sem);功能: V操作
原型: int sem_post(sem_t *sem);
- 二元信號量:可用的資源只有一份,只允許一個線程訪問共享資源,類似互斥鎖。當信號量的值為 1 時,表示資源可用;當值為 0 時,表示資源已被占用。如下用二元信號量寫法代替互斥鎖:
#include <iostream>
#include <unistd.h>
#include <semaphore.h>int ticket = 1000;
sem_t sem;void* Ticket(void *args)
{std::string name = static_cast<char*>(args);while(true){sem_wait(&sem); // 申請信號量if(ticket > 0){usleep(1000);std::cout << name << " buys a ticket: " << ticket << std::endl;ticket--;sem_post(&sem); // 釋放信號量}else{sem_post(&sem); // 釋放信號量break;}}return nullptr;
}int main()
{sem_init(&sem, 0, 1);pthread_t t1, t2, t3;pthread_create(&t1, nullptr, Ticket, (void*)"thread-1");pthread_create(&t2, nullptr, Ticket, (void*)"thread-2");pthread_create(&t3, nullptr, Ticket, (void*)"thread-3");pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);sem_destroy(&sem);return 0;
}
xzy@hcss-ecs-b3aa:~$ ./ticket
thread-1 buys a ticket: 1000
thread-1 buys a ticket: 999
thread-1 buys a ticket: 998
...
thread-1 buys a ticket: 3
thread-1 buys a ticket: 2
thread-1 buys a ticket: 1
當允許最多3個線程并發訪問共享資源時:如何確保數據安全問題?加鎖!
#include <iostream>
#include <unistd.h>
#include <semaphore.h>
#include "Mutex.hpp"int ticket = 1000;
sem_t sem;
Mutex mutex;void* Ticket(void *args)
{std::string name = static_cast<char*>(args);while(true){sem_wait(&sem); // 申請信號量LockGuard lockguard(mutex); // RAII方式加鎖if(ticket > 0){usleep(1000);std::cout << name << " buys a ticket: " << ticket << std::endl;ticket--;sem_post(&sem); // 釋放信號量}else{sem_post(&sem); // 釋放信號量break;}}return nullptr;
}int main()
{sem_init(&sem, 0, 3); // 允許最多3個線程訪問共享資源pthread_t t1, t2, t3;pthread_create(&t1, nullptr, Ticket, (void*)"thread-1");pthread_create(&t2, nullptr, Ticket, (void*)"thread-2");pthread_create(&t3, nullptr, Ticket, (void*)"thread-3");pthread_join(t1, nullptr);pthread_join(t2, nullptr);pthread_join(t3, nullptr);sem_destroy(&sem);return 0;
}
xzy@hcss-ecs-b3aa:~$ ./ticket
thread-1 buys a ticket: 1000
thread-1 buys a ticket: 999
thread-1 buys a ticket: 998
...
thread-1 buys a ticket: 3
thread-1 buys a ticket: 2
thread-1 buys a ticket: 1
7.信號量封裝
// Sem.hpp
#pragma once
#include <semaphore.h>
namespace SemMoudel
{class Sem{public:Sem(int value = 1): _value(value){::sem_init(&_sem, 0, _value);}~Sem(){::sem_destroy(&_sem);}void P(){::sem_wait(&_sem);}void V(){::sem_post(&_sem);}private:sem_t _sem;int _value;};
}
8.生產者 - 消費者模型
單線程通常是串行執行,多線程通常是單核CPU并發、多核CPU并行。并發比串行效率高,并行比并發效率高。多線程中的某一個線程在執行IO操作時,線程掛起,會釋放 CPU 資源,允許操作系統將 CPU 時間片分配給其它就緒的線程,也就是以并發的方式提高 CPU 效率。
- 生產者 - 消費者模型:多線程或多進程協作設計模式,用于解決生產者和消費者之間數據交互問題。
- 生產者和消費者彼此之間不直接通訊,而通過緩沖區來進行通訊。生產者首先檢查緩沖區是否還有空閑空間,如果有,則生產一個數據項并將其放入緩沖區;如果緩沖區已滿,生產者需要阻塞等待,直到有消費者從緩沖區中取走數據,騰出空間。消費者檢查緩沖區是否有可用的數據項,如果有,則從緩沖區中取出一個數據項進行處理;如果緩沖區為空,消費者需要等待,直到生產者向緩沖區中添加了新的數據項。這個緩沖區用來給生產者和消費者解耦的。
生產者 - 消費者模型效率高的原因:
- 解耦:生產者和消費者不需要直接交互,它們只需要與緩沖區進行交互,從而降低了兩者之間的耦合度,使得系統的可維護性和可擴展性得到提高。
- 支持并發,但臨界區需要同步互斥,防止并發造成數據不一致問題
- 緩存機制:平衡了生產和消費的速度差異。
總結:
- 一個交易場所:臨界資源。
- 兩個角色:生產者和消費者。
- 三種關系:生產者與生產者:互斥關系。消費者與消費者:互斥關系。生產者與消費者:互斥和同步關系(生產者需要等待緩沖區有空閑空間才能生產數據,消費者需要等待緩沖區有數據才能消費)
1.基于 Blocking Queue 的生產者 - 消費者模型
- 阻塞隊列:一種常用于實現生產者 - 消費者模型的數據結構。
- 其與普通的隊列區別:當隊列為空時,從隊列獲取元素的操作將會被阻塞,直到隊列中被放入了元素。當隊列滿時,往隊列里存放元素的操作也會被阻塞,直到有元素被從隊列中取出。生產者和消費者存在同步關系。
- 考慮使用 互斥鎖 + 條件變量 實現阻塞隊列!
利用 pthread.h 線程庫的 Mutex 和 Cond 實現基于 Blocking Queue 的生產者 - 消費者模型:
// BlockQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>namespace BlockQueueMoudel
{static int gcap = 10;template <class T>class BlockQueue{private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = gcap): _cap(cap), _p_wait_num(0), _c_wait_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_producer_cond, nullptr);pthread_cond_init(&_consumer_cond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_producer_cond);pthread_cond_destroy(&_consumer_cond);}// 生產者void Push(const T &in){pthread_mutex_lock(&_mutex);// 生產數據是有條件的, 容量不能為滿while (IsFull()) // while替代if: 防止偽喚醒{std::cout << "生產者進入等待..." << std::endl;_p_wait_num++;// 生產者線程等待時, 需解鎖讓線程掛起, 調度消費者線程, 消費一個數據后,// 通知生產者線程條件滿足(可以生產數據), 接著需要再阻塞等待加鎖pthread_cond_wait(&_producer_cond, &_mutex); // 等待必須在臨界區: IsFull訪問了臨界資源// wait完成后: 生產者線程被喚醒 && 重新申請并持有鎖(仍在臨界區)_p_wait_num--;std::cout << "生產者已被喚醒..." << std::endl;}// IsFull()不滿足 || 生產者線程被喚醒_q.push(in);// 肯定有數據: 若消費者線程在等待, 直接喚醒if (_c_wait_num){// std::cout << "喚醒消費者" << std::endl;pthread_cond_signal(&_consumer_cond);}// 喚醒在解鎖后也可以pthread_mutex_unlock(&_mutex);}// 消費者void Pop(T *out){pthread_mutex_lock(&_mutex);// 消費數據是有條件的, 容量不能為空while (IsEmpty()) // while替代if: 防止偽喚醒{std::cout << "消費者進入等待..." << std::endl;_c_wait_num++;// 消費者線程等待時, 需解鎖讓線程掛起, 調度生產者線程, 生產一個數據后,// 通知消費者線程條件滿足(可以消費數據), 接著需要再阻塞等待加鎖pthread_cond_wait(&_consumer_cond, &_mutex); // 等待必須在臨界區: IsEmpty訪問了臨界資源// wait完成后: 消費者線程被喚醒 && 重新申請并持有鎖(仍在臨界區)_c_wait_num--;std::cout << "消費者已被喚醒..." << std::endl;}// IsEmpty()不滿足 || 消費者線程被喚醒*out = _q.front();_q.pop();// 肯定有空間: 若生產者線程在等待, 直接喚醒if (_p_wait_num){// std::cout << "喚醒生產者" << std::endl;pthread_cond_signal(&_producer_cond);}// 喚醒在解鎖后也可以pthread_mutex_unlock(&_mutex);}private:std::queue<T> _q; // 保存數據, 臨界資源int _cap; // bq的最大容量pthread_mutex_t _mutex; // 互斥鎖pthread_cond_t _producer_cond; // 生產者條件變量pthread_cond_t _consumer_cond; // 消費者條件變量int _p_wait_num; // 生產者線程等待個數int _c_wait_num; // 消費者線程等待個數};
}// Main.cc
#include <unistd.h>
#include "BlockQueue.hpp"
using namespace BlockQueueMoudel;// 生產者
void *Producer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);int data = 10;while (true){// sleep(2);// 1.生產到bq隊列中bq->Push(data);std::cout << "producer 生產了一個數據: " << data << std::endl;// 2.更新下一個生產的數據data++;}
}// 消費者
void *Consumer(void *args)
{BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);while (true){sleep(2);// 1.從bq隊列獲取數據int data;bq->Pop(&data);// 2.消費數據std::cout << "consumer 消費了一個數據: " << data << std::endl;}
}int main()
{BlockQueue<int> *bq = new BlockQueue<int>(5);// 單生產者、單消費者pthread_t p, c;pthread_create(&p, nullptr, Producer, bq);pthread_create(&c, nullptr, Consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);// 多生產者、多消費者// pthread_t p1, p2, p3, c1, c2;// pthread_create(&p1, nullptr, Producer, bq);// pthread_create(&p2, nullptr, Producer, bq);// pthread_create(&p3, nullptr, Producer, bq);// pthread_create(&c1, nullptr, Consumer, bq);// pthread_create(&c2, nullptr, Consumer, bq);// pthread_join(p1, nullptr);// pthread_join(p2, nullptr);// pthread_join(p3, nullptr);// pthread_join(c1, nullptr);// pthread_join(c2, nullptr);delete bq;return 0;
}
xzy@hcss-ecs-b3aa:~$ ./bq
producer 生產了一個數據: 10
producer 生產了一個數據: 11
producer 生產了一個數據: 12
producer 生產了一個數據: 13
producer 生產了一個數據: 14
生產者進入等待...
consumer 消費了一個數據: 10
生產者已被喚醒...
producer 生產了一個數據: 15
生產者進入等待...
consumer 消費了一個數據: 11
生產者已被喚醒...
producer 生產了一個數據: 16
生產者進入等待...
利用自己封裝的 Mutex 和 Cond 實現基于 Blocking Queue 的生產者消費者模型:
// BlockQueue.hpp
#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
#include "Mutex.hpp"
#include "Cond.hpp"
using namespace MutexModule;
using namespace CondModule;namespace BlockQueueMoudel
{static int gcap = 10;template <class T>class BlockQueue{private:bool IsFull() { return _q.size() == _cap; }bool IsEmpty() { return _q.empty(); }public:BlockQueue(int cap = gcap): _cap(cap), _p_wait_num(0), _c_wait_num(0){}~BlockQueue() {}// 生產者void Push(const T &in){LockGuard lockguard(_mutex);while (IsFull()){std::cout << "生產者進入等待..." << std::endl;_p_wait_num++;_producer_cond.Wait(_mutex);_p_wait_num--;std::cout << "生產者已被喚醒..." << std::endl;}_q.push(in);if (_c_wait_num){// std::cout << "喚醒消費者" << std::endl;_consumer_cond.Signal();}}// 消費者void Pop(T *out){LockGuard lockguard(_mutex);while (IsEmpty()){std::cout << "消費者進入等待..." << std::endl;_c_wait_num++;_consumer_cond.Wait(_mutex);_c_wait_num--;std::cout << "消費者已被喚醒..." << std::endl;}*out = _q.front();_q.pop();if (_p_wait_num){// std::cout << "喚醒生產者" << std::endl;_producer_cond.Signal();}}private:std::queue<T> _q; // 保存數據, 臨界資源int _cap; // bq的最大容量Mutex _mutex; // 互斥鎖Cond _producer_cond; // 生產者條件變量Cond _consumer_cond; // 消費者條件變量int _p_wait_num; // 生產者線程等待個數int _c_wait_num; // 消費者線程等待個數};
}// Main.cc
#include <iostream>
#include <vector>
#include <functional>
#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"using namespace BlockQueueMoudel;
using task_t = std::function<void()>;std::vector<task_t> tasks;
void Sql() { std::cout << "我是一個數據庫任務" << std::endl; }
void UpLoad() { std::cout << "我是一個上傳任務" << std::endl; }
void DownLoad() { std::cout << "我是一個下載任務" << std::endl; }// 生產者
void *Producer(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);int cnt = 0;while (true){// sleep(2);// 1.從tasks數組中獲取任務bq->Push(tasks[cnt % 3]);cnt++;// 2.生產任務std::cout << "producer 生產了一個任務" << std::endl;}
}// 消費者
void *Consumer(void *args)
{BlockQueue<task_t> *bq = static_cast<BlockQueue<task_t> *>(args);while (true){sleep(2);// 1.從bq隊列中獲取任務task_t t;bq->Pop(&t);// 2.處理任務t();std::cout << "consumer 處理完成一個任務" << std::endl;}
}int main()
{tasks.push_back(Sql);tasks.push_back(UpLoad);tasks.push_back(DownLoad);BlockQueue<task_t> *bq = new BlockQueue<task_t>(5);// 單生產者、單消費者pthread_t p, c;pthread_create(&p, nullptr, Producer, bq);pthread_create(&c, nullptr, Consumer, bq);pthread_join(p, nullptr);pthread_join(c, nullptr);delete bq;return 0;
}
xzy@hcss-ecs-b3aa:~$ ./bq
producer 生產了一個任務
producer 生產了一個任務
producer 生產了一個任務
producer 生產了一個任務
producer 生產了一個任務
生產者進入等待...
我是一個數據庫任務
consumer 處理完成一個任務
生產者已被喚醒...
producer 生產了一個任務
生產者進入等待...
我是一個上傳任務
consumer 處理完成一個任務
生產者已被喚醒...
producer 生產了一個任務
生產者進入等待...
我是一個下載任務
consumer 處理完成一個任務
生產者已被喚醒...
producer 生產了一個任務
生產者進入等待...
2.基于 Ring Queue 的生產者 - 消費者模型
- 環型隊列:實現數據高效生產和消費的經典設計模式,用于解決多線程/多進程環境下生產者和消費者之間的數據共享與同步問題。
- 生產者負責將數據放入循環隊列,而消費者則從隊列中取出數據進行處理。為了確保線程安全和避免數據競爭,需要使用同步機制來控制對隊列的訪問。當隊列已滿時,生產者需要等待;當隊列為空時,消費者需要等待。
- 考慮使用 信號量 實現循環隊列!
通過生產者生產數據:空間-1,數據+1;消費者消費數據:數據-1,空間+1。實現數據和空間的平衡!
單生產者單消費者:
- 當隊列為空/滿 時:生產者和消費者訪問同一個位置(資源),同步互斥!
- 當隊列非空/滿 時:生產者和消費者訪問不同的位置(資源),并發!
多單生產者單消費者:
- 生產者和生產者互斥關系,消費者和消費者互斥關系。
- 生產者和消費者同意滿足上面的關系。
利用自己封裝的 Mutex 和 Sem 實現基于 Ring Queue 的生產者消費者模型:
// RingQueue.hpp
#pragma once#include <iostream>
#include <vector>
#include <pthread.h>
#include "Sem.hpp"
#include "Mutex.hpp"using namespace SemMoudel;
using namespace MutexModule;namespace RingQueueMoudel
{template <class T>class RingQueue{public:RingQueue(int cap): _rq(cap), _cap(cap), _p_pos(0), _c_pos(0), _data_sem(0), _space_sem(cap){}~RingQueue(){}// 生產者void Push(const T &in){// 當隊滿: 阻塞, 直到消費者消費數據_space_sem.P(); // 申請空間{// 先申請信號量, 再申請鎖: 此時信號量的申請是并行的, 效率高一點LockGuard lockguard(_p_mutex);_rq[_p_pos] = in;_p_pos++;_p_pos %= _cap;}_data_sem.V(); // 釋放數據}// 消費者void Pop(T *out){// 當隊空: 阻塞, 直到生產者生產數據_data_sem.P(); // 申請數據{// 先申請信號量, 再申請鎖: 此時信號量的申請是并行的, 效率高一點LockGuard lockguard(_c_mutex);*out = _rq[_c_pos];_c_pos++;_c_pos %= _cap;}_space_sem.V(); // 釋放空間}private:std::vector<T> _rq; // 環型隊列, 臨界資源int _cap; // 最大容量int _p_pos; // 生產者位置int _c_pos; // 消費者位置Sem _data_sem; // 數據信號量Sem _space_sem; // 空間信號量Mutex _p_mutex; // 生產者的鎖Mutex _c_mutex; // 消費者的鎖};
}// Main.cc
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "RingQueue.hpp"using namespace RingQueueMoudel;void *Producer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);int data = 0;while(true){// 1.獲取數據// 2.生產數據rq->Push(data);std::cout << "producer 生產了一個數據" << data << std::endl;data++;}
}void *Consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while(true){sleep(1);// 1.消費數據int data;rq->Pop(&data);// 2.處理數據std::cout << "consumer 消費了一個數據" << data << std::endl;}
}int main()
{RingQueue<int> *rq = new RingQueue<int>(5);pthread_t p, c;pthread_create(&p, nullptr, Producer, rq);pthread_create(&c, nullptr, Consumer, rq);pthread_join(p, nullptr);pthread_join(c, nullptr);// pthread_t p1, p2, c1, c2, c3;// pthread_create(&p1, nullptr, Producer, rq);// pthread_create(&p2, nullptr, Producer, rq);// pthread_create(&c1, nullptr, Consumer, rq);// pthread_create(&c2, nullptr, Consumer, rq);// pthread_create(&c3, nullptr, Consumer, rq);// pthread_join(p1, nullptr);// pthread_join(p2, nullptr);// pthread_join(c1, nullptr);// pthread_join(c2, nullptr);// pthread_join(c3, nullptr);delete rq;return 0;
}
xzy@hcss-ecs-b3aa:~$ ./rq
producer 生產了一個數據0
producer 生產了一個數據1
producer 生產了一個數據2
producer 生產了一個數據3
producer 生產了一個數據4
consumer 消費了一個數據0
producer 生產了一個數據5
consumer 消費了一個數據1
producer 生產了一個數據6
consumer 消費了一個數據2
producer 生產了一個數據7
三.線程池
在寫線程池之前,我們要做如下準備:
- 準備線程的封裝。
- 準備鎖和條件變量的封裝。
- 引入日志,對線程進行封裝。
1.日志和策略模式
- 日志:記錄系統和軟件運行中發生事件的文件,主要作用是監控運行狀態、記錄異常信息,幫助快速定位問題并支持程序員進行問題修復。它是系統維護、故障排查和安全管理的重要工具。
- 日志格式中的某些指標是必須有:時間戳、日志等級、日志內容。存在幾個指標是可選的:文件名行號、進程,線程相關id信息等。
- 日志有現成的解決方案:spdlog、glog、Boost.Log、Log4cxx等。日志位于 /var/log/ 路徑下
- 設計模式:在軟件開發過程中,針對反復出現的問題所總結歸納出的通用解決方案。
策略模式:
- 抽象策略類(基類):包含一個或多個純虛函數,用于聲明具體策略類需要實現的接口。
- 具體策略類(派生類):重寫了抽象策略類中定義的接口,每個具體策略類代表一個具體的接口。
- 上下文類:持有一個抽象策略類的指針/引用,負責根據需要選擇和使用具體的策略類。
抽象策略類的作用:定義統一接口,運行時多態,提高代碼的可維護性和可擴展性。
這里采用 設計模式 - 策略模式 來進行日志的設計,我們想要的日志格式如下:
[可讀性很好的時間] [日志等級] [進程pid] [打印對應日志的文件名][行號] - 消息內容, 支持可變參數[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [9] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [10] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [11] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [12] - hello world
// Log.hpp
#pragma once#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem> // C++17文件系統
#include <fstream> // 文件流
#include <sstream> // 字符串流
#include <memory>
#include <unistd.h>
#include <time.h>
#include "Mutex.hpp"namespace LogModule
{using namespace MutexModule;// 獲取系統時間std::string CurrentTime(){time_t time_stamp = ::time(nullptr); // 獲取時間戳struct tm curr;localtime_r(&time_stamp, &curr); // 將時間戳轉化為可讀性強的信息char buffer[1024];snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",curr.tm_year + 1900,curr.tm_mon + 1,curr.tm_mday,curr.tm_hour,curr.tm_min,curr.tm_sec);return buffer;}// 日志文件: 默認路徑和默認文件名const std::string defaultlogpath = "./log/";const std::string defaultlogname = "log.txt";// 日志等級enum class LogLevel{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string Level2String(LogLevel level){switch (level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";default:return "NONE";}}// 3. 策略模式: 刷新策略class LogStrategy{public:virtual ~LogStrategy() = default; //???// 純虛函數: 無法實例化對象, 派生類可以重載該函數, 實現不同的刷新方式virtual void SyncLog(const std::string &message) = 0;};// 3.1 控制臺策略class ConsoleLogStrategy : public LogStrategy{public:ConsoleLogStrategy() {}~ConsoleLogStrategy() {}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::cout << message << std::endl;}private:Mutex _mutex;};// 3.2 文件級(磁盤)策略class FileLogStrategy : public LogStrategy{public:FileLogStrategy(const std::string &logpath = defaultlogpath, const std::string &logname = defaultlogname): _logpath(logpath), _logname(logname){// 判斷_logpath目錄是否存在if (std::filesystem::exists(_logpath)){return;}try{std::filesystem::create_directories(_logpath);}catch (std::filesystem::filesystem_error &e){std::cerr << e.what() << "\n";}}~FileLogStrategy() {}void SyncLog(const std::string &message) override{LockGuard lockguard(_mutex);std::string log = _logpath + _logname;std::ofstream out(log, std::ios::app); // 以追加的方式打開文件if (!out.is_open()){return;}out << message << "\n"; // 將信息刷新到out流中out.close();}private:std::string _logpath;std::string _logname;Mutex _mutex;};// 4. 日志類: 構建日志字符串, 根據策略進行刷新class Logger{public:Logger(){// 默認往控制臺上刷新_strategy = std::make_shared<ConsoleLogStrategy>();}~Logger() {}void EnableConsoleLog(){_strategy = std::make_shared<ConsoleLogStrategy>();}void EnableFileLog(){_strategy = std::make_shared<FileLogStrategy>();}// 內部類: 記錄完整的日志信息class LogMessage{public:LogMessage(LogLevel level, const std::string &filename, int line, Logger &logger): _currtime(CurrentTime()), _level(level), _pid(::getpid()), _filename(filename), _line(line), _logger(logger){std::stringstream ssbuffer;ssbuffer << "[" << _currtime << "] "<< "[" << Level2String(_level) << "] "<< "[" << _pid << "] "<< "[" << _filename << "] "<< "[" << _line << "] - ";_loginfo = ssbuffer.str();}~LogMessage(){if(_logger._strategy){_logger._strategy->SyncLog(_loginfo);}}template <class T>LogMessage &operator<<(const T &info){std::stringstream ssbuffer;ssbuffer << info;_loginfo += ssbuffer.str();return *this;}private:std::string _currtime; // 當前日志時間LogLevel _level; // 日志水平pid_t _pid; // 進程pidstd::string _filename; // 文件名uint32_t _line; // 日志行號Logger &_logger; // 負責根據不同的策略進行刷新std::string _loginfo; // 日志信息};// 故意拷貝, 形成LogMessage臨時對象, 后續在被<<時,會被持續引用,// 直到完成輸入,才會自動析構臨時LogMessage, 至此完成了日志的刷新,// 同時形成的臨時對象內包含獨立日志數據, 未來采用宏替換, 獲取文件名和代碼行數LogMessage operator()(LogLevel level, const std::string &filename, int line){return LogMessage(level, filename, line, *this);}private:// 純虛類不能實例化對象, 但是可以定義指針std::shared_ptr<LogStrategy> _strategy; // 日志刷新策略方案};// 定義全局logger對象Logger logger;// 編譯時進行宏替換: 方便隨時獲取行號和文件名
#define LOG(level) logger(level, __FILE__, __LINE__)// 提供選擇使用何種日志策略的方法
#define ENABLE_CONSOLE_LOG() logger.EnableConsoleLog()
#define ENABLE_FILE_LOG() logger.EnableFileLog()
}// Main.cc
#include <iostream>
#include "Log.hpp"
using namespace LogModule;int main()
{// 往顯示器中寫入ENABLE_CONSOLE_LOG();LOG(LogLevel::DEBUG) << "hello world";LOG(LogLevel::DEBUG) << "hello world";LOG(LogLevel::DEBUG) << "hello world";LOG(LogLevel::DEBUG) << "hello world";// 往文件中寫入ENABLE_FILE_LOG();LOG(LogLevel::DEBUG) << "hello world";LOG(LogLevel::DEBUG) << "hello world";LOG(LogLevel::DEBUG) << "hello world";LOG(LogLevel::DEBUG) << "hello world";return 0;
}
xzy@hcss-ecs-b3aa:~$ ./testLog
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [9] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [10] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [11] - hello world
[2025-03-08 00:43:30] [DEBUG] [882217] [Main.cc] [12] - hello world