MIT6.S081-lab7前置
這部分包含了設備中斷和鎖的內容
設備中斷
之前系統調用的時候提過 usertrap ,而我們的設備中斷,比如計時器中斷也會在這里執行,我們可以看看具體的邏輯:
void
usertrap(void)
{int which_dev = 0;if((r_sstatus() & SSTATUS_SPP) != 0)...} else if((which_dev = devintr()) != 0){// ok} else if((r_scause() == 15 || r_scause() == 13) && iscowpage(p->pagetable, r_stval())){...} else {...}...// give up the CPU if this is a timer interrupt.if(which_dev == 2)yield();usertrapret();
}
我們可以發現,中間會為 which_dev 賦值,而當 which_dev 等于 2 的時候,我們會進入 yield 進行調度,而這里,就是我們實現時間片調度的關鍵地方,并且再devintr里面,如果識別為時鐘中斷,會將計時器自增,保證計時正確。
看了手冊感覺云里霧里的,直接開讀代碼!
kernel/kernelvec.S, kernel/plic.c, kernel/console.c, kernel/uart.c, kernel/printf.c
雖然給了鏈接,但是最好是在本地讀源碼。
kernel/console.c
先從簡單的入手,根據手冊我們可以知道, console.c 就是處理鍵盤和顯示器的模塊,下面開始讀源碼,我們會發現,有很多亂七八糟的函數,我們這一節需要學習一下中斷,所以我們主要看看 void consoleintr(int c)
的調用即可,查看其引用,我們會發現,他最終會被 devintr 調用,當然,是在識別中斷信號是由鍵盤發出的才會真的進行調用。
到這里,我們就可以將我們之前的知識串聯起來,我們在輸入字符的時候,會真正的產生鍵盤中斷,并輸入到顯示屏上面。
按照順序,先看看 void consoleintr(int c)
的調用者干了些什么
// 處理 UART 中斷,該中斷可能由于有輸入到達、
// 或 UART 準備好發送更多輸出,或兩者同時發生。
// 此函數由 devintr() 調用。
void
uartintr(void)
{// 循環讀取并處理輸入的字符。while(1){int c = uartgetc(); // 從 UART 讀取一個字符if(c == -1) // 沒有更多輸入時返回 -1break;consoleintr(c); // 將字符傳給 console 進行處理}acquire(&uart_tx_lock); // 加鎖,防止并發修改發送緩沖區uartstart(); // 啟動 UART 輸出release(&uart_tx_lock);
}
uartintr 實際上是直接有 devintr 調用的,他上層的調用我們已經很清楚了,所以不需要再看上層干了什么,我們專注于下層的實現。我們可以看見,uartintr 循環從 uart 里面讀取字符,然后傳給我們剛剛提到的 consoleintr 處理,最終發送給緩沖區,等待輸出,我們可以進一步看看如何從 UART 中讀取字符:
// 從 UART 讀取一個輸入字符。
// 如果沒有可讀的字符,則返回 -1。
int
uartgetc(void)
{// 檢查接收狀態寄存器(LSR)的最低位是否為 1,// 表示是否有輸入數據準備好。if(ReadReg(LSR) & 0x01){// 如果有輸入數據,讀取接收保持寄存器(RHR)中的字符返回。return ReadReg(RHR);} else {// 否則返回 -1,表示沒有數據可以讀取。return -1;}
}
ReadReg 就不再繼續深入了,他這里實現的實際上就是從寄存器 RHR 讀取一個字符,并且每次讀取都會以一種類似隊列的模式踢出讀取的字符,把下一個字符放到 RHR 中,如果我們看他的源碼,會發現沒有這一部分的邏輯,這是由硬件實現的。
回到上一層,我們可以看見我們調用了 consoleintr 去處理這一個字符,但是我們還沒有去看過他的源碼,這里就去看一下~
void
consoleintr(int c)
{acquire(&cons.lock); // 獲取鎖,防止并發修改 cons 結構體。switch(c){case C('P'): // Ctrl+P:打印當前進程列表。procdump();break;case C('U'): // Ctrl+U:清除當前輸入行(kill line)。// cons.e 指的是當前寫入位置的下標索引。while(cons.e != cons.w &&cons.buf[(cons.e-1) % INPUT_BUF_SIZE] != '\n'){cons.e--; // 索引--consputc(BACKSPACE); // 刪除并移動光標}break;case C('H'): // Ctrl+H,表示退格(Backspace)case '\x7f': // Delete 鍵的 ASCII(127),功能與退格相同if(cons.e != cons.w){cons.e--; // 刪除一個字符consputc(BACKSPACE); // 回顯退格符}break;default:// 只在字符非 0 且緩沖區未滿時處理if(c != 0 && cons.e-cons.r < INPUT_BUF_SIZE){// 將回車符轉為換行符,統一處理c = (c == '\r') ? '\n' : c;consputc(c); // 將字符回顯到控制臺// 將字符寫入緩沖區 cons.buf 中cons.buf[cons.e++ % INPUT_BUF_SIZE] = c;// 如果收到換行、Ctrl+D(EOF),或緩沖區已滿,則喚醒 consolereadif(c == '\n' || c == C('D') || cons.e-cons.r == INPUT_BUF_SIZE){cons.w = cons.e; // 更新寫指針,表示有一行可讀wakeup(&cons.r); // 喚醒等待讀取的 consoleread()}}break;}release(&cons.lock); // 釋放鎖
}
我們發現,有很多快捷鍵可以幫助我們識別一些指令,比如清空行,打印進程,我們可以去試試,這些確實是可以執行的!
當 UART 接收到一個字符時,會調用這個函數處理輸入。主要功能是處理用戶的輸入字符,包括特殊控制字符(如 Ctrl+U 清空行、Ctrl+P 打印進程),并將處理后的字符追加到控制臺緩沖區 cons.buf 中。如果接收到換行符(表示一行輸入完成),則喚醒等待輸入的 consoleread()。
我們大體梳理了一下這個函數的邏輯,我們可以仔細看看其中用到的函數,先看看第一個 procdump :
void
procdump(void)
{// 進程狀態字符串表,對應 enum procstate 的枚舉值。static char *states[] = {[UNUSED] "unused", // 未使用[USED] "used", // 已分配但未就緒[SLEEPING] "sleep ", // 睡眠中[RUNNABLE] "runble", // 可運行[RUNNING] "run ", // 正在運行[ZOMBIE] "zombie" // 僵尸態(已退出但未被回收)};struct proc *p;char *state;printf("\n"); // 輸出一個空行用于分隔// 遍歷所有進程表項for(p = proc; p < &proc[NPROC]; p++){if(p->state == UNUSED) // 跳過未使用的進程項continue;// 獲取進程的狀態對應的字符串if(p->state >= 0 && p->state < NELEM(states) && states[p->state])state = states[p->state];elsestate = "???"; // 狀態非法或未知// 打印進程的 pid、狀態和進程名printf("%d %s %s", p->pid, state, p->name);printf("\n");}
}
這里就是執行了打印進程的邏輯,不是重點,next。
而當我們在鍵盤上按 Ctrl + U 的時候,則會清空我們當前輸入的行,我們可以看看這個函數 consputc :
//
// consputc - 向 UART 發送一個字符。
// 該函數由 printf() 調用,也用于回顯輸入字符;但不會由 write() 調用。
//
void
consputc(int c)
{if(c == BACKSPACE){// 如果用戶輸入了退格(Backspace),用空格覆蓋原字符。uartputc_sync('\b'); // 發送退格符(移動光標到前一個位置)uartputc_sync(' '); // 發送空格覆蓋原字符uartputc_sync('\b'); // 發送退格符,將光標移回原位置} else {// 向 UART 發送普通字符uartputc_sync(c);}
}
我們可以看見,這個函數可以移動我們的光標,以及向 UART 發送我們的字符。在 uartputc_sync 中,最終會執行我們的 WriteReg 來向指定的寄存器寫入字符。之后,就由硬件來完成字符的顯示工作,這樣,就完成了一個字符的清空退格和顯示。
事實上,輸入的各種字符最終都會通過這個函數來顯示出來。
而在輸入換行,或者超出緩沖區的時候,會調用wakeup觸發調度,來調用 consoleread 從而讀取控制臺的信息,但是為什么會知道wakeup就會調用這個 consoleread ?:
//
// 用戶態的 read() 系統調用讀取控制臺時,會調用這里。
// 作用是:從 cons.buf 中拷貝一整行(或者盡可能多的輸入)到 dst 指向的內存。
// user_dst 參數標記 dst 是用戶空間地址還是內核空間地址。
//
int
consoleread(int user_dst, uint64 dst, int n)
{uint target;int c;char cbuf;target = n; // 保存最初要求讀取的字節數acquire(&cons.lock); // 加鎖,保護 cons 共享數據while(n > 0){// 如果當前沒有可讀的數據,進入睡眠,等待輸入。while(cons.r == cons.w){if(killed(myproc())){// 如果當前進程已經被殺死,直接返回錯誤。release(&cons.lock);return -1;}// 這里傳入了一個 cons.r 的參數,這樣我們就可以正確喚醒了!sleep(&cons.r, &cons.lock); // 沒數據可讀時掛起等待喚醒}// 從輸入緩沖區讀取一個字符c = cons.buf[cons.r++ % INPUT_BUF_SIZE];if(c == C('D')){ // 遇到 Ctrl+D,表示文件結束(EOF)if(n < target){// 如果已經讀入了一些數據,把 ^D 留在緩沖區,等下次再處理cons.r--;}break; // 退出讀取循環}// 把讀到的一個字節拷貝到用戶空間(或內核空間)dst指向的地方cbuf = c;// 典中典之 copyoutif(either_copyout(user_dst, dst, &cbuf, 1) == -1)break; // 如果拷貝失敗,直接結束讀取dst++; // dst 指針后移--n; // 剩余要讀取的字節數減一if(c == '\n'){// 如果讀到了換行符,說明一整行已經讀完了break;}}release(&cons.lock); // 釋放鎖return target - n; // 返回實際讀到的字節數
}
那么問題來了,我們在通過 wakeup 是進入到了哪一個進程?哪一個進程為我們準備好了 read ?很簡單,我們知道,我們所有的輸入基本都是在 shell 里面執行的,所以我們可以回到 user/sh.c 中,去查看,很輕松可以找到,里面的調用鏈路:
main -> getcmd -> gets -> read
以此來讀取我們輸入的字符,而我們在shell里面等待的時候,這個read是處于睡眠的狀態,通過之前提過的快捷鍵可以知道我們目前運行的進程:
$
1 sleep init
2 sleep sh
sh處于睡眠,而一旦我們敲擊鍵盤,輸入字符的那一瞬間,會產生中斷,并且將字符輸入緩沖區,一旦我們輸入換行符,或者緩沖區溢出了,就會喚醒 consoleread 來讀取我們的緩沖區(控制臺)數據,隨后,通過shell去識別這行內容,并執行相關程序。這樣,就形成了一次輸入的閉環。
user/printf.c
回到另一個關于屏幕顯示的函數,printf,它實際上沒有很復雜,除了一堆字符串判斷邏輯之外,就僅僅是系統調用 write 一個字符到屏幕上,忽略獲取參數的部分,我們可以進入 write 這個系統調用里面詳細看看他干了什么:
// 向文件 f 寫數據。
// addr 是用戶空間的虛擬地址。
int
filewrite(struct file *f, uint64 addr, int n)
{int r, ret = 0;// 如果文件不可寫,直接返回錯誤。if(f->writable == 0)return -1;if(f->type == FD_PIPE){// 如果是管道類型文件,調用管道的寫操作。ret = pipewrite(f->pipe, addr, n);} else if(f->type == FD_DEVICE){// 如果是設備文件,調用對應設備的寫操作。if(f->major < 0 || f->major >= NDEV || !devsw[f->major].write)return -1;ret = devsw[f->major].write(1, addr, n); // 調用設備的 write 函數//這里是我們的重點} else if(f->type == FD_INODE){// 如果是普通文件(inode文件),進行文件系統寫操作。// 為了避免超出日志的最大事務大小(MAXOPBLOCKS),// 每次最多只寫 max 字節。int max = ((MAXOPBLOCKS-1-1-2) / 2) * BSIZE;int i = 0;while(i < n){int n1 = n - i;if(n1 > max)n1 = max;begin_op(); // 開始一個文件系統事務ilock(f->ip); // 加鎖 i-nodeif ((r = writei(f->ip, 1, addr + i, f->off, n1)) > 0)f->off += r; // 更新文件偏移量iunlock(f->ip); // 解鎖 i-nodeend_op(); // 結束事務if(r != n1){// 如果實際寫入的字節數與預期不同,說明出錯了,停止寫入。break;}i += r; // 寫入成功,繼續寫下一部分}ret = (i == n ? n : -1); // 如果全部寫完返回寫入的字節數,否則返回錯誤} else {// 如果文件類型不是以上幾種,說明出錯,觸發 panic。panic("filewrite");}return ret;
}
姑且給出全部注釋,但是對于本章無足輕重的部分就不講解了,重點看看我們的 ret = devsw[f->major].write(1, addr, n);
的調用,它是什么意思?什么時候會進入到他的里面?
在進入這里之前,我們會判斷傳入的文件描述符指向的文件是一個設備,并且存在對應的 write 函數,這樣,我們可以專門的去訪問這個write,比如說我們的 console 設備就會提前注冊一個 write 的函數,這個函數會將對應的字符 write 到指定的文件中,在這里,就是終端,我們注冊函數是這個:
int consolewrite(int user_src, uint64 src, int n)
{int i;// 遍歷需要寫入的字節數。for(i = 0; i < n; i++){char c;// 從用戶空間復制一個字節到變量 c 中。// either_copyin() 用于檢查地址有效性,并將數據從用戶空間復制到內核空間。if(either_copyin(&c, user_src, src+i, 1) == -1)break; // 如果復制失敗,則退出,返回已寫入的字節數。// 調用 uartputc() 將字符 c 發送到 UART,用于輸出到控制臺。uartputc(c);}// 返回實際寫入的字節數。return i;
}
我們的 uartputc 會將字符存入 UART 的緩沖區,在之后,會由硬件輸出到控制臺:
// 將字符添加到 UART 發送緩沖區,并在緩沖區未滿時啟動 UART 傳輸。
// 如果緩沖區已滿,則會阻塞直到有空間可用。
// 該函數不適合在中斷中調用,適合用在寫操作中。
// 用于將字符通過 UART 輸出到控制臺。
void uartputc(int c) {// 獲取 UART 發送緩沖區的鎖,確保線程安全。acquire(&uart_tx_lock);// 如果系統處于 panic 狀態,進入死循環,阻止繼續執行。if (panicked) {for (;;) ; // 系統處于 panic 狀態,不再繼續執行。}// 如果 UART 發送緩沖區滿,等待直到有空間可用。while (uart_tx_w == uart_tx_r + UART_TX_BUF_SIZE) {// 緩沖區已滿,等待 uartstart() 函數發送字符并騰出空間。sleep(&uart_tx_r, &uart_tx_lock);}// 將字符 c 放入 UART 發送緩沖區。uart_tx_buf[uart_tx_w % UART_TX_BUF_SIZE] = c;uart_tx_w += 1; // 更新寫指針。// 調用 uartstart() 啟動 UART 傳輸,開始將緩沖區中的字符發送出去。uartstart();// 釋放 UART 發送緩沖區的鎖。release(&uart_tx_lock);
}
總結一下,最終的 write 會調用 uartputc 這個函數,然后這里就會向終端的緩沖區 uart_tx_buf 寫入數據,如果緩沖區滿了,就會休眠等待,中途如果被喚醒,寫入數據,然后就會調用 uartstart 向終端顯示數據了,為什么會有緩沖區,為什么需要睡眠?操作系統是個天生的多線程并發運行的,應用程序可能一口氣產生大量數據,但 UART 硬件發送速度慢,僅有一個進程可能感受不到什么,但是如果進程很多,緩沖區可能在一段時間非常龐大,占用了大量的內存就不可想象了!所以這里陷入睡眠等待是比較好的選擇。
這里的具體的邏輯其實和之前的鍵盤中斷是類似的,而之前 console.c 的時候,也提到過 uartstart,但是沒有分析,所以在這里分析一下他干了什么:
// 如果 UART 處于空閑狀態,并且傳輸緩沖區中有字符等待發送,則發送字符。
// 調用者必須持有 uart_tx_lock。
// 該函數同時可以從中斷的頂部(中斷處理程序)和底部(常規代碼)調用。
void
uartstart()
{while(1){// 如果傳輸緩沖區為空,表示沒有數據等待發送。if(uart_tx_w == uart_tx_r){ReadReg(ISR); // 讀取中斷狀態寄存器,清除相關中斷標志。return; // 返回,不需要發送任何數據。}// 如果 UART 的傳輸保持寄存器未空閑,表示無法再發送數據。if((ReadReg(LSR) & LSR_TX_IDLE) == 0){// 傳輸保持寄存器已滿,無法發送新的字節。// 當寄存器空閑時會觸發中斷,我們就能繼續發送下一個字節。return;}// 從緩沖區中取出一個待發送的字符。int c = uart_tx_buf[uart_tx_r % UART_TX_BUF_SIZE];uart_tx_r += 1; // 更新緩沖區的讀取指針。// 可能有調用 uartputc() 的地方在等待緩沖區空間的釋放,喚醒它。wakeup(&uart_tx_r);// 向 UART 傳輸保持寄存器寫入字符。WriteReg(THR, c);}
}
這里我其實沒太理解,因為在 consoleintr 里面已經 Write 了一遍,為什么這里還需要再執行一遍?在這里先保留一下這個問題吧。
回來解決這個疑惑,我們在 consoleintr 會將字符顯示到屏幕上,這部分講義其實也沒講清楚,我們可以回歸手冊來深入理解這部分,我們 consoleintr 會將數據顯示到屏幕上并寫入到 input 緩沖區,也會對一些特殊鍵進行處理,當然,如果我們遇到換行符,就會將控制權交給 consoleread 他則是我們的 read 系統調用,此時,會將這一行內容復制到用戶空間。
同時,為什么會有 uartstart 這個東西?實際上,在這里是排不上用場的,因為此時已經把數據給讀取完了,那么什么時候會派上用場?當我們執行 write 系統調用的時候,最終會到達 uartputc ,就跟我們之前介紹 printf 提過的, 會存在一個 uart_tx_buf 緩沖區,這個緩沖區可以實現異步的寫入和讀取,而我們的 uartstart 恰恰就會去讀取這一串緩沖區,并寫入到指定的寄存器上,而當我們的 UART 發送完一個字節,就會產生一個中斷,從而再次調用這個 uartstart ,來檢查是否真的完成了發送,并且將下一個緩沖的輸出字符交給寄存器,這樣就實現了解耦。
kernel/plic.c
這玩意是啥?有啥用?
舉個例子,如果同時有多個中斷程序需要我們去處理,比如鍵盤中斷,磁盤 I/O ,我們就需要去設定一個優先級,并且能夠識別具有最優優先級的中斷程序,從而去處理,實現這一組的代碼,則是我們的 plic.c , 在操作系統初始化的時候,會調用:
void
plicinit(void)
{// 設置外設中斷(IRQ)的優先級,使能中斷。// 如果優先級為 0,表示禁用;非零表示啟用并設定優先級。// 將 UART0 的中斷優先級設置為 1(使能 UART0 中斷)*(uint32*)(PLIC + UART0_IRQ*4) = 1;// 將 VIRTIO0(虛擬磁盤設備)的中斷優先級設置為 1(使能 VIRTIO0 中斷)*(uint32*)(PLIC + VIRTIO0_IRQ*4) = 1;
}
這里都將優先級設置為 1 了,這兩個都能夠被判斷了,但是兩者優先級上卻沒什么區別,這里就會用到調度策略(Ai說的)
同時,我們可以在 devintr 里面發現, plic_claim
這個函數,它可以幫助我們獲取當前優先級最高的中斷:
int
plic_claim(void)
{int hart = cpuid();int irq = *(uint32*)PLIC_SCLAIM(hart); // 從 PLIC 中讀取當前需要處理的中斷號return irq;
}
很簡單,這里的 PLIC_SCLAIM
是直接從內存中獲取數據,不再贅述,簡單來說,就是在內存中的指定位置設定優先級之后,硬件會幫你判斷。
隨后,我們就可以根據返回的中斷號來執行指定的中斷程序了。
除此之外, kernelvec 和 uservec 差不多,就不多介紹了。
lecture 部分
中斷和系統調用很相似,雖然都使用了相同的保存狀態,恢復狀態的機制,并且在一個地方處理,但是他們并不一樣,其實大部分內容都已經在源碼閱讀那一塊講完了。。哈哈。但是依舊有一點是值得提及的:
并發,雖然現在還沒有到并發的部分,但是這里依舊是涉及到一定的并發的,比如說我們的設備和 cpu :當 UART 向 Console發送字符的時候, CPU 會被喚醒,返回執行 shell ,而他可能會執行再執行一次系統調用,向 buffer 中寫入一個字符,而這是在并行的執行。
這樣的并發叫做 Consumer/Producer 并發,在 buffer 的例子里面,我們的 producer 可以一直寫入數據, 使得寫指針 + 1 ,而 buffer 滿的時候, 寫指針則需要停下,我們的 uartintr 就會從讀指針中讀取一個字符,再通過 UART 發送,令讀指針 + 1 ,如果讀指針追上了寫指針,則說明 buffer 為空了。而這段 buffer 在內存中僅此一份。
另外隨便提一下,過去的中斷處理還是很快的,原因是過去的計算機很簡單,中斷處理也不復雜,現在設備變得很復雜,處理器需要干的事情變得更多了,就慢了,所以產生中斷之前,設備就會做大量的操作,減輕 cpu 的負擔。
同時,如果有一個高性能設備,如網卡,不斷地接收到大量的包,產生的中斷就會很多,這里的解決辦法就是使用輪詢,但是這里的輪詢僅僅是設備 I/O 輪詢,但是對于中斷觸發少的設備,我們依舊采取中斷的方式,這樣,對于中斷頻繁的設備,我們 cpu 主動去檢查是否有數據到來,這樣,來提高性能。
鎖
總算到鎖了。。老樣子,先讀源碼
kernel/spinlock.h
// 自旋鎖的結構體
struct spinlock {uint locked; // 是否被持有char *name; // 鎖名struct cpu *cpu; // 由哪一個cpu持有
};
我們發現,xv6里面,自旋鎖的結構體還是挺簡單的,對于看過go的協程調度的我來說感覺有點輕松了,哈哈。
kernel/spinlock.c
我們常常看見,我們會初始化鎖,獲取鎖,釋放鎖,這些其實對我們來說已經不陌生了,索性一次性讀完😎
void initlock(struct spinlock *lk, char *name)
{lk->name = name;lk->locked = 0; // 初始狀態未被加鎖lk->cpu = 0; // 沒有任何CPU持有這個鎖
}void acquire(struct spinlock *lk)
{push_off(); // 關中斷,防止在持鎖期間被打斷導致死鎖if (holding(lk)) // 如果當前CPU已經持有這個鎖,出錯panic("acquire");// 使用原子指令嘗試加鎖// RISC-V上編譯成 amoswap.w.aq 原子交換指令while (__sync_lock_test_and_set(&lk->locked, 1) != 0); // 如果鎖已經被別人持有,就自旋等待__sync_synchronize(); // 內存屏障,禁止編譯器/CPU對臨界區內存訪問重排lk->cpu = mycpu(); // 記錄是哪個CPU持有了這把鎖
}// 釋放鎖
void release(struct spinlock *lk)
{if (!holding(lk)) // 如果當前CPU沒有持有鎖,卻想釋放,出錯panic("release");lk->cpu = 0; // 清空持鎖CPU信息__sync_synchronize(); // 內存屏障,確保臨界區修改的內存對其他CPU可見// 使用原子指令釋放鎖,相當于 lk->locked = 0__sync_lock_release(&lk->locked);pop_off(); // 恢復中斷狀態
}// 檢查當前CPU是否持有這把鎖
// 要求調用前已經關閉了中斷
int holding(struct spinlock *lk)
{int r;r = (lk->locked && lk->cpu == mycpu());return r;
}
在我們 acquire 獲取鎖的時候,其實就是原子操作 + while 循環的自旋,拋開性能不談,這也算一個完整的自旋鎖了。而 release 也是一樣,通過原子指令釋放鎖。但是仍有一點值得注意,無論是獲取鎖之后,我們都需要關中斷來防止被時間片或者其他中斷停止,這也是很重要的一點,防止當前 cpu 被調度而執行其他進程,可以說是我們加鎖的根本目的之一,而原子操作則是為了保證只有一個 cpu 可以獲取這把鎖,進而執行相關的代碼。
另外,這部分還涉及到我們的開中斷和關中斷的代碼:
// 關閉中斷,并記錄當前中斷狀態,支持嵌套關閉
void push_off(void)
{int old = intr_get(); // 保存當前中斷狀態(開啟 or 關閉)intr_off(); // 立即關閉中斷if (mycpu()->noff == 0) // 如果這是第一次調用 push_offmycpu()->intena = old; // 記錄第一次調用時中斷是否是打開的mycpu()->noff += 1; // 關閉中斷計數器加 1
}// 恢復中斷,支持多層嵌套的 pop_off
void pop_off(void)
{struct cpu *c = mycpu();if (intr_get()) // 檢查當前中斷不能是開啟狀態panic("pop_off - interruptible"); // 違反規則,panicif (c->noff < 1) // 檢查是否有多余的 pop_off 調用panic("pop_off");c->noff -= 1; // 關閉計數器減 1// 如果 noff 歸零,且第一次 push_off 時中斷是開的,那么恢復中斷if (c->noff == 0 && c->intena)intr_on();
}
這部分,感覺理解了就行。
xv6 手冊
鎖
為啥需要鎖?很簡單,幻想一下多個線程同時操作一個鏈表就可以了,我們的鏈表插入在 C 語言中往往是兩步:
- 更新需要插入的節點的 Next 指針指向原本鏈表的頭節點
- 更新鏈表頭節點指向當前節點。
如果引入了并發,事情將不可估計,如果我們同時更新了需要插入節點的 Next 指針,那么更新原本鏈表頭節點的時候將會發生混亂,因為其中一個節點的 Next 指針是舊的!這會為我們的并發帶來不確定性。而解決這種不確定性的方法就是加鎖。
而在我們的 acquire 和 release 之間的區域,叫做臨界區域,也就是鎖保護的地方,會造成不確定性的地方。
對于鎖,我們可以盡量去降低它的粒度來提高性能,比如說對于所有的文件不應該用一把大鎖去保證其原子性,而是應該對每一個文件的讀寫使用單獨的一把鎖,來降低粒度,提高并行性。
死鎖,在 go 中開發經常會出現死鎖的情況,死鎖是啥?有興趣的同學可以了解一下哲學家就餐問題,這就是一個經典的死鎖問題,而死鎖就是在鎖定資源時發生了沖突,比如 A 向 B 轉賬, 我們需要先鎖定 A , 在鎖定 B , 但是如果在同一時刻, 我們的 B 又向 A 發起轉賬, 如果我們的 A 和 B 同時被兩個線程進行鎖定,那么接下來兩個線程都還需要分別對 B 和 A 進行鎖定,此時就會產生沖突,導致資源不會被釋放,兩個線程就卡住了的情況。雖然聽起來很難發生,但是這確實很常見的問題。而很多系統都會自動檢測這樣的死鎖, xv6 當然也可以。
在文件系統里面,鎖的調用鏈很長,所以會很容易造成死鎖,所以一般對某一個地方加鎖,會按照規定的順序來加鎖,但是這樣也會存在很多問題,比如說邏輯順序和鎖的順序不一致等,所以關于死鎖真的是一個很大的問題。
中斷和鎖交織的時候,也會出現一些問題,比如在 sys_sleep 的時候,如果這個 cpu 持有 tickslock ,并且又被計時器中斷了,那么計時器就會嘗試獲取這把鎖,當然,很輕易地就可以知道,這里的中斷永遠不可能獲取 ticklock 的,所以在持有鎖的時候,我們需要關中斷,這里對應了我們上面的代碼講解。
而編譯器有時為了性能,會不按順序執行代碼,如果我們被鎖定的代碼段和鎖的順序被打亂,那也是一個問題,所以我們會使用 __sync_synchronize
來禁止 cpu 對指令進行重排。
Sleep鎖,這種鎖很常見, go 中的自旋鎖其實是 sleeplock 和 spinlock 的結合,具有更好的性能。為什么會有這種鎖?因為我們有時會鎖定資源很長時間,而其他的 spinlock 也會不斷自旋,無法執行其他任務,從而浪費 cpu 資源,而我們的 sleeplock 在無法獲取鎖的時候,就會陷入睡眠,這個cpu允許去執行其他程序,并且允許被中斷喚醒,重新去持有這把鎖,這樣就提高了 cpu 了利用率。
lecture里面感覺沒啥好說的,都是提過的內容,為了寫 lock 實驗,我們還需要去閱讀一下第 8 章的內容。
buffer cache
文件系統并不是我們本次的重點,但是依舊需要提一下,文件系統的目的是組織,存儲數據,支持用戶間的數據共享,保證數據的持久性。而 buffer cache 則是在我們磁盤的外面一層作為緩存的,它是以雙鏈表表示的緩沖區。盡管我們在 main 中是以靜態數組 buf 的形式初始化 NBUF 個緩沖區初始化列表,但是對 buffer cache 的所有其他訪問都通過 bcache.head 引用鏈表,而不是數組。每次想要從硬盤讀取數據的時候,不會直接去讀,而是先去讀取在內存中維護的緩存,我們可以通過代碼來詳細了解一下。
通常,比如 readi 讀取數據的時候,會通過 bread 來獲取對應設備,對應編號的緩存塊,然后從中讀取數據。我們重點看看 bread 的邏輯:
// 返回一個鎖定的緩沖區(buf),其中包含指定設備 dev 和塊號 blockno 的數據。
struct buf*
bread(uint dev, uint blockno)
{struct buf *b;// 獲取對應設備和塊號的緩沖區,返回后緩沖區已經加鎖b = bget(dev, blockno);// 如果緩沖區內容無效(還沒有正確的數據)if(!b->valid) {// 從磁盤讀取對應塊的數據到緩沖區中virtio_disk_rw(b, 0); // 第二個參數0表示讀取操作// 標記緩沖區內容有效b->valid = 1;}// 返回包含數據的、已經加鎖的緩沖區return b;
}
bget 函數返回的緩沖區是加了鎖的,我們可以看看 bget 到底干了什么:
// 在緩沖區緩存中查找指定設備 dev 上的塊 blockno。
// 如果找到,返回對應的緩沖區(已加鎖)。
// 如果沒找到,分配一個空閑緩沖區,并返回(也已加鎖)。
static struct buf*
bget(uint dev, uint blockno)
{struct buf *b;// 加鎖,保護整個緩沖區緩存結構acquire(&bcache.lock);// 第一輪:查找是否已經緩存了目標塊for(b = bcache.head.next; b != &bcache.head; b = b->next){if(b->dev == dev && b->blockno == blockno){// 找到了:增加引用計數b->refcnt++;// 釋放 bcache.lock,因為要改拿緩沖區自己的睡眠鎖release(&bcache.lock);// 加鎖緩沖區,防止其他線程訪問acquiresleep(&b->lock);return b;}}// 第二輪:找一個空閑的緩沖區(最近最少使用的,從鏈表尾部開始)for(b = bcache.head.prev; b != &bcache.head; b = b->prev){if(b->refcnt == 0) { // 找到一個沒人用的// 重新初始化緩沖區元數據b->dev = dev;b->blockno = blockno;b->valid = 0; // 標記為無效,需要重新讀磁盤b->refcnt = 1; // 引用一次release(&bcache.lock);acquiresleep(&b->lock);return b;}}// 如果沒有空閑緩沖區了(說明太多進程占用了緩存)panic("bget: no buffers");
}
在這里我們可以看見,在返回之前都會釋放掉整個緩沖區的鎖,而加上單獨一塊緩沖區的鎖,先從前往后遍歷鏈表,如果發現當前需要讀取的塊已經在緩沖區里面了,那么就可以標記為有效直接返回,如果不在,則需要找到一個空閑的塊,而后我們需要標記為無效并且從磁盤重新讀取數據。
這里僅僅是讀取操作,為什么要加鎖?原因是這里需要從鏈表中獲取緩沖區,如果兩個線程需要讀取相同的塊緩沖區,但是都沒有讀取到,此時就會返回一個空閑的區域來讀取磁盤上的信息作為緩存,但是這很有可能會造成返回了兩個不同的緩沖區,也就是說,一個區塊有兩塊緩存!如果我們分別對這兩塊緩存區進行讀寫操作,那就不符合原子性了!因為一塊緩存單獨持有一把鎖,我們應該把這把鎖對應到文件系統上面的塊。
在進行寫入操作之后,我們還需要調用 bwrite 將更改的數據寫入到磁盤,而在使用完緩沖區之后,我們會調用 brelse 去釋放這塊緩沖區的鎖,并且將它放到鏈表的最前面。
順便,我們可以了解一下磁盤的寫入:
// 向磁盤發起讀/寫請求。
// b 是要操作的緩沖區,write==0表示讀磁盤到內存,write!=0表示寫內存到磁盤。
void
virtio_disk_rw(struct buf *b, int write)
{// 計算要操作的磁盤扇區號(塊號 × 每塊大小/每扇區大小)uint64 sector = b->blockno * (BSIZE / 512);// 獲取磁盤的鎖,保護共享資源acquire(&disk.vdisk_lock);#ifdef LAB_LOCK// (實驗用)檢查緩沖區是否持有正確的鎖checkbuf(b);
#endif// 根據virtio規范:一次塊設備請求需要用3個描述符// 1. 請求頭(讀/寫操作類型、扇區號)// 2. 數據區(讀或者寫的數據)// 3. 結果狀態(1字節,設備填充)int idx[3];while(1){// 申請3個描述符if(alloc3_desc(idx) == 0) {break;}// 申請失敗,等待可用描述符sleep(&disk.free[0], &disk.vdisk_lock);}// 填寫第一個描述符:請求頭struct virtio_blk_req *buf0 = &disk.ops[idx[0]];if(write)buf0->type = VIRTIO_BLK_T_OUT; // 寫磁盤elsebuf0->type = VIRTIO_BLK_T_IN; // 讀磁盤buf0->reserved = 0;buf0->sector = sector; // 設置要讀/寫的扇區號disk.desc[idx[0]].addr = (uint64) buf0; // 地址指向請求頭disk.desc[idx[0]].len = sizeof(struct virtio_blk_req); // 大小是請求頭結構體disk.desc[idx[0]].flags = VRING_DESC_F_NEXT; // 后面還有描述符disk.desc[idx[0]].next = idx[1]; // 指向下一個描述符// 填寫第二個描述符:數據緩沖區disk.desc[idx[1]].addr = (uint64) b->data; // 數據地址(緩沖區)disk.desc[idx[1]].len = BSIZE; // 大小是一個塊if(write)disk.desc[idx[1]].flags = 0; // 寫磁盤時,設備讀數據elsedisk.desc[idx[1]].flags = VRING_DESC_F_WRITE; // 讀磁盤時,設備寫數據disk.desc[idx[1]].flags |= VRING_DESC_F_NEXT; // 還有下一個描述符disk.desc[idx[1]].next = idx[2]; // 指向狀態描述符// 填寫第三個描述符:操作結果狀態disk.info[idx[0]].status = 0xff; // 初始化狀態,設備完成后會寫0disk.desc[idx[2]].addr = (uint64) &disk.info[idx[0]].status;disk.desc[idx[2]].len = 1; // 只需要1字節disk.desc[idx[2]].flags = VRING_DESC_F_WRITE; // 設備寫結果disk.desc[idx[2]].next = 0; // 最后一個描述符了// 記錄緩沖區,供中斷處理函數 virtio_disk_intr() 使用b->disk = 1; // 標記為請求中disk.info[idx[0]].b = b;// 告訴設備可用的描述符鏈的第一個索引disk.avail->ring[disk.avail->idx % NUM] = idx[0];__sync_synchronize(); // 內存屏障,確保寫操作順序// 更新 avail->idx,通知設備新請求已準備好disk.avail->idx += 1; // 不需要取模!__sync_synchronize(); // 再次內存屏障// 寫寄存器通知設備有新請求(queue 0)*R(VIRTIO_MMIO_QUEUE_NOTIFY) = 0;// 等待中斷處理函數把 b->disk 置為0,表示磁盤操作完成while(b->disk == 1) {sleep(b, &disk.vdisk_lock);}// 清理:標記 info 沒有關聯的 bufdisk.info[idx[0]].b = 0;// 釋放三個描述符free_chain(idx[0]);// 解鎖磁盤release(&disk.vdisk_lock);
}
這里看看就行,僅作了解,之后關于文件系統還會詳細介紹,當我們將 type 修改為寫時,隨后進行一系列發送請求,發送信號等一系列操作,我們硬件會被提醒有新的數據到達,然后我們的硬件會去讀取緩沖區的數據,而在這段代碼中,我們會將緩沖區的指針賦給結構體中的變量,這樣硬件就可以識別這段緩沖區并且進行讀取了。