Linux中的workqueue機制就是為了簡化內核線程的創建。通過調用workqueue的接口就能創建內核線程。并且可以根據當前系統的CPU的個數創建線程的數量,使得線程處理的事務能夠并行化。
工作隊列(workqueue)是另外一種將工作推后執行的形式。工作隊列可以把工作推后,交由一個內核線程去執行,也就是說,這個下半部分可以在進程上下文執行。最重要的就是工作隊列允許被重新調度甚至睡眠。
為什么需要工作隊列?
在內核代碼中,經常會遇到不能或不合適馬上調用某個處理過程,此時希望將該工作推給某個內核線程執行,這樣做的原因有很多,比如:
- 中斷觸發了某個過程的執行條件,而該過程執行時間較長或者會調用導致睡眠的函數,則該過程不應該在中斷上下文中立即被調用。
- 類似于中斷,一些緊急性的任務不希望執行比較耗時的非關鍵過程,則需要把該過程提交到低優先級線程執行。比如一個輪詢的通信接收線程,它需要快速完成檢測和接收數據,而對數據的解析則應該交給低優先級線程慢慢處理。
- 有時希望將一些工作集中起來以獲取批處理的性能;或者合并縮減一些執行線程,減少資源消耗。
基于以上需求,人們開發除了工作隊列這一機制。工作隊列不光在操作系統內核中會用到,一些應用程序或協議棧也會實現自己的工作隊列。
工作隊列的概念
工作隊列(workqueue):是將操作(或回調)延期異步執行的一種機制。工作隊列可以把工作推后,交由一個內核線程去執行,并且工作隊列是執行在線程上下文中,因此工作隊列執行過程中可以被重新調度、搶占、睡眠。
工作項(work item):是工作隊列中的元素,是一個回調函數和多個回調函數參數的集合,有時也會有額外的屬性成員,總之通過一個結構體即可記錄和描述一個工作項。
關鍵數據結構
work_struct
struct work_struct {atomic_long_t data;struct list_head entry;work_func_t func;
#ifdef CONFIG_LOCKDEPstruct lockdep_map lockdep_map;
#endifANDROID_KABI_RESERVE(1);ANDROID_KABI_RESERVE(2);
};
?workqueue_struct
如何傳參?
func的參數是一個work_struct指針,指向的數據就是定義func的work_struct。
看到這里,會有兩個疑問:
第一:如何把用戶的數據作為參數傳遞給func呢?
第二:如何實現延遲工作?
解決第一個問題:工作隊列需要把work_struct定義在用戶的數據結構中,然后通過container_of來得到用戶數據。
對于第二個問題,新的工作隊列把timer拿掉的用意是使得work_struct更加單純。首先回憶一下以前的版本,只有在需要延遲執行工作時才會用到timer,普通情況下timer是沒有意義的,所以之前的做法在一定程序上有些浪費資源。所以新版本中,將timer從work_struct中拿掉,然后又定義了一個新的結構delayed_work用于延遲執行。
struct delayed_work {struct work_struct work;struct timer_list timer;};
API介紹
不是所有的驅動程序都必須有自己的工作隊列。驅動程序可以使用內核提供的缺省工作隊列。由于這個工作隊列由很多驅動程序共享,任務可能會需要比較長一段時間才能開始執行。為了解決這一問題,工作函數中的延遲應該保持最小或者不要延時。
創建工作隊列
每個工作隊列由一個專門的線程(即一個工作隊列一個線程),所有來自運行隊列的任務在進程的上下文中運行(這樣它們可以休眠)。驅動程序可以創建并使用它們自己的工作隊列,或者使用內核的一個工作隊列。
//創建工作隊列
struct workqueue_struct *create_workqueue(const char *name);
?創建工作隊列的任務
工作隊列任務可以在編譯時或者運行時創建。
//編譯時創建
DECLARE_WORK(name, void (*function)(void *), void *data);
//運行時創建
INIT_WORK(struct work_struct *work, void (*function)(void *), void *data);
?將任務添加到工作隊列中
//添加到指定工作隊列
int queue_work(struct workqueue_struct *queue, struct work_struct *work);<br>
int queue_delayed_work(struct workqueue_struct *queue, struct work_struct<br>
*work, unsigned long delay);//添加到內核默認工作隊列
int schedule_work(struct work_struct *work);
int schedule_delayed_work(struct work_struct *work, unsigned long delay);
delay:保證至少在經過一段給定的最小延遲時間以后,工作隊列中的任務才可以真正執行。
隊列和任務的清除操作
//取消任務
int cancel_delayed_work(struct work_struct *work);//清空隊列中的所有任務
void flush_workqueue(struct workqueue_struct *queue);//銷毀工作隊列
void destroy_workqueue(struct workqueue_struct *queue);
?舉例
struct my_struct_t {char *name;struct work_struct my_work;};void my_func(struct work_struct *work){struct my_struct_t *my_name = container_of(work, struct my_struct_t, my_work);printk(KERN_INFO “Hello world, my name is %s!\n”, my_name->name);}struct workqueue_struct *my_wq = create_workqueue(“my wq”);struct my_struct_t my_name;my_name.name = “Jack”;INIT_WORK(&(my_name.my_work), my_func);queue_work(my_wq, &my_work);
工作原理
workqueue是內核里面很重要的一個機制,特別是內核驅動,一般的小型任務(work)都不會自己起一個線程來處理,而是扔到workqueue中處理。workqueue的主要工作就是用進程上下文來處理內核中大量的小任務。
所以workqueue的主要涉及思想:一個是并行,多個work不要相互阻塞。另一個是節省資源,多個work盡量共享資源(進程、調度、內存),不要造成系統過多的資源浪費。
為了實現設計思想,workqueue的設計
實現也更新了很多版本。最新的workqueue實現叫做CMWQ(concurrency Managed Workqueue),也就是用更加只能的算法來實現“并行和節省”。新版本的workqueue創建函數改成alloc_workqueue(),舊版本的函數create_workqueue()逐漸會被廢棄。
CMWQ的幾個基本概念
關于workqueue中幾個概念都是work相關的數據結構,非常容易混淆,大概可以這樣理解。
1)work:工作
2)workqueue:工作集合。workqueue和work是一對多的關系
3)worker: 工人。在代碼中worker對應一個work_thread()內核線程
4)worker_pool: 工人的集合。worker_pool和worker是一對多的關系
5)PWQ(pool_workqueue):中間人/中介,負責建立workqueue和worker_pool之間的關系,workqueue和pwq是一對多的關系,pwq和worker_pool是一對一的關系。
worker_pool
每個執行work的線程叫做worker,一組worker的結合叫做worker_pool。CMWQ的精髓就在worker_pool里面的worker的動態增減的管理上 manage_workers()。
CMWQ對worker_pool分成兩類:
normal worker_pool,給通用的workqueue使用;
unbound worker_pool,給WQ_UNBOUND類型的workqueue使用;
normal worker_pool
默認work是在normal worker_pool中處理的。系統的規劃是每個CPU創建兩個normal worker_pool:一個Nomal的優先級(nice=0),一個高優先級(nice=HIGHPRI_NICE_LEVEL),對應創建出來的worker進程的nice不一樣。
每個worker對應一個worker_thread()內核線程,一個worker_pool包含一個或者多個worker,worker_pool中worker的數量是根據worker_pool中work的負載來動態增減的。
我們可以通過ps aux | grep kworker命令來查看所有worker對應的內核線程,normal worker_pool對應內核線程(worker_thread())的命名規則是這樣的:
snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,pool->attrs->nice < 0 ? "H" : "");worker->task = kthread_create_on_node(worker_thread, worker, pool->node,"kworker/%s", id_buf);
so 類似名字是 normal worker_pool:
shell@PRO5:/ $ ps | grep "kworker"
root 14 2 0 0 worker_thr 0000000000 S kworker/1:0H // cpu1 高優先級 worker_pool 的第 0 個 worker 進程
root 17 2 0 0 worker_thr 0000000000 S kworker/2:0 // cpu2 低優先級 worker_pool 的第 0 個 worker 進程
root 18 2 0 0 worker_thr 0000000000 S kworker/2:0H // cpu2 高優先級 worker_pool 的第 0 個 worker 進程
root 23699 2 0 0 worker_thr 0000000000 S kworker/0:1 // cpu0 低優先級 worker_pool 的第 1 個 worker 進程
unbound worker_pool
大部分的work都是通過normal worker_pool來執行的(例如通過schedule_work()、schedule_work_on()壓入到系統workqueue中的work),最后都是通過normal worker_pool中的worker來執行的。這些worker是和某個CPU綁定的,work一旦被worker開始執行,都是一直運行到某個CPU上的,不會切換CPU。
unbound worker_pool相對應的意思,就是worker可以在多個CPU上調度。但是它其實也是綁定的,只不過它綁定的單位不是CPU,而是node,所謂的node是對NUMA(Non uniform Memory Access Architecture)系統來說的,NUMA可能存在多個Node,每個node可能包含一個或者多個CPU。
unbound worker_pool對應內核線程(worker_thread())的命名規則是這樣的:
snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);worker->task = kthread_create_on_node(worker_thread, worker, pool->node,"kworker/%s", id_buf);
so 類似名字是 unbound worker_pool:
shell@PRO5:/ $ ps | grep "kworker"
root 23906 2 0 0 worker_thr 0000000000 S kworker/u20:2/* unbound pool 20 的第 2 個 worker 進程*/
root 24564 2 0 0 worker_thr 0000000000 S kworker/u20:0/* unbound pool 20 的第 0 個 worker 進程*/
root 24622 2 0 0 worker_thr 0000000000 S kworker/u21:1/* unbound pool 21 的第 1 個 worker 進程*/
worker
每個worker對應一個worker_thread()內核線程,一個worker_pool對應一個或者多個worker。多個worker從同一個鏈表中worker_pool->worklist獲取work進行處理。
這其中有幾個重點:
- worker怎么處理work;
- worker_pool怎么動態管理worker的數量;
worker處理work
處理 work 的過程主要在 worker_thread() -> process_one_work() 中處理,我們具體看看代碼的實現過程。
kernel/workqueue.c: worker_thread() -> process_one_work()
static int worker_thread(void *__worker)
{struct worker *worker = __worker;struct worker_pool *pool = worker->pool;/* tell the scheduler that this is a workqueue worker */worker->task->flags |= PF_WQ_WORKER;
woke_up:spin_lock_irq(&pool->lock);// (1) 是否 die/* am I supposed to die? */if (unlikely(worker->flags & WORKER_DIE)) {spin_unlock_irq(&pool->lock);WARN_ON_ONCE(!list_empty(&worker->entry));worker->task->flags &= ~PF_WQ_WORKER;set_task_comm(worker->task, "kworker/dying");ida_simple_remove(&pool->worker_ida, worker->id);worker_detach_from_pool(worker, pool);kfree(worker);return 0;}// (2) 脫離 idle 狀態// 被喚醒之前 worker 都是 idle 狀態worker_leave_idle(worker);
recheck:// (3) 如果需要本 worker 繼續執行則繼續,否則進入 idle 狀態// need more worker 的條件: (pool->worklist != 0) && (pool->nr_running == 0)// worklist 上有 work 需要執行,并且現在沒有處于 running 的 work/* no more worker necessary? */if (!need_more_worker(pool))goto sleep;// (4) 如果 (pool->nr_idle == 0),則啟動創建更多的 worker// 說明 idle 隊列中已經沒有備用 worker 了,先創建 一些 worker 備用/* do we need to manage? */if (unlikely(!may_start_working(pool)) && manage_workers(worker))goto recheck;/** ->scheduled list can only be filled while a worker is* preparing to process a work or actually processing it.* Make sure nobody diddled with it while I was sleeping.*/WARN_ON_ONCE(!list_empty(&worker->scheduled));/** Finish PREP stage. We're guaranteed to have at least one idle* worker or that someone else has already assumed the manager* role. This is where @worker starts participating in concurrency* management if applicable and concurrency management is restored* after being rebound. See rebind_workers() for details.*/worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);do {// (5) 如果 pool->worklist 不為空,從其中取出一個 work 進行處理struct work_struct *work =list_first_entry(&pool->worklist,struct work_struct, entry);if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {/* optimization path, not strictly necessary */// (6) 執行正常的 workprocess_one_work(worker, work);if (unlikely(!list_empty(&worker->scheduled)))process_scheduled_works(worker);} else {// (7) 執行系統特意 scheduled 給某個 worker 的 work// 普通的 work 是放在池子的公共 list 中的 pool->worklist// 只有一些特殊的 work 被特意派送給某個 worker 的 worker->scheduled// 包括:1、執行 flush_work 時插入的 barrier work;// 2、collision 時從其他 worker 推送到本 worker 的 workmove_linked_works(work, &worker->scheduled, NULL);process_scheduled_works(worker);}// (8) worker keep_working 的條件:// pool->worklist 不為空 && (pool->nr_running <= 1)} while (keep_working(pool));worker_set_flags(worker, WORKER_PREP);supposed
sleep:// (9) worker 進入 idle 狀態/** pool->lock is held and there's no work to process and no need to* manage, sleep. Workers are woken up only while holding* pool->lock or from local cpu, so setting the current state* before releasing pool->lock is enough to prevent losing any* event.*/worker_enter_idle(worker);__set_current_state(TASK_INTERRUPTIBLE);spin_unlock_irq(&pool->lock);schedule();goto woke_up;
}
| →
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{struct pool_workqueue *pwq = get_work_pwq(work);struct worker_pool *pool = worker->pool;bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;int work_color;struct worker *collision;
#ifdef CONFIG_LOCKDEP/** It is permissible to free the struct work_struct from* inside the function that is called from it, this we need to* take into account for lockdep too. To avoid bogus "held* lock freed" warnings as well as problems when looking into* work->lockdep_map, make a copy and use that here.*/struct lockdep_map lockdep_map;lockdep_copy_map(&lockdep_map, &work->lockdep_map);
#endif/* ensure we're on the correct CPU */WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&raw_smp_processor_id() != pool->cpu);// (8.1) 如果 work 已經在 worker_pool 的其他 worker 上執行,// 將 work 放入對應 worker 的 scheduled 隊列中延后執行/** A single work shouldn't be executed concurrently by* multiple workers on a single cpu. Check whether anyone is* already processing the work. If so, defer the work to the* currently executing one.*/collision = find_worker_executing_work(pool, work);if (unlikely(collision)) {move_linked_works(work, &collision->scheduled, NULL);return;}// (8.2) 將 worker 加入 busy 隊列 pool->busy_hash/* claim and dequeue */debug_work_deactivate(work);hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);worker->current_work = work;worker->current_func = work->func;worker->current_pwq = pwq;work_color = get_work_color(work);list_del_init(&work->entry);// (8.3) 如果 work 所在的 wq 是 cpu 密集型的 WQ_CPU_INTENSIVE// 則當前 work 的執行脫離 worker_pool 的動態調度,成為一個獨立的線程/** CPU intensive works don't participate in concurrency management.* They're the scheduler's responsibility. This takes @worker out* of concurrency management and the next code block will chain* execution of the pending work items.*/if (unlikely(cpu_intensive))worker_set_flags(worker, WORKER_CPU_INTENSIVE);// (8.4) 在 UNBOUND 或者 CPU_INTENSIVE work 中判斷是否需要喚醒 idle worker// 普通 work 不會執行這個操作/** Wake up another worker if necessary. The condition is always* false for normal per-cpu workers since nr_running would always* be >= 1 at this point. This is used to chain execution of the* pending work items for WORKER_NOT_RUNNING workers such as the* UNBOUND and CPU_INTENSIVE ones.*/if (need_more_worker(pool))wake_up_worker(pool);/** Record the last pool and clear PENDING which should be the last* update to @work. Also, do this inside @pool->lock so that* PENDING and queued state changes happen together while IRQ is* disabled.*/set_work_pool_and_clear_pending(work, pool->id);spin_unlock_irq(&pool->lock);lock_map_acquire_read(&pwq->wq->lockdep_map);lock_map_acquire(&lockdep_map);trace_workqueue_execute_start(work);// (8.5) 執行 work 函數worker->current_func(work);/** While we must be careful to not use "work" after this, the trace* point will only record its address.*/trace_workqueue_execute_end(work);lock_map_release(&lockdep_map);lock_map_release(&pwq->wq->lockdep_map);if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"" last function: %pf\n",current->comm, preempt_count(), task_pid_nr(current),worker->current_func);debug_show_held_locks(current);dump_stack();}/** The following prevents a kworker from hogging CPU on !PREEMPT* kernels, where a requeueing work item waiting for something to* happen could deadlock with stop_machine as such work item could* indefinitely requeue itself while all other CPUs are trapped in* stop_machine. At the same time, report a quiescent RCU state so* the same condition doesn't freeze RCU.*/cond_resched_rcu_qs();spin_lock_irq(&pool->lock);/* clear cpu intensive status */if (unlikely(cpu_intensive))worker_clr_flags(worker, WORKER_CPU_INTENSIVE);/* we're done with it, release */hash_del(&worker->hentry);worker->current_work = NULL;worker->current_func = NULL;worker->current_pwq = NULL;worker->desc_valid = false;pwq_dec_nr_in_flight(pwq, work_color);
}
worker_pool 動態管理 worker
worker_pool 怎么來動態增減 worker,這部分的算法是 CMWQ 的核心。其思想如下:
- worker_pool中的worker有3中狀態:idle、running、suspend;
- 如果worker_pool中有work需要處理,保持至少一個running worker來處理;
- running worker在處理work的過程中進入了阻塞suspend狀態,為了保持其他work的執行,需要喚醒新的idle worker來處理work;
- 如果有work需要執行且running worker大于1個,會讓多余的running worker進入idle狀態。
- 如果沒有work需要執行,會讓所有work進入idle狀態;
- 如果創建的worker過多,destroy_worker在300s(IDLE_WORKER_TIMEOUT)時間內沒有再次運行的idle_worker。
workqueue
workqueue就是存放一組work的集合,基本可以分為兩類:一類是系統創建的workqueue,一類是用戶自己創建的workqueue。不論是系統還是用戶的workqueue,如果沒有指定WQ_UNBOUND,默認都是和normal worker_pool綁定。
系統wrokqueue
系統在初始化時創建了一批默認的workqueue:system_wq、system_highpri_wq、system_unbound_wq、system_freezable_wq、system_power_efficient_wq、system_freezable_power_efficient_wq。
像system_wq,就是schedule_work()默認使用的。
kernel/workqueue.c:init_workqueues()
static int __init init_workqueues(void)
{system_wq = alloc_workqueue("events", 0, 0);system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);system_long_wq = alloc_workqueue("events_long", 0, 0);system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,WQ_UNBOUND_MAX_ACTIVE);system_freezable_wq = alloc_workqueue("events_freezable",WQ_FREEZABLE, 0);system_power_efficient_wq = alloc_workqueue("events_power_efficient",WQ_POWER_EFFICIENT, 0);system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",WQ_FREEZABLE | WQ_POWER_EFFICIENT,0);
}
workqueue 創建
詳細過程見上幾節的代碼分析:alloc_workqueue() -> __alloc_workqueue_key() -> alloc_and_link_pwqs()。
queue_work()
將work壓入到workqueue當中。
kernel/workqueue.c: queue_work() -> queue_work_on() -> __queue_work()
static void __queue_work(int cpu, struct workqueue_struct *wq,struct work_struct *work)
{struct pool_workqueue *pwq;struct worker_pool *last_pool;struct list_head *worklist;unsigned int work_flags;unsigned int req_cpu = cpu;/** While a work item is PENDING && off queue, a task trying to* steal the PENDING will busy-loop waiting for it to either get* queued or lose PENDING. Grabbing PENDING and queueing should* happen with IRQ disabled.*/WARN_ON_ONCE(!irqs_disabled());debug_work_activate(work);/* if draining, only works from the same workqueue are allowed */if (unlikely(wq->flags & __WQ_DRAINING) &&WARN_ON_ONCE(!is_chained_work(wq)))return;
retry:// (1) 如果沒有指定 cpu,則使用當前 cpuif (req_cpu == WORK_CPU_UNBOUND)cpu = raw_smp_processor_id();/* pwq which will be used unless @work is executing elsewhere */if (!(wq->flags & WQ_UNBOUND))// (2) 對于 normal wq,使用當前 cpu 對應的 normal worker_poolpwq = per_cpu_ptr(wq->cpu_pwqs, cpu);else// (3) 對于 unbound wq,使用當前 cpu 對應 node 的 worker_poolpwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));// (4) 如果 work 在其他 worker 上正在被執行,把 work 壓到對應的 worker 上去// 避免 work 出現重入的問題/** If @work was previously on a different pool, it might still be* running there, in which case the work needs to be queued on that* pool to guarantee non-reentrancy.*/last_pool = get_work_pool(work);if (last_pool && last_pool != pwq->pool) {struct worker *worker;spin_lock(&last_pool->lock);worker = find_worker_executing_work(last_pool, work);if (worker && worker->current_pwq->wq == wq) {pwq = worker->current_pwq;} else {/* meh... not running there, queue here */spin_unlock(&last_pool->lock);spin_lock(&pwq->pool->lock);}} else {spin_lock(&pwq->pool->lock);}/** pwq is determined and locked. For unbound pools, we could have* raced with pwq release and it could already be dead. If its* refcnt is zero, repeat pwq selection. Note that pwqs never die* without another pwq replacing it in the numa_pwq_tbl or while* work items are executing on it, so the retrying is guaranteed to* make forward-progress.*/if (unlikely(!pwq->refcnt)) {if (wq->flags & WQ_UNBOUND) {spin_unlock(&pwq->pool->lock);cpu_relax();goto retry;}/* oops */WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",wq->name, cpu);}/* pwq determined, queue */trace_workqueue_queue_work(req_cpu, pwq, work);if (WARN_ON(!list_empty(&work->entry))) {spin_unlock(&pwq->pool->lock);return;}pwq->nr_in_flight[pwq->work_color]++;work_flags = work_color_to_flags(pwq->work_color);// (5) 如果還沒有達到 max_active,將 work 掛載到 pool->worklistif (likely(pwq->nr_active < pwq->max_active)) {trace_workqueue_activate_work(work);pwq->nr_active++;worklist = &pwq->pool->worklist;// 否則,將 work 掛載到臨時隊列 pwq->delayed_works} else {work_flags |= WORK_STRUCT_DELAYED;worklist = &pwq->delayed_works;}// (6) 將 work 壓入 worklist 當中insert_work(pwq, work, worklist, work_flags);spin_unlock(&pwq->pool->lock);
}
flush_work()
flush某個work,確保work執行完成。
怎么判斷異步的work已經執行完成?這里面使用了一個技巧:在目標work后面插入一個新的work wq_barrier,如果wq_barrier執行完成,那么目標work肯定已經執行完成。
kernel/workqueue.c: queue_work() -> queue_work_on() -> __queue_work()
/*** flush_work - wait for a work to finish executing the last queueing instance* @work: the work to flush** Wait until @work has finished execution. @work is guaranteed to be idle* on return if it hasn't been requeued since flush started.** Return:* %true if flush_work() waited for the work to finish execution,* %false if it was already idle.*/
bool flush_work(struct work_struct *work)
{struct wq_barrier barr;lock_map_acquire(&work->lockdep_map);lock_map_release(&work->lockdep_map);if (start_flush_work(work, &barr)) {// 等待 barr work 執行完成的信號wait_for_completion(&barr.done);destroy_work_on_stack(&barr.work);return true;} else {return false;}
}
| →
static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr)
{struct worker *worker = NULL;struct worker_pool *pool;struct pool_workqueue *pwq;might_sleep();// (1) 如果 work 所在 worker_pool 為 NULL,說明 work 已經執行完local_irq_disable();pool = get_work_pool(work);if (!pool) {local_irq_enable();return false;}spin_lock(&pool->lock);/* see the comment in try_to_grab_pending() with the same code */pwq = get_work_pwq(work);if (pwq) {// (2) 如果 work 所在 pwq 指向的 worker_pool 不等于上一步得到的 worker_pool,說明 work 已經執行完if (unlikely(pwq->pool != pool))goto already_gone;} else {// (3) 如果 work 所在 pwq 為 NULL,并且也沒有在當前執行的 work 中,說明 work 已經執行完worker = find_worker_executing_work(pool, work);if (!worker)goto already_gone;pwq = worker->current_pwq;}// (4) 如果 work 沒有執行完,向 work 的后面插入 barr workinsert_wq_barrier(pwq, barr, work, worker);spin_unlock_irq(&pool->lock);/** If @max_active is 1 or rescuer is in use, flushing another work* item on the same workqueue may lead to deadlock. Make sure the* flusher is not running on the same workqueue by verifying write* access.*/if (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)lock_map_acquire(&pwq->wq->lockdep_map);elselock_map_acquire_read(&pwq->wq->lockdep_map);lock_map_release(&pwq->wq->lockdep_map);return true;
already_gone:spin_unlock_irq(&pool->lock);return false;
}
|| →
static void insert_wq_barrier(struct pool_workqueue *pwq,struct wq_barrier *barr,struct work_struct *target, struct worker *worker)
{struct list_head *head;unsigned int linked = 0;/** debugobject calls are safe here even with pool->lock locked* as we know for sure that this will not trigger any of the* checks and call back into the fixup functions where we* might deadlock.*/// (4.1) barr work 的執行函數 wq_barrier_func()INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));init_completion(&barr->done);/** If @target is currently being executed, schedule the* barrier to the worker; otherwise, put it after @target.*/// (4.2) 如果 work 當前在 worker 中執行,則 barr work 插入 scheduled 隊列if (worker)head = worker->scheduled.next;// 否則,則 barr work 插入正常的 worklist 隊列中,插入位置在目標 work 后面// 并且置上 WORK_STRUCT_LINKED 標志else {unsigned long *bits = work_data_bits(target);head = target->entry.next;/* there can already be other linked works, inherit and set */linked = *bits & WORK_STRUCT_LINKED;__set_bit(WORK_STRUCT_LINKED_BIT, bits);}debug_work_activate(&barr->work);insert_work(pwq, &barr->work, head,work_color_to_flags(WORK_NO_COLOR) | linked);
}
||| →
static void wq_barrier_func(struct work_struct *work)
{struct wq_barrier *barr = container_of(work, struct wq_barrier, work);// (4.1.1) barr work 執行完成,發出 complete 信號。complete(&barr->done);
}
Workqueue 對外接口函數
CMWQ 實現的 workqueue 機制,被包裝成相應的對外接口函數。
schedule_work()
把work壓入系統默認wq system_wq,WORK_CPU_UNBOUND指定worker為當前CPU綁定的normal work_pool創建的worker。
kernel/workqueue.c: schedule_work() -> queue_work_on() -> __queue_work()
static inline bool schedule_work(struct work_struct *work)
{return queue_work(system_wq, work);
}
| →
static inline bool queue_work(struct workqueue_struct *wq,struct work_struct *work)
{return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}
schedule_work_on()?
在schedule_work()基礎上,可以指定work運行的CPU。
kernel/workqueue.c: schedule_work_on() -> queue_work_on() -> __queue_work()
static inline bool schedule_work_on(int cpu, struct work_struct *work)
{return queue_work_on(cpu, system_wq, work);
}
schedule_delayed_work()
啟動一個timer,在timer定時到了以后調用delayed_work_timer_fn()把work壓入系統默認wq system_wq。
kernel/workqueue.c: schedule_work_on() -> queue_work_on() -> __queue_work()
static inline bool schedule_delayed_work(struct delayed_work *dwork,unsigned long delay)
{return queue_delayed_work(system_wq, dwork, delay);
}
| →
static inline bool queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork,unsigned long delay)
{return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
}
|| →
bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay)
{struct work_struct *work = &dwork->work;bool ret = false;unsigned long flags;/* read the comment in __queue_work() */local_irq_save(flags);if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {__queue_delayed_work(cpu, wq, dwork, delay);ret = true;}local_irq_restore(flags);return ret;
}
||| →
static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay)
{struct timer_list *timer = &dwork->timer;struct work_struct *work = &dwork->work;WARN_ON_ONCE(timer->function != delayed_work_timer_fn ||timer->data != (unsigned long)dwork);WARN_ON_ONCE(timer_pending(timer));WARN_ON_ONCE(!list_empty(&work->entry));/** If @delay is 0, queue @dwork->work immediately. This is for* both optimization and correctness. The earliest @timer can* expire is on the closest next tick and delayed_work users depend* on that there's no such delay when @delay is 0.*/if (!delay) {__queue_work(cpu, wq, &dwork->work);return;}timer_stats_timer_set_start_info(&dwork->timer);dwork->wq = wq;dwork->cpu = cpu;timer->expires = jiffies + delay;if (unlikely(cpu != WORK_CPU_UNBOUND))add_timer_on(timer, cpu);elseadd_timer(timer);
}
|||| →
void delayed_work_timer_fn(unsigned long __data)
{struct delayed_work *dwork = (struct delayed_work *)__data;/* should have been called from irqsafe timer with irq already off */__queue_work(dwork->cpu, dwork->wq, &dwork->work);
}