rust高級 異步編程 一 future

文章目錄

      • Async 編程簡介
        • async/.await 簡單入門
      • Future 執行器與任務調度
        • Future 特征
        • 使用 Waker 來喚醒任務
        • 構建一個定時器
        • 執行器 Executor
          • 構建執行器
    • 完整代碼

Async 編程簡介

  • OS 線程, 它最簡單,也無需改變任何編程模型(業務/代碼邏輯),因此非常適合作為語言的原生并發模型,我們在多線程章節也提到過,Rust 就選擇了原生支持線程級的并發編程。但是,這種模型也有缺點,例如線程間的同步將變得更加困難,線程間的上下文切換損耗較大。使用線程池在一定程度上可以提升性能,但是對于 IO 密集的場景來說,線程池還是不夠。
  • 事件驅動(Event driven),如果說事件驅動常常跟回調( Callback )一起使用,相信大家就恍然大悟了。這種模型性能相當的好,但最大的問題就是存在回調地獄的風險:非線性的控制流和結果處理導致了數據流向和錯誤傳播變得難以掌控,還會導致代碼可維護性和可讀性的大幅降低。
  • 協程(Coroutines) Go 語言的協程設計就非常優秀,這也是 Go 語言能夠迅速火遍全球的殺手锏之一。協程跟線程類似,無需改變編程模型,同時,它也跟 async 類似,可以支持大量的任務并發運行。但協程抽象層次過高,導致用戶無法接觸到底層的細節,這對于系統編程語言和自定義異步運行時是難以接受的
  • actor 模型是 erlang 的殺手锏之一,它將所有并發計算分割成一個一個單元,這些單元被稱為 actor , 單元之間通過消息傳遞的方式進行通信和數據傳遞,跟分布式系統的設計理念非常相像。由于 actor 模型跟現實很貼近,因此它相對來說更容易實現,但是一旦遇到流控制、失敗重試等場景時,就會變得不太好用
  • async/await, 該模型性能高,還能支持底層編程,同時又像線程和協程那樣無需過多的改變編程模型,但有得必有失,async 模型的問題就是內部實現機制過于復雜,對于用戶來說,理解和使用起來也沒有線程和協程簡單,好在前者的復雜性開發者們已經幫我們封裝好,而理解和使用起來不夠簡單。

對于長時間運行的 CPU 密集型任務,例如并行計算,使用線程將更有優勢。 這種密集任務往往會讓所在的線程持續運行,任何不必要的線程切換都會帶來性能損耗,因此高并發反而在此時成為了一種多余。同時你所創建的線程數應該等于 CPU 核心數,充分利用 CPU 的并行能力,甚至還可以將線程綁定到 CPU 核心上,進一步減少線程上下文切換。

而高并發更適合 IO 密集型任務,例如 web 服務器、數據庫連接等等網絡服務,因為這些任務絕大部分時間都處于等待狀態,如果使用多線程,那線程大量時間會處于無所事事的狀態,再加上線程上下文切換的高昂代價,讓多線程做 IO 密集任務變成了一件非常奢侈的事。而使用async,既可以有效的降低 CPU 和內存的負擔,又可以讓大量的任務并發的運行,一個任務一旦處于IO或者其他等待(阻塞)狀態,就會被立刻切走并執行另一個任務,而這里的任務切換的性能開銷要遠遠低于使用多線程時的線程上下文切換。

async/.await 簡單入門

async/.await 是 Rust 內置的語言特性,可以讓我們用同步的方式去編寫異步的代碼。
通過 async 標記的語法塊會被轉換成實現了Future特征的狀態機。 與同步調用阻塞當前線程不同,當Future執行并遇到阻塞時,它會讓出當前線程的控制權,這樣其它的Future就可以在該線程中運行,這種方式完全不會導致當前線程的阻塞。
下面我們來通過例子學習 async/.await 關鍵字該如何使用,在開始之前,需要先引入 futures 包。編輯 Cargo.toml 文件并添加以下內容:

[dependencies]
futures = "0.3"
// `block_on`會阻塞當前線程直到指定的`Future`執行完成,這種阻塞當前線程以等待任務完成的方式較為簡單、粗暴,
// 好在其它運行時的執行器(executor)會提供更加復雜的行為,例如將多個`future`調度到同一個線程上執行。
use futures::executor::block_on;async fn hello_world() {hello_cat().await;//等待異步方法完成println!("hello, world!");
}async fn hello_cat() {println!("hello, kitty!");
}
fn main() {let future = hello_world(); // 返回一個Future, 因此不會打印任何輸出block_on(future); // 執行`Future`并等待其運行完成,此時"hello, world!"會被打印輸出
}

總之,在async fn函數中使用.await可以等待另一個異步調用的完成。但是與block_on不同,.await并不會阻塞當前的線程,而是異步的等待Future A的完成,在等待的過程中,該線程還可以繼續執行其它的Future B,最終實現了并發處理的效果。

Future 執行器與任務調度

Future 特征

首先,來給出 Future 的定義:它是一個能產出值的異步計算(雖然該值可能為空,例如 () )。光看這個定義,一個簡化版的 Future 特征:

trait SimpleFuture {// 設置關聯類型type Output;// 輸出計算完成的結果或者輸出Pending表示本次不能夠完成計算fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}// Future 需要被執行器poll(輪詢)后才能運行,并不能保證在一次 poll 中就被執行完,
enum Poll<T> {Ready(T),Pending,
}

若在當前 poll 中, Future 可以被完成,則會返回 Poll::Ready(result) ,反之則返回 Poll::Pending, 并且安排一個 wake 函數:當未來 Future 準備好進一步執行時, 該函數會被調用,然后管理該 Future 的執行器會再次調用 poll 方法,此時 Future 就可以繼續執行了。

用一個例子來說明下。考慮一個需要從 socket 讀取數據的場景:如果有數據,可以直接讀取數據并返回 Poll::Ready(data), 但如果沒有數據,Future 會被阻塞且不會再繼續執行,此時它會注冊一個 wake 函數,當 socket 數據準備好時,該函數將被調用以通知執行器:我們的 Future 已經準備好了,可以繼續執行。

pub struct SocketRead<'a> {socket: &'a Socket,
}impl SimpleFuture for SocketRead<'_> {type Output = Vec<u8>;fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {if self.socket.has_data_to_read() {// socket有數據,寫入buffer中并返回Poll::Ready(self.socket.read_buf())} else {// socket中還沒數據// 注冊一個`wake`函數,當數據可用時,該函數會被調用,// 然后當前Future的執行器會再次調用`poll`方法,此時就可以讀取到數據self.socket.set_readable_callback(wake);Poll::Pending}}
}

Future 模型允許將多個異步操作組合在一起,同時還無需任何內存分配。不僅僅如此,如果你需要同時運行多個 Future或鏈式調用多個 Future ,也可以通過無內存分配的狀態機實現,例如:


trait SimpleFuture {// 關聯類型是 trait 定義中的類型占位符。定義的時候,并不定義它的具體的類型是什么。在 impl 這個 trait 的時候,才為這個關聯類型賦予確定的類型。type Output;//關聯類型fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}enum Poll<T> {Ready(T),Pending,
}/// 一個SimpleFuture,它會并發地運行兩個Future直到它們完成
///
/// 之所以可以并發,是因為兩個Future的輪詢可以交替進行,一個阻塞,另一個就可以立刻執行,反之亦然
pub struct Join<FutureA, FutureB> {// 結構體的每個字段都包含一個Future,可以運行直到完成.// 等到Future完成后,字段會被設置為 `None`. 這樣Future完成后,就不會再被輪詢a: Option<FutureA>,b: Option<FutureB>,
}impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
whereFutureA: SimpleFuture<Output = ()>,FutureB: SimpleFuture<Output = ()>,
{type Output = ();fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {// 嘗試去完成一個 Future `a`if let Some(a) = &mut self.a {if let Poll::Ready(()) = a.poll(wake) {self.a.take();}}// 嘗試去完成一個 Future `b`if let Some(b) = &mut self.b {if let Poll::Ready(()) = b.poll(wake) {self.b.take();}}if self.a.is_none() && self.b.is_none() {// 兩個 Future都已完成 - 我們可以成功地返回了Poll::Ready(())} else {// 至少還有一個 Future 沒有完成任務,因此返回 `Poll::Pending`.// 當該 Future 再次準備好時,通過調用`wake()`函數來繼續執行Poll::Pending}}
}

上面代碼展示了如何同時運行多個 Future, 且在此過程中沒有任何內存分配,讓并發編程更加高效。 類似的,多個Future也可以一個接一個的連續運行:

/// 一個SimpleFuture, 它使用順序的方式,一個接一個地運行兩個Future
//
// 而真實的`Andthen`允許根據第一個`Future`的輸出來創建第二個`Future`,因此復雜的多。
pub struct AndThenFut<FutureA, FutureB> {first: Option<FutureA>,second: FutureB,
}impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
whereFutureA: SimpleFuture<Output = ()>,FutureB: SimpleFuture<Output = ()>,
{type Output = ();fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {if let Some(first) = &mut self.first {match first.poll(wake) {// 我們已經完成了第一個 Future, 可以將它移除, 然后準備開始運行第二個Poll::Ready(()) => self.first.take(),// 第一個 Future 還不能完成Poll::Pending => return Poll::Pending,};}// 運行到這里,說明第一個Future已經完成,嘗試去完成第二個self.second.poll(wake)}
}

這些例子展示了在不需要內存對象分配以及深層嵌套回調的情況下,該如何使用 Future 特征去表達異步控制流。 在了解了基礎的控制流后,我們再來看看真實的 Future 特征有何不同之處。

trait Future {type Output;fn poll(// 首先值得注意的地方是,`self`的類型從`&mut self`變成了`Pin<&mut Self>`:self: Pin<&mut Self>,// 其次將`wake: fn()` 修改為 `cx: &mut Context<'_>`:cx: &mut Context<'_>,) -> Poll<Self::Output>;
}

首先這里多了一個 Pin ,現在你只需要知道使用它可以創建一個無法被移動的 Future ,因為無法被移動,所以它將具有固定的內存地址,意味著我們可以存儲它的指針(如果內存地址可能會變動,那存儲指針地址將毫無意義!),也意味著可以實現一個自引用數據結構: struct MyFut { a: i32, ptr_to_a: *const i32 }。

其次,從 wake: fn() 變成了 &mut Context<’_> 。意味著 wake 函數可以攜帶數據了。

使用 Waker 來喚醒任務

對于 Future 來說,第一次被 poll 時無法完成任務是很正常的。但它需要確保在未來一旦準備好時,可以通知執行器再次對其進行 poll 進而繼續往下執行,該通知就是通過 Waker 類型完成的。

Waker 提供了一個 wake() 方法可以用于告訴執行器:相關的任務可以被喚醒了,此時執行器就可以對相應的 Future 再次進行 poll 操作。

構建一個定時器

下面一起來實現一個簡單的定時器 Future 。為了讓例子盡量簡單,當計時器創建時,我們會啟動一個線程接著讓該線程進入睡眠,等睡眠結束后再通知給 Future 。

注意本例子還會在后面繼續使用,因此我們重新創建一個工程來演示:使用 cargo new --lib timer_future 來創建一個新工程,在 lib 包的根路徑 src/lib.rs 中添加以下內容:

use std::{future::Future,pin::Pin,sync::{Arc, Mutex},task::{Context, Poll, Waker},thread,time::Duration,
};

繼續來實現 Future 定時器,之前提到: 新建線程在睡眠結束后會需要將狀態同步給定時器 Future ,由于是多線程環境,我們需要使用 Arc//> 來作為一個共享狀態,用于在新線程和 Future 定時器間共享。

pub struct TimerFuture {// Arc是一種能夠使得數據在線程間安全共享的智能指針.它的工作方式從本質上來講,是對將要共享的數據進行包裝,并表現為此數據的一個指針。// Arc會追蹤這個指針的所有拷貝,當最后一份拷貝離開作用域時,它就會安全釋放內存。shared_state: Arc<Mutex<SharedState>>,
}/// 在Future和等待的線程間共享狀態
struct SharedState {/// 定時(睡眠)是否結束completed: bool,/// 當睡眠結束后,線程可以用`waker`通知`TimerFuture`來喚醒任務waker: Option<Waker>,
}

下面給出 Future 的具體實現:

impl Future for TimerFuture {type Output = ();// 函數沒有返回值,那么返回一個 ()// 通過 ; 結尾的表達式返回一個 ()fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {// 通過檢查共享狀態,來確定定時器是否已經完成let mut shared_state = self.shared_state.lock().unwrap();if shared_state.completed {// 計算完成,彈出計算數據Poll::Ready(())} else {// 設置`waker`,這樣新線程在睡眠(計時)結束后可以喚醒當前的任務,接著再次對`Future`進行`poll`操作,// 下面的`clone`每次被`poll`時都會發生一次,實際上,應該是只`clone`一次更加合理。// 選擇每次都`clone`的原因是: `TimerFuture`可以在執行器的不同任務間移動,如果只克隆一次,// 那么獲取到的`waker`可能已經被篡改并指向了其它任務,最終導致執行器運行了錯誤的任務shared_state.waker = Some(cx.waker().clone());// 設置pending狀態Poll::Pending}}
}

代碼很簡單,只要新線程設置了 shared_state.completed = true ,那任務就能順利結束。如果沒有設置,會為當前的任務克隆一份 Waker ,這樣新線程就可以使用它來喚醒當前的任務。

最后,再來創建一個 API 用于構建定時器和啟動計時線程:

impl TimerFuture {/// 創建一個新的`TimerFuture`,在指定的時間結束后,該`Future`可以完成pub fn new(duration: Duration) -> Self {let shared_state = Arc::new(Mutex::new(SharedState {completed: false,waker: None,}));// 創建新線程let thread_shared_state = shared_state.clone();thread::spawn(move || {// 睡眠指定時間實現計時功能thread::sleep(duration);let mut shared_state = thread_shared_state.lock().unwrap();// 通知執行器定時器已經完成,可以繼續`poll`對應的`Future`了shared_state.completed = true;if let Some(waker) = shared_state.waker.take() {waker.wake()}});TimerFuture { shared_state }}
}
執行器 Executor

Rust 的 Future 是惰性的。其中一個推動它的方式就是在 async 函數中使用 .await 來調用另一個 async 函數,但是這個只能解決 async 內部的問題,那么這些最外層的 async 函數由執行器 executor控制 。

執行器會管理一批 Future (最外層的 async 函數),然后通過不停地 poll 推動它們直到完成。 最開始,執行器會先 poll 一次 Future ,后面就不會主動去 poll 了,而是等待 Future 通過調用 wake 函數來通知它可以繼續,它才會繼續去 poll 。這種 wake 通知然后 poll 的方式會不斷重復,直到 Future 完成。

構建執行器

下面我們將實現一個簡單的執行器,它可以同時并發運行多個 Future 。例子中,需要用到 futures 包的 ArcWake 特征,它可以提供一個方便的途徑去構建一個 Waker 。編輯 Cargo.toml ,添加下面依賴:

[dependencies]
futures = "0.3"

在之前的內容中,我們在 src/lib.rs 中創建了定時器 Future ,現在在 src/main.rs 中來創建程序的主體內容,開始之前,先引入所需的包:


use {futures::{future::{BoxFuture, FutureExt},task::{waker_ref, ArcWake},},std::{future::Future,sync::mpsc::{sync_channel, Receiver, SyncSender},sync::{Arc, Mutex},task::{Context, Poll},time::Duration,},// 引入之前實現的定時器模塊timer_future::TimerFuture,
};

執行器需要從一個消息通道( channel )中拉取事件,然后運行它們。當一個任務準備好后(可以繼續執行),它會將自己放入消息通道中,然后等待執行器 poll 。

/// 任務執行器,負責從通道中接收任務然后執行
struct Executor {ready_queue: Receiver<Arc<Task>>,
}/// `Spawner`負責創建新的`Future`然后將它發送到任務通道中
#[derive(Clone)]
struct Spawner {task_sender: SyncSender<Arc<Task>>,
}/// 一個Future,它可以調度自己(將自己放入任務通道中),然后等待執行器去`poll`
struct Task {/// 進行中的Future,在未來的某個時間點會被完成////// 按理來說`Mutex`在這里是多余的,因為我們只有一個線程來執行任務。但是由于/// Rust并不聰明,它無法知道`Future`只會在一個線程內被修改,并不會被跨線程修改。因此/// 我們需要使用`Mutex`來滿足這個笨笨的編譯器對線程安全的執著。////// 如果是生產級的執行器實現,不會使用`Mutex`,因為會帶來性能上的開銷,取而代之的是使用`UnsafeCell`future: Mutex<Option<BoxFuture<'static, ()>>>,/// 可以將該任務自身放回到任務通道中,等待執行器的polltask_sender: SyncSender<Arc<Task>>,
}fn new_executor_and_spawner() -> (Executor, Spawner) {// 任務通道允許的最大緩沖數(任務隊列的最大長度)// 當前的實現僅僅是為了簡單,在實際的執行中,并不會這么使用const MAX_QUEUED_TASKS: usize = 10_000;let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);(Executor { ready_queue }, Spawner { task_sender })
}

下面再來添加一個方法用于生成 Future , 然后將它放入任務通道中:

impl Spawner {fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {let future = future.boxed();let task = Arc::new(Task {future: Mutex::new(Some(future)),task_sender: self.task_sender.clone(),});self.task_sender.send(task).expect("任務隊列已滿");}
}

在執行器 poll 一個 Future 之前,首先需要調用 wake 方法進行喚醒,然后再由 Waker 負責調度該任務并將其放入任務通道中。創建 Waker 的最簡單的方式就是實現 ArcWake 特征,先來為我們的任務實現 ArcWake 特征,這樣它們就能被轉變成 Waker 然后被喚醒:

impl ArcWake for Task {fn wake_by_ref(arc_self: &Arc<Self>) {// 通過發送任務到任務管道的方式來實現`wake`,這樣`wake`后,任務就能被執行器`poll`let cloned = arc_self.clone();arc_self.task_sender.send(cloned).expect("任務隊列已滿");}
}

當任務實現了 ArcWake 特征后,它就變成了 Waker ,在調用 wake() 對其喚醒后會將任務復制一份所有權( Arc ),然后將其發送到任務通道中。最后我們的執行器將從通道中獲取任務,然后進行 poll 執行:

impl Executor {fn run(&self) {while let Ok(task) = self.ready_queue.recv() {// 獲取一個future,若它還沒有完成(仍然是Some,不是None),則對它進行一次poll并嘗試完成它let mut future_slot = task.future.lock().unwrap();if let Some(mut future) = future_slot.take() {// 基于任務自身創建一個 `LocalWaker`let waker = waker_ref(&task);let context = &mut Context::from_waker(&*waker);// `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的類型別名// 通過調用`as_mut`方法,可以將上面的類型轉換成`Pin<&mut dyn Future + Send + 'static>`if future.as_mut().poll(context).is_pending() {// Future還沒執行完,因此將它放回任務中,等待下次被poll*future_slot = Some(future);}}}}
}

恭喜!我們終于擁有了自己的執行器,下面再來寫一段代碼使用該執行器去運行之前的定時器 Future :

fn main() {let (executor, spawner) = new_executor_and_spawner();// 生成一個任務spawner.spawn(async {println!("howdy!");// 創建定時器Future,并等待它完成TimerFuture::new(Duration::new(2, 0)).await;println!("done!");});// drop掉任務,這樣執行器就知道任務已經完成,不會再有新的任務進來drop(spawner);// 運行執行器直到任務隊列為空// 任務運行后,會先打印`howdy!`, 暫停2秒,接著打印 `done!`executor.run();
}

完整代碼

main.rs

use {futures::{future::{BoxFuture, FutureExt},task::{waker_ref, ArcWake},},std::{future::Future,sync::mpsc::{sync_channel, Receiver, SyncSender},sync::{Arc, Mutex},task::{Context, Poll},time::Duration,},// 引入之前實現的定時器模塊timer_future::TimerFuture,
};/// 任務執行器,負責從通道中接收任務然后執行
struct Executor {ready_queue: Receiver<Arc<Task>>,
}/// `Spawner`負責創建新的`Future`然后將它發送到任務通道中
#[derive(Clone)]
struct Spawner {task_sender: SyncSender<Arc<Task>>,
}/// 一個Future,它可以調度自己(將自己放入任務通道中),然后等待執行器去`poll`
struct Task {/// 進行中的Future,在未來的某個時間點會被完成////// 按理來說`Mutex`在這里是多余的,因為我們只有一個線程來執行任務。但是由于/// Rust并不聰明,它無法知道`Future`只會在一個線程內被修改,并不會被跨線程修改。因此/// 我們需要使用`Mutex`來滿足這個笨笨的編譯器對線程安全的執著。////// 如果是生產級的執行器實現,不會使用`Mutex`,因為會帶來性能上的開銷,取而代之的是使用`UnsafeCell`future: Mutex<Option<BoxFuture<'static, ()>>>,/// 可以將該任務自身放回到任務通道中,等待執行器的polltask_sender: SyncSender<Arc<Task>>,
}fn new_executor_and_spawner() -> (Executor, Spawner) {// 任務通道允許的最大緩沖數(任務隊列的最大長度)// 當前的實現僅僅是為了簡單,在實際的執行中,并不會這么使用const MAX_QUEUED_TASKS: usize = 10_000;let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);//設置緩沖通道(Executor { ready_queue }, Spawner { task_sender })
}
impl Spawner {// 派生任務函數fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {let future = future.boxed();let task = Arc::new(Task {future: Mutex::new(Some(future)),task_sender: self.task_sender.clone(),});self.task_sender.send(task).expect("任務隊列已滿");}
}
impl ArcWake for Task {fn wake_by_ref(arc_self: &Arc<Self>) {// 通過發送任務到任務管道的方式來實現`wake`,這樣`wake`后,任務就能被執行器`poll`let cloned = arc_self.clone();arc_self.task_sender.send(cloned).expect("任務隊列已滿");}
}
impl Executor {fn run(&self) {while let Ok(task) = self.ready_queue.recv() {// 獲取一個future,若它還沒有完成(仍然是Some,不是None),則對它進行一次poll并嘗試完成它let mut future_slot = task.future.lock().unwrap();if let Some(mut future) = future_slot.take() {// 基于任務自身創建一個 `LocalWaker`let waker = waker_ref(&task);let context = &mut Context::from_waker(&*waker);// `BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的類型別名// 通過調用`as_mut`方法,可以將上面的類型轉換成`Pin<&mut dyn Future + Send + 'static>`if future.as_mut().poll(context).is_pending() {// Future還沒執行完,因此將它放回任務中,等待下次被poll*future_slot = Some(future);}}}}
}fn main() {let (executor, spawner) = new_executor_and_spawner();// 生成一個任務spawner.spawn(async {println!("howdy!");// 創建定時器Future,并等待它完成TimerFuture::new(Duration::new(2, 0)).await;println!("done!");});// drop掉任務,這樣執行器就知道任務已經完成,不會再有新的任務進來drop(spawner);// 運行執行器直到任務隊列為空// 任務運行后,會先打印`howdy!`, 暫停2秒,接著打印 `done!`executor.run();
}

lib.rs

use std::{future::Future,pin::Pin,sync::{Arc, Mutex},task::{Context, Poll, Waker},thread,time::Duration,
};
pub struct TimerFuture {// Arc是一種能夠使得數據在線程間安全共享的智能指針.它的工作方式從本質上來講,是對將要共享的數據進行包裝,并表現為此數據的一個指針。// Arc會追蹤這個指針的所有拷貝,當最后一份拷貝離開作用域時,它就會安全釋放內存。shared_state: Arc<Mutex<SharedState>>,
}/// 在Future和等待的線程間共享狀態
struct SharedState {/// 定時(睡眠)是否結束completed: bool,/// 當睡眠結束后,線程可以用`waker`通知`TimerFuture`來喚醒任務waker: Option<Waker>,
}
impl Future for TimerFuture {type Output = ();// 函數沒有返回值,那么返回一個 ()// 通過 ; 結尾的表達式返回一個 ()fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {// 通過檢查共享狀態,來確定定時器是否已經完成let mut shared_state = self.shared_state.lock().unwrap();if shared_state.completed {// 計算完成,彈出計算數據Poll::Ready(())} else {// 設置`waker`,這樣新線程在睡眠(計時)結束后可以喚醒當前的任務,接著再次對`Future`進行`poll`操作,// 下面的`clone`每次被`poll`時都會發生一次,實際上,應該是只`clone`一次更加合理。// 選擇每次都`clone`的原因是: `TimerFuture`可以在執行器的不同任務間移動,如果只克隆一次,// 那么獲取到的`waker`可能已經被篡改并指向了其它任務,最終導致執行器運行了錯誤的任務shared_state.waker = Some(cx.waker().clone());// 設置pending狀態Poll::Pending}}
}impl TimerFuture {/// 創建一個新的`TimerFuture`,在指定的時間結束后,該`Future`可以完成pub fn new(duration: Duration) -> Self {// 設置狀態和喚醒函數let shared_state = Arc::new(Mutex::new(SharedState {completed: false,waker: None,}));// 創建新線程let thread_shared_state = shared_state.clone();// thread::spawn 函數的返回值類型是 JoinHandle, 通過 JoinHandle 來等待所有線程完成就可以解決上面執行不完的問題。thread::spawn(move || { //move 閉包通常和 thread::spawn 函數一起使用,它允許你使用其它線程的數據。創建線程時,把值的所有權從一個線程轉移到另一個線程。// 睡眠指定時間實現計時功能thread::sleep(duration);// 線程加鎖let mut shared_state = thread_shared_state.lock().unwrap();// 通知執行器定時器已經完成,可以繼續`poll`對應的`Future`了shared_state.completed = true;if let Some(waker) = shared_state.waker.take() {waker.wake()}});// 返回創建的對象TimerFuture { shared_state }}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/208168.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/208168.shtml
英文地址,請注明出處:http://en.pswp.cn/news/208168.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Linux設置root初始密碼

目錄 一、Linux系統中普通用戶和特權用戶&#xff08;root&#xff09; 二、Linux系統中設置root初始密碼 一、Linux系統中普通用戶和特權用戶&#xff08;root&#xff09; windows 系統中有普通用戶和特權用戶&#xff0c;特權用戶是 administer&#xff0c;普通用戶可以…

mybatisplus調用oracle存儲過程

mybatisplus調用oracle存儲過程 創建一個測試的oracle存儲過程 -- 創建攜帶返回值存儲過程 CREATE OR REPLACE PROCEDURE SP_SUM_PROC_2023(number1 IN NUMBER, number2 IN NUMBER, result OUT NUMBER,result2 OUT NUMBER) is BEGIN result : number1 number2; result2 : 99…

微服務01

筆記&#xff1a; day03-微服務01 - 飛書云文檔 (feishu.cn) 數據庫連接不上&#xff1f; 要在虛擬機啟動MySQL容器。docker start mysql 服務治理 服務提供者&#xff1a;暴露服務接口&#xff0c;供其他服務調用 服務消費者&#xff1a;調用其他服務提供的接口 注冊中心&…

Java IO流(一) 基本知識

Java IO流 一、基礎知識 IO流即存儲和讀取數據的解決方案。 &#xff08;一&#xff09;File 表示系統中的文件或者文件夾的路徑 獲取文件信息(大小&#xff0c;文件名&#xff0c;修改時間) 創建文件/文件夾 刪除文件/文件夾 判斷文件的類型 注意&#xff1a;File類只能對…

STL(五)(queue篇)

我發現之前一版在電腦上看 常用函數部分 沒有問題,由于是手打上去的,在手機上看會發生錯位問題,現已將電腦原版 常用函數部分 截圖改為圖片形式,不會再發生錯位問題,非常感謝大家的支持 ### priority_queue優先隊列出現頻率非常高,尤為重要(是一定要掌握的數據結構) 1.queue隊…

A : DS靜態查找之順序查找

Description 給出一個隊列和要查找的數值&#xff0c;找出數值在隊列中的位置&#xff0c;隊列位置從1開始 要求使用帶哨兵的順序查找算法 Input 第一行輸入n&#xff0c;表示隊列有n個數據 第二行輸入n個數據&#xff0c;都是正整數&#xff0c;用空格隔開 第三行輸入t&…

Spring-retry失敗重試機制

提示&#xff1a;文章寫完后&#xff0c;目錄可以自動生成&#xff0c;如何生成可參考右邊的幫助文檔 文章目錄 前言一、引入依賴二、主啟動類上加EnableRetry三、Server層注意 四、失敗后回調方法總結 前言 提示&#xff1a;SpringBoot項目為例 原文鏈接&#xff1a;https://…

docker全解

docker全解 一、docker的基本概念 什么是docker? docker是一個開源的應用容器引擎&#xff0c;讓開發者可以打包他們的應用以及依賴包到一個可移植的鏡像中&#xff0c;然后發布到任何流行的Linux或Windows機器上&#xff0c;也可以實現虛擬化。容器是完全使用沙箱機制&#…

MIT線性代數筆記-第26講-對稱矩陣及正定性

目錄 26.對稱矩陣及正定性打賞 26.對稱矩陣及正定性 實對稱矩陣的特征值均為實數&#xff0c;并且一定存在一組兩兩正交的特征向量 這對于單位矩陣顯然成立 證明特征值均為實數&#xff1a; ? ???設一個對稱矩陣 A A A&#xff0c;對于 A x ? λ x ? A \vec{x} \lambda…

作業12.8

1. 使用手動連接&#xff0c;將登錄框中的取消按鈕使用qt4版本的連接到自定義的槽函數中&#xff0c;在自定義的槽函數中調用關閉函數。將登錄按鈕使用qt5版本的連接到自定義的槽函數中&#xff0c;在槽函數中判斷ui界面上輸入的賬號是否為"admin"&#xff0c;密碼是…

Matlab simulink PLL學習筆記

本文學習內容&#xff1a;【官方】2022小邁步之 MATLAB助力芯片設計系列&#xff08;一&#xff09;&#xff1a;電路仿真與模數混合設計基礎_嗶哩嗶哩_bilibili 時域模型 testbench搭建 菜單欄點擊simulink 創建空白模型 點擊庫瀏覽器 在PLL里面選擇一種架構拖拽到畫布。 如…

一文理解什么是交叉熵損失函數以及它的作用

今天看一個在深度學習中很枯燥但很重要的概念——交叉熵損失函數。 作為一種損失函數&#xff0c;它的重要作用便是可以將“預測值”和“真實值(標簽)”進行對比&#xff0c;從而輸出 loss 值&#xff0c;直到 loss 值收斂&#xff0c;可以認為神經網絡模型訓練完成。 那么這…

【Java用法】Hutool樹結構工具-TreeUtil快速構建樹形結構的兩種方式 + 數據排序

Hutool樹結構工具-TreeUtil快速構建樹形結構的兩種方式 數據排序 一、業務場景二、Hutool官網樹結構工具2.1 介紹2.2 使用2.2.1 定義結構2.2.2 構建Tree2.2.3 自定義字段名 2.3 說明 三、具體的使用場景3.1 實現的效果3.2 業務代碼3.3 實現自定義字段的排序 四、踩過的坑4.1 坑…

策略產品經理常用的ChatGPT通用提示詞模板

產品策略&#xff1a;請幫助我制定一個策略產品的產品策略。 市場調研&#xff1a;如何進行策略產品的市場調研&#xff1f; 競爭分析&#xff1a;如何進行策略產品的競爭分析&#xff1f; 用戶畫像&#xff1a;如何構建策略產品的用戶畫像&#xff1f; 產品定位&#xff1…

交換排序(冒泡排序)(快速排序(1))

目錄 1.交換排序 &#xff08;1&#xff09;冒泡排序 &#xff08;2&#xff09;快速排序 1.交換排序 基本思想&#xff1a;所謂交換&#xff0c;就是根據序列中兩個記錄鍵值的比較結果來對換這兩個記錄在序列中的位置&#xff0c;交換排序的特點是&#xff1a;將鍵值較大的…

ambari hive on Tez引擎一直卡住

hive on tez使用./bin/hive啟動后一直卡住&#xff0c;無法進入命令行 使用TEZ作為Hive默認執行引擎時&#xff0c;需要在調用Hive CLI的時候啟動YARN應用&#xff0c;預分配資源&#xff0c;這需要花一些時間&#xff0c;而使用MapReduce作為執行引擎時是在執行語句的時候才會…

iPaaS架構深入探討

在數字化時代全面來臨之際&#xff0c;企業正面臨著前所未有的挑戰與機遇。技術的迅猛發展與數字化轉型正在徹底顛覆各行各業的格局&#xff0c;不斷推動著企業邁向新的前程。然而&#xff0c;這一數字化時代亦衍生出一系列復雜而深奧的難題&#xff1a;各異系統之間數據孤島、…

基于SuperMap iObjects Java生成地圖瓦片

作者&#xff1a;dongyx 前言 在GIS領域&#xff0c;地圖瓦片技術作為GIS領域的關鍵技術&#xff0c;是提高地圖服務性能的關鍵手段之一。通過預先生成地圖的瓦片數據&#xff0c;可以顯著提升用戶訪問地圖時的響應速度和體驗。SuperMap iObjects for Java作為一款強大的GIS開…

Docker, Docker-compose部署Sonarqube

參考文檔 鏡像地址: https://hub.docker.com/_/sonarqube/tags Docker部署文檔地址 Installing from Docker | SonarQube Docs Docker-compose文檔部署地址&#xff1a; Installing from Docker | SonarQube Docs 部署鏡像 2.1 docker部署 # 宿主機執行 $. vi /etc/sysctl.conf…

Java注解詳解

概述 注解是對程序代碼進行標注和解釋的一種方式。在Java中&#xff0c;注解提供了一種元數據形式&#xff0c;能夠在程序中嵌入有關程序的信息&#xff0c;以便進行進一步的處理。注解通過使用符號來聲明&#xff0c;如Override、Deprecated等。 注解和注釋的區別 注釋&…