本文內容
- 使用 Await 掛起執行
- 取消異步操作
- 監視進度
- 使用內置的基于任務的連結符
- 構建基于任務的連結符
- 構建基于任務的數據結構
c#使用基于任務的異步模式 (TAP) 處理異步操作時,可以使用回叫實現等待,而不會阻塞。 對于任務,這可通過?Task.ContinueWith?等方法實現。 通過允許在正常控制流中等待異步操縱,基于語言的異步支持可隱藏回叫,并且編譯器生成的代碼可提供此相同 API 級別的支持。
1、使用 Await 掛起執行
可以使用 C# 中的?await?關鍵字和 Visual Basic 中的?Await 運算符來異步等待?Task?和?Task<TResult>?對象。 等待?Task?時,await
?表達式的類型為?void
。 等待?Task<TResult>?時,await
?表達式的類型為?TResult
。?await
?表達式必須出現在異步方法的正文內。 (.NET Framework 4.5 中引入了這些語言功能。)
實際上,await 功能通過使用延續任務在任務上安裝回叫。 此回叫在掛起點恢復異步方法。 恢復異步方法時,如果等待的操作已成功完成且為?Task<TResult>,返回的是?TResult
。 如果等待的?Task?或?Task<TResult>?以?Canceled?狀態結束,就會拋出?OperationCanceledException?異常。 如果等待的?Task?或?Task<TResult>?以?Faulted?狀態結束,就會拋出導致它發生故障的異常。 一個?Task
?可能由于多個異常而出錯,但只會傳播一個異常。 不過,Task.Exception?屬性會返回包含所有錯誤的?AggregateException?異常。
如果同步上下文(SynchronizationContext?對象)與暫停時正在執行異步方法的線程(例如,SynchronizationContext.Current?屬性不是?null
)相關聯,異步方法使用上下文的?Post?方法,恢復相同的同步上下文。 否則,它依賴暫停時的當前任務計劃程序(TaskScheduler?對象)。 通常情況下,這是定目標到線程池的默認任務計劃程序 (TaskScheduler.Default)。 此任務計劃程序確定等待的異步操作是否應在該操作完成時恢復,或是否應計劃該恢復。 默認計劃程序通常允許在完成等待操作的線程上延續任務。
調用異步方法時,將同步執行函數的正文,直到遇見尚未完成的可等待實例上的第一個 await 表達式,此時調用返回到調用方。 如果異步方法不返回?void
,將會返回?Task?或?Task<TResult>?對象,以表示正在進行的計算。 在非 void 異步方法中,如果遇到 return 語句或到達方法正文末尾,任務就以?RanToCompletion?最終狀態完成。 如果未經處理的異常導致無法控制異步方法正文,任務就以?Faulted?狀態結束。 如果異常為?OperationCanceledException,任務改為以?Canceled?狀態結束。 通過此方式,最終將發布結果或異常。
此行為有幾種重要特殊情況。 出于性能原因,如果任務在等待時已完成,則不會生成控件,并且該函數將繼續執行。 返回到原始上下文并不總是所需的行為,可對其進行更改;將在下一節中更詳細地描述此內容。
1.1 使用 Yield 和 ConfigureAwait 配置掛起和恢復
有多種方法可更好地控制異步方法的執行。 例如,可以使用?Task.Yield?方法,將暫停點引入異步方法:
public class Task : …
{public static YieldAwaitable Yield();…
}
這相當于以異步方式發布或計劃返回當前上下文。
Task.Run(async delegate
{for(int i=0; i<1000000; i++){await Task.Yield(); // fork the continuation into a separate work item...}
});
還可以使用?Task.ConfigureAwait?方法,更好地控制異步方法中的暫停和恢復。 如前所述,默認情況下,異步方法掛起時會捕獲當前上下文,捕獲的上下文用于在恢復時調用異步方法的延續。 在多數情況下,這就是你所需的確切行為。 在其他情況下,你可能不關心延續上下文,則可以通過避免此類發布返回原始上下文來獲得更好的性能。 若要啟用此功能,請使用?Task.ConfigureAwait?方法,指示等待操作不要捕獲和恢復上下文,而是繼續執行正在等待完成的所有異步操作:
await someTask.ConfigureAwait(continueOnCapturedContext:false);
2、取消異步操作
從 .NET Framework 4 開始,支持取消操作的 TAP 方法提供至少一個接受取消令牌(CancellationToken?對象)的重載。
可通過取消令牌源(CancellationTokenSource?對象)創建取消令牌。 源的?Token?屬性返回取消令牌,它在源的?Cancel?方法獲得調用時收到信號。 例如,若要下載一個網頁,并且希望能夠取消此操作,請創建?CancellationTokenSource?對象,將它的令牌傳遞給 TAP 方法,再在準備好取消此操作時,調用源的?Cancel?方法:
var cts = new CancellationTokenSource();
string result = await DownloadStringTaskAsync(url, cts.Token);
… // at some point later, potentially on another thread
cts.Cancel();
若要取消多個異步調用,可以將相同令牌傳遞給所有調用:
var cts = new CancellationTokenSource();IList<string> results = await Task.WhenAll(from url in urls select DownloadStringTaskAsync(url, cts.Token));// at some point later, potentially on another thread…cts.Cancel();
或者,將相同令牌傳遞給操作的選擇性子集:
var cts = new CancellationTokenSource();byte [] data = await DownloadDataAsync(url, cts.Token);await SaveToDiskAsync(outputPath, data, CancellationToken.None);… // at some point later, potentially on another threadcts.Cancel();
?重要
可以從任意線程啟動取消請求。
可以將?CancellationToken.None?值傳遞給接受取消令牌的任何方法,指明絕不會請求取消操作。 這會導致?CancellationToken.CanBeCanceled?屬性返回?false
,并且調用的方法可以相應地進行優化。 出于測試目的,還可以通過傳入預取消標記(該標記使用接受布爾值的構造函數進行實例化)來指示是否應以已取消或未取消狀態啟動標記。
使用此方法進行取消具有以下優點:
-
可以將相同的取消標記傳遞給任意數量的異步和同步操作。
-
相同的取消請求可能會擴展到任意數量的偵聽器。
-
異步 API 的開發人員可完全控制是否可以請求取消以及取消何時生效。
-
使用 API 的代碼可以選擇性地確定將對其傳播取消請求的異步調用。
3、監視進度
某些異步方法通過傳入異步方法的進度接口來公開進度。 例如,設想某個函數以異步方式下載文本字符串,并在該過程中引發包括到目前為止下載完成百分比的進度更新。 此類方法可在 Windows Presentation Foundation (WPF) 應用程序中使用,如下所示:
private async void btnDownload_Click(object sender, RoutedEventArgs e)
{btnDownload.IsEnabled = false;try{txtResult.Text = await DownloadStringTaskAsync(txtUrl.Text,new Progress<int>(p => pbDownloadProgress.Value = p));}finally { btnDownload.IsEnabled = true; }
}
4、使用內置的基于任務的連結符
System.Threading.Tasks?命名空間包含多個方法,可用于撰寫和處理任務。
4.1 Task.Run
Task?類包含多個?Run?方法,以便于將工作作為?Task?或?Task<TResult>?輕松卸載到線程池,例如:
public async void button1_Click(object sender, EventArgs e)
{textBox1.Text = await Task.Run(() =>{// … do compute-bound work herereturn answer;});
}
其中部分?Run?方法(如?Task.Run(Func<Task>)?重載)以?TaskFactory.StartNew?方法的簡約表示形式存在。 借助此重載,可以在卸載的工作內使用 await,例如:
public async void button1_Click(object sender, EventArgs e)
{pictureBox1.Image = await Task.Run(async() =>{using(Bitmap bmp1 = await DownloadFirstImageAsync())using(Bitmap bmp2 = await DownloadSecondImageAsync())return Mashup(bmp1, bmp2);});
}
此類重載在邏輯上相當于結合使用?TaskFactory.StartNew?方法和任務并行庫中的?Unwrap?擴展方法。
4.2 Task.FromResult
FromResult?方法的適用情景為,數據可能已存在,且只需通過提升到?Task<TResult>?的任務返回方法返回:
public Task<int> GetValueAsync(string key)
{int cachedValue;return TryGetCachedValue(out cachedValue) ?Task.FromResult(cachedValue) :GetValueAsyncInternal();
}private async Task<int> GetValueAsyncInternal(string key)
{…
}
4.3 Task.WhenAll
WhenAll?方法可用于異步等待多個表示為任務的異步操作。 該方法所具有的多個重載支持一組非泛型任務或一組不統一的常規任務(如異步等待多個返回 void 的操作,或異步等待多個返回值的方法,其中每個值可能具有不同類型),并支持一組統一的常規任務(如異步等待多個?TResult
?返回方法)。
假設你想要向多個客戶發送電子郵件。 你可以重疊發送郵件,因此發送郵件時無需等待上一封郵件完成發送。 還可以查看發送操作完成的時間,以及是否發生了錯誤:
IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
await Task.WhenAll(asyncOps);
此代碼不顯式處理可能發生的異常,而是通過對?WhenAll?生成的任務執行?await
?傳播異常。 若要處理該異常,可以使用以下代碼:
IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);
try
{await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{...
}
在這種情況下,如果任意異步操作失敗,所有異常都會合并到?AggregateException?異常中,此異常存儲在?WhenAll?方法返回的?Task?中。 但是,僅通過?await
?關鍵字傳播其中一個異常。 如果想要檢查所有異常,可以重寫前面的代碼,如下所示:
Task [] asyncOps = (from addr in addrs select SendMailAsync(addr)).ToArray();
try
{await Task.WhenAll(asyncOps);
}
catch(Exception exc)
{foreach(Task faulted in asyncOps.Where(t => t.IsFaulted)){… // work with faulted and faulted.Exception}
}
讓我們考慮一下以異步方式從 Web 下載多個文件的示例。 在此示例中,所有異步操作具有相同的結果類型,并很容易對結果進行訪問:
string [] pages = await Task.WhenAll(from url in urls select DownloadStringTaskAsync(url));
可以使用上一個返回 void 方案中所討論的異常處理技術:
Task<string> [] asyncOps =(from url in urls select DownloadStringTaskAsync(url)).ToArray();
try
{string [] pages = await Task.WhenAll(asyncOps);...
}
catch(Exception exc)
{foreach(Task<string> faulted in asyncOps.Where(t => t.IsFaulted)){… // work with faulted and faulted.Exception}
}
4.4 Task.WhenAny
WhenAny?方法可用于異步等待多個表示為要完成的任務的異步操作之一。 此方法適用于四個主要用例:
-
冗余:多次執行一個操作并選擇最先完成的一次(例如,聯系能夠生成一個結果的多個股市行情 Web 服務并選擇完成最快的一個)。
-
交錯:啟動多個操作并等待所有這些操作完成,但是在完成這些操作時對其進行處理。
-
限制:允許其他操作完成時開始附加操作。 這是交錯方案的擴展。
-
早期釋放:例如,用任務 t1 表示的操作可以與任務 t2 組成?WhenAny?任務,您可以等待?WhenAny?任務。 任務 t2 可以表示超時、取消或其他一些導致?WhenAny?任務先于 t1 完成的信號。
冗余
假設你想要決定是否購買股票。 你信任一些股票建議 Web 服務,但每個服務最終會在不同的時間段變得很慢,具體取決于每日負載。?WhenAny?方法可用于在任何操作完成時接收通知:
var recommendations = new List<Task<bool>>()
{GetBuyRecommendation1Async(symbol),GetBuyRecommendation2Async(symbol),GetBuyRecommendation3Async(symbol)
};
Task<bool> recommendation = await Task.WhenAny(recommendations);
if (await recommendation) BuyStock(symbol);
WhenAll?返回已成功完成的所有任務的取消包裝結果。與它不同,WhenAny?返回已完成的任意任務。 如果任務失敗,重要的是知道該任務失敗,如果任務成功,重要的是知道返回值與哪個任務相關聯。 因此,你需要訪問返回任務的結果,或進一步等待,如本示例中所示。
與?WhenAll?一樣,必須能夠容納異常。 因為接收到完成的任務后,可以等待返回的任務傳播錯誤,并適當地進行?try/catch
,例如:
Task<bool> [] recommendations = …;
while(recommendations.Count > 0)
{Task<bool> recommendation = await Task.WhenAny(recommendations);try{if (await recommendation) BuyStock(symbol);break;}catch(WebException exc){recommendations.Remove(recommendation);}
}
此外,即使第一個任務成功完成,后續任務也可能會失敗。 此時,可以有多個選擇來處理異常:可以等待所有啟動的任務完成,這種情況可以使用?WhenAll?方法,或者決定所有異常是否重要且必須記錄。 為此,可以使用延續任務以在任務異步完成時接收通知:
foreach(Task recommendation in recommendations)
{var ignored = recommendation.ContinueWith(t => { if (t.IsFaulted) Log(t.Exception); });
}
或:
foreach(Task recommendation in recommendations)
{var ignored = recommendation.ContinueWith(t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}
或者甚至:
private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)
{foreach(var task in tasks){try { await task; }catch(Exception exc) { Log(exc); }}
}
…
LogCompletionIfFailed(recommendations);
最后,若要取消所有剩余操作:
var cts = new CancellationTokenSource();
var recommendations = new List<Task<bool>>()
{GetBuyRecommendation1Async(symbol, cts.Token),GetBuyRecommendation2Async(symbol, cts.Token),GetBuyRecommendation3Async(symbol, cts.Token)
};Task<bool> recommendation = await Task.WhenAny(recommendations);
cts.Cancel();
if (await recommendation) BuyStock(symbol);
交錯
假設你要從 Web 下載圖像,并且處理每個圖像(例如,將圖像添加到 UI 控件)。 可以在 UI 線程上按順序處理圖像,但建議盡可能同時下載圖像。 此外,建議不要直到所有圖像都下載完成才將圖像添加到 UI。 建議在完成下載時添加它們。
List<Task<Bitmap>> imageTasks =(from imageUrl in urls select GetBitmapAsync(imageUrl)).ToList();
while(imageTasks.Count > 0)
{try{Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);imageTasks.Remove(imageTask);Bitmap image = await imageTask;panel.AddImage(image);}catch{}
}
還可以將交錯應用于涉及下載圖像?ThreadPool?的計算密集型處理的方案;例如:
List<Task<Bitmap>> imageTasks =(from imageUrl in urls select GetBitmapAsync(imageUrl).ContinueWith(t => ConvertImage(t.Result)).ToList();
while(imageTasks.Count > 0)
{try{Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);imageTasks.Remove(imageTask);Bitmap image = await imageTask;panel.AddImage(image);}catch{}
}
遏制
請考慮交錯示例,因用戶大量下載圖像而導致下載必須受到遏制除外;例如,你希望僅能同時下載特定數目的內容。 為此,可以啟動異步操作的子集。 操作完成后,你可以啟動其他操作對其進行替代:
const int CONCURRENCY_LEVEL = 15;
Uri [] urls = …;
int nextIndex = 0;
var imageTasks = new List<Task<Bitmap>>();
while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)
{imageTasks.Add(GetBitmapAsync(urls[nextIndex]));nextIndex++;
}while(imageTasks.Count > 0)
{try{Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);imageTasks.Remove(imageTask);Bitmap image = await imageTask;panel.AddImage(image);}catch(Exception exc) { Log(exc); }if (nextIndex < urls.Length){imageTasks.Add(GetBitmapAsync(urls[nextIndex]));nextIndex++;}
}
早期釋放
假設正在異步等待某個操作完成的同時,對用戶的取消請求(例如,用戶單擊取消按鈕)進行響應。 以下代碼闡釋了此方案:
private CancellationTokenSource m_cts;public void btnCancel_Click(object sender, EventArgs e)
{if (m_cts != null) m_cts.Cancel();
}public async void btnRun_Click(object sender, EventArgs e)
{m_cts = new CancellationTokenSource();btnRun.Enabled = false;try{Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text);await UntilCompletionOrCancellation(imageDownload, m_cts.Token);if (imageDownload.IsCompleted){Bitmap image = await imageDownload;panel.AddImage(image);}else imageDownload.ContinueWith(t => Log(t));}finally { btnRun.Enabled = true; }
}private static async Task UntilCompletionOrCancellation(Task asyncOp, CancellationToken ct)
{var tcs = new TaskCompletionSource<bool>();using(ct.Register(() => tcs.TrySetResult(true)))await Task.WhenAny(asyncOp, tcs.Task);return asyncOp;
}
一旦決定退出,此實現將重新啟用用戶界面,但不會取消基礎異步操作。 另一種選擇是決定退出時,取消掛起的操作,但在操作完成之前不重新建立用戶界面,可能會由于取消請求而提前結束:
private CancellationTokenSource m_cts;public async void btnRun_Click(object sender, EventArgs e)
{m_cts = new CancellationTokenSource();btnRun.Enabled = false;try{Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text, m_cts.Token);await UntilCompletionOrCancellation(imageDownload, m_cts.Token);Bitmap image = await imageDownload;panel.AddImage(image);}catch(OperationCanceledException) {}finally { btnRun.Enabled = true; }
}
另一個早期釋放示例涉及結合使用?WhenAny?方法和?Delay?方法,下一部分將對此進行介紹。
4.5 Task.Delay
Task.Delay?方法可用于將暫停引入異步方法的執行中。 這對于許多功能都非常有用,包括構建輪詢循環和延遲預定時間段的用戶輸入處理。?Task.Delay?方法還可以與?Task.WhenAny?結合使用,以對 await 實現超時。
如果某任務屬于較大型異步操作(如 ASP.NET Web 服務)中的一部分,由于花費時間過長而不能完成,則整體操作可能會受到影響(尤其是此任務一直不能完成的情況下)。 因此,等待異步操作時可以超時非常重要。 雖然同步?Task.Wait、Task.WaitAll?和?Task.WaitAny?方法接受超時值,但相應的?TaskFactory.ContinueWhenAll/TaskFactory.ContinueWhenAny?和前述?Task.WhenAll/Task.WhenAny?方法不接受。 相反,可以將?Task.Delay?與?Task.WhenAny結合使用,以實現超時。
例如,在 UI 應用程序中,假設你想要下載圖像,并在圖像下載期間禁用該 UI。 但是,如果下載時間過長,你希望重新啟用 UI 并放棄下載:
public async void btnDownload_Click(object sender, EventArgs e)
{btnDownload.Enabled = false;try{Task<Bitmap> download = GetBitmapAsync(url);if (download == await Task.WhenAny(download, Task.Delay(3000))){Bitmap bmp = await download;pictureBox.Image = bmp;status.Text = "Downloaded";}else{pictureBox.Image = null;status.Text = "Timed out";var ignored = download.ContinueWith(t => Trace("Task finally completed"));}}finally { btnDownload.Enabled = true; }
}
這同樣適用于多個下載,因為?WhenAll?返回任務:
public async void btnDownload_Click(object sender, RoutedEventArgs e)
{btnDownload.Enabled = false;try{Task<Bitmap[]> downloads =Task.WhenAll(from url in urls select GetBitmapAsync(url));if (downloads == await Task.WhenAny(downloads, Task.Delay(3000))){foreach(var bmp in downloads.Result) panel.AddImage(bmp);status.Text = "Downloaded";}else{status.Text = "Timed out";downloads.ContinueWith(t => Log(t));}}finally { btnDownload.Enabled = true; }
}
5、構建基于任務的連結符
因為任務可以完全代表異步操作、提供同步和異步功能來加入操作、檢索其結果等,所以可以構建組成任務的連結符的庫以構建更大的模式。 如前一部分所述,.NET 包括一些內置連結符,但是,你也可以構建自己的連結符。 以下各節提供了一些潛在的連結符方法和類型的示例。
5.1 RetryOnFault
在許多情況下,如果上次嘗試失敗,你可能想要重試操作。 對于同步代碼,你可能會構建一個幫助器方法來實現此目的,如下例中的?RetryOnFault
:
public static T RetryOnFault<T>(Func<T> function, int maxTries)
{for(int i=0; i<maxTries; i++){try { return function(); }catch { if (i == maxTries-1) throw; }}return default(T);
}
你可以為異步操作(使用 TAP 實現,因此返回任務)構建幾乎相同的幫助器方法:
public static async Task<T> RetryOnFault<T>(Func<Task<T>> function, int maxTries)
{for(int i=0; i<maxTries; i++){try { return await function().ConfigureAwait(false); }catch { if (i == maxTries-1) throw; }}return default(T);
}
然后,可以使用此連結符將重試編碼到應用程序的邏輯中,例如:
// Download the URL, trying up to three times in case of failure
string pageContents = await RetryOnFault(() => DownloadStringTaskAsync(url), 3);
可以進一步擴展?RetryOnFault
?函數。 例如,該函數可以接受另一個?Func<Task>
(在重試間隔期間調用以確定何時重試該操作):
public static async Task<T> RetryOnFault<T>(Func<Task<T>> function, int maxTries, Func<Task> retryWhen)
{for(int i=0; i<maxTries; i++){try { return await function().ConfigureAwait(false); }catch { if (i == maxTries-1) throw; }await retryWhen().ConfigureAwait(false);}return default(T);
}
重試該操作前,可以使用以下函數等待片刻:
// Download the URL, trying up to three times in case of failure,
// and delaying for a second between retries
string pageContents = await RetryOnFault(() => DownloadStringTaskAsync(url), 3, () => Task.Delay(1000));
5.2 NeedOnlyOne
有時,你可以利用冗余改進操作延遲和提高成功的可能性。 假設有多個 Web 服務提供股票報價,但在一天中的不同時間,每個服務可能提供不同級別的質量和響應時間。 為了應對這些波動,你可能會向所有 Web 服務發出請求,并且只要從其中一個獲得響應,立刻取消剩余的請求。 你可以通過 helper 函數更輕松地實現此啟動多個操作的通用模式:等待任何操作,然后取消其余部分。 以下示例中的?NeedOnlyOne
?函數闡釋了這種方案:
public static async Task<T> NeedOnlyOne(params Func<CancellationToken,Task<T>> [] functions)
{var cts = new CancellationTokenSource();var tasks = (from function in functionsselect function(cts.Token)).ToArray();var completed = await Task.WhenAny(tasks).ConfigureAwait(false);cts.Cancel();foreach(var task in tasks){var ignored = task.ContinueWith(t => Log(t), TaskContinuationOptions.OnlyOnFaulted);}return completed;
}
然后,你可以使用此函數,如下所示:
double currentPrice = await NeedOnlyOne(ct => GetCurrentPriceFromServer1Async("msft", ct),ct => GetCurrentPriceFromServer2Async("msft", ct),ct => GetCurrentPriceFromServer3Async("msft", ct));
5.3 交錯操作
處理大型任務集時,如果使用?WhenAny?方法支持交錯方案,可能存在潛在性能問題。 每次調用?WhenAny?都會向每個任務注冊延續。 對于 N 個任務,這將導致在交錯操作的操作期間創建 O(N2) 次延續。 如果處理大型任務集,則可以使用連結符(以下示例中的?Interleaved
)來解決性能問題:
static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)
{var inputTasks = tasks.ToList();var sources = (from _ in Enumerable.Range(0, inputTasks.Count)select new TaskCompletionSource<T>()).ToList();int nextTaskIndex = -1;foreach (var inputTask in inputTasks){inputTask.ContinueWith(completed =>{var source = sources[Interlocked.Increment(ref nextTaskIndex)];if (completed.IsFaulted)source.TrySetException(completed.Exception.InnerExceptions);else if (completed.IsCanceled)source.TrySetCanceled();elsesource.TrySetResult(completed.Result);}, CancellationToken.None,TaskContinuationOptions.ExecuteSynchronously,TaskScheduler.Default);}return from source in sourcesselect source.Task;
}
然后,可以在任務完成時,使用連結符來處理任務的結果,例如:
IEnumerable<Task<int>> tasks = ...;
foreach(var task in Interleaved(tasks))
{int result = await task;…
}
5.4 WhenAllOrFirstException
在特定的分散/集中情況下,你可能想要等待集中的所有任務,除非某個任務發生錯誤。在這種情況下,你希望在異常發生時停止等待。 你可以通過使用連結符方法(如?WhenAllOrFirstException
)實現該目的,如下所示:
public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
{var inputs = tasks.ToList();var ce = new CountdownEvent(inputs.Count);var tcs = new TaskCompletionSource<T[]>();Action<Task> onCompleted = (Task completed) =>{if (completed.IsFaulted)tcs.TrySetException(completed.Exception.InnerExceptions);if (ce.Signal() && !tcs.Task.IsCompleted)tcs.TrySetResult(inputs.Select(t => t.Result).ToArray());};foreach (var t in inputs) t.ContinueWith(onCompleted);return tcs.Task;
}
6、構建基于任務的數據結構
除了能夠生成基于任務的自定義組合器,在?Task?和?Task<TResult>(表示異步操作結果和聯接所必需的同步操作結果)中包含數據結構,還可以使其成為功能非常強大的類型,基于該類型可生成在異步方案中使用的自定義數據結構。
6.1 AsyncCache
任務的重要方面之一是,它可能會分發到多個使用者,所有使用者都可以等待任務、向任務注冊延續、獲取任務結果或異常(如果是?Task<TResult>?的話)等。 這樣一來,Task?和?Task<TResult>?就非常適用于異步緩存基礎結構。 下面的示例演示了基于?Task<TResult>?生成的功能非常強大的小型異步緩存:
public class AsyncCache<TKey, TValue>
{private readonly Func<TKey, Task<TValue>> _valueFactory;private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;public AsyncCache(Func<TKey, Task<TValue>> valueFactory){if (valueFactory == null) throw new ArgumentNullException("valueFactory");_valueFactory = valueFactory;_map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();}public Task<TValue> this[TKey key]{get{if (key == null) throw new ArgumentNullException("key");return _map.GetOrAdd(key, toAdd =>new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;}}
}
AsyncCache<TKey,TValue>?類接受需要使用?TKey
?且返回?Task<TResult>?的函數作為構造函數的委托。 以前從緩存訪問的所有值都存儲在內部字典中,AsyncCache
?可以確保每個密鑰僅生成一個任務,即便同時訪問緩存也是如此。
例如,你可以生成下載網頁的緩存:
private AsyncCache<string,string> m_webPages =new AsyncCache<string,string>(DownloadStringTaskAsync);
然后可以在任何需要網頁內容的時候,以異步方式使用此緩存。?AsyncCache
?類可確保下載盡可能少的頁面,并緩存結果。
private async void btnDownload_Click(object sender, RoutedEventArgs e)
{btnDownload.IsEnabled = false;try{txtContents.Text = await m_webPages["https://www.microsoft.com"];}finally { btnDownload.IsEnabled = true; }
}
AsyncProducerConsumerCollection
你還可以使用任務來構建協調異步活動的數據結構。 請考慮經典的并行設計模式之一:制造者/使用者。 在此模式下,制造者生成數據,使用者使用數據,制造者和使用者可能會并行運行。 例如,使用者處理之前由制造者生成的第 1 項,而制造者現在正在制造第 2 項。 對于制造者/使用者模式,總是需要某種數據結構來存儲制造者創建的工作,以便使用者可以收到新數據的通知并及時發現新數據。
以下是基于任務構建的簡單數據結構,可以將異步方法用作生成方和使用方:
public class AsyncProducerConsumerCollection<T>
{private readonly Queue<T> m_collection = new Queue<T>();private readonly Queue<TaskCompletionSource<T>> m_waiting =new Queue<TaskCompletionSource<T>>();public void Add(T item){TaskCompletionSource<T> tcs = null;lock (m_collection){if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();else m_collection.Enqueue(item);}if (tcs != null) tcs.TrySetResult(item);}public Task<T> Take(){lock (m_collection){if (m_collection.Count > 0){return Task.FromResult(m_collection.Dequeue());}else{var tcs = new TaskCompletionSource<T>();m_waiting.Enqueue(tcs);return tcs.Task;}}}
}
通過該數據結構,可以編寫如下所示的代碼:
private static AsyncProducerConsumerCollection<int> m_data = …;
…
private static async Task ConsumerAsync()
{while(true){int nextItem = await m_data.Take();ProcessNextItem(nextItem);}
}
…
private static void Produce(int data)
{m_data.Add(data);
}
System.Threading.Tasks.Dataflow?命名空間包括?BufferBlock<T>?類型,可以類似方式使用它,但無需生成自定義集合類型:
private static BufferBlock<int> m_data = …;
…
private static async Task ConsumerAsync()
{while(true){int nextItem = await m_data.ReceiveAsync();ProcessNextItem(nextItem);}
}
…
private static void Produce(int data)
{m_data.Post(data);
}
System.Threading.Tasks.Dataflow?命名空間作為 NuGet 包提供。 若要安裝包含?System.Threading.Tasks.Dataflow?命名空間的程序集,請在 Visual Studio 中打開項目,選擇“項目”菜單中的“管理 NuGet 包”,再在線搜索?System.Threading.Tasks.Dataflow
?包。