Netty中future和promise用法和區別

定義與概念

  • Future:表示一個異步操作的結果。它是只讀的,意味著你只能查看操作是否完成、是否成功、獲取結果或者異常等信息,但不能主動設置操作的結果。
  • Promise:是 Future 的可寫擴展。它不僅可以像 Future 一樣查看操作結果,還能主動設置操作的成功、失敗或者取消狀態,并且通知所有的監聽器。

用法示例

Future 的用法

Future 通常用于獲取異步操作的結果,并且可以添加監聽器來處理操作完成后的邏輯。以下是一個簡單的示例,展示了如何使用 Future 來處理 DNS 解析結果:

dnsNameResolver.resolve(host).addListener(new FutureListener<InetAddress>() {@Overridepublic void operationComplete(Future<InetAddress> future) throws Exception {if (future.isSuccess()) {InetAddress hostAddress = future.get();// 處理解析成功的結果} else {// 處理解析失敗的情況}}
});

在這個示例中,dnsNameResolver.resolve(host) 方法返回一個 Future<InetAddress> 對象,我們通過添加 FutureListener 來監聽解析操作的完成狀態。當操作完成后,會調用 operationComplete 方法,我們可以在這個方法中處理解析結果。

Promise 的用法

Promise 主要用于主動設置異步操作的結果,并且可以通知所有的監聽器。以下是一個示例,展示了如何使用 Promise 來處理 OCSP 查詢結果:

final Promise<OCSPResp> responsePromise = eventLoop.newPromise();// 異步操作
dnsNameResolver.resolve(host).addListener(new FutureListener<InetAddress>() {@Overridepublic void operationComplete(Future<InetAddress> future) throws Exception {if (future.isSuccess()) {// 處理解析成功的結果InetAddress hostAddress = future.get();final ChannelFuture channelFuture = bootstrap.connect(hostAddress, port);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {if (future.isSuccess()) {// 處理連接成功的結果responsePromise.trySuccess(result); // 設置操作成功的結果} else {responsePromise.tryFailure(new IllegalStateException("Connection to OCSP Responder Failed", future.cause())); // 設置操作失敗的結果}}});} else {responsePromise.tryFailure(future.cause()); // 設置操作失敗的結果}}
});// 添加監聽器來處理操作結果
responsePromise.addListener(new FutureListener<OCSPResp>() {@Overridepublic void operationComplete(Future<OCSPResp> future) throws Exception {if (future.isSuccess()) {OCSPResp resp = future.get();// 處理操作成功的結果} else {// 處理操作失敗的情況}}
});

在這個示例中,我們首先創建了一個 Promise<OCSPResp> 對象 responsePromise,然后在異步操作完成后,根據操作結果調用 trySuccesstryFailure 方法來設置 Promise 的狀態。最后,我們添加了一個 FutureListener 來監聽 Promise 的完成狀態,并處理操作結果。

區別總結

  • 可寫性:
    • Future 是只讀的,只能查看異步操作的結果,不能主動設置操作的狀態。
    • Promise 是可寫的,可以主動設置操作的成功、失敗或者取消狀態。
  • 用途:
    • Future 主要用于獲取異步操作的結果,并且可以添加監聽器來處理操作完成后的邏輯。
    • Promise 主要用于在異步操作完成后,主動設置操作的結果,并且通知所有的監聽器。
  • 方法差異:
    • Future 提供了一些方法來查看操作的狀態,如 isDone()isSuccess()cause() 等。
    • Promise 除了繼承了 Future 的方法外,還提供了一些方法來設置操作的結果,如 setSuccess()trySuccess()setFailure()tryFailure() 等。

代碼中的體現

在提供的代碼片段中,InflightNameResolver 類的 resolve 方法使用了 Promise 來處理 DNS 解析結果:

private <U> Promise<U> resolve(final ConcurrentMap<String, Promise<U>> resolveMap,final String inetHost, final Promise<U> promise, boolean resolveAll) {// ...if (resolveAll) {@SuppressWarnings("unchecked")final Promise<List<T>> castPromise = (Promise<List<T>>) promise; // U is List<T>delegate.resolveAll(inetHost, castPromise);} else {@SuppressWarnings("unchecked")final Promise<T> castPromise = (Promise<T>) promise; // U is Tdelegate.resolve(inetHost, castPromise);}// ...return promise;
}

在這個方法中,我們可以看到 Promise 被用于傳遞異步操作的結果,并且可以在操作完成后主動設置操作的狀態。

另外,PromiseNotifier 類展示了如何使用 Promise 來通知多個監聽器:

public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> {private final Promise<? super V>[] promises;public PromiseNotifier(Promise<? super V>... promises) {this.promises = promises;}@Overridepublic void operationComplete(F future) throws Exception {if (future.isSuccess()) {V result = future.get();for (Promise<? super V> p : promises) {PromiseNotificationUtil.trySuccess(p, result, null);}} else if (future.isCancelled()) {for (Promise<? super V> p : promises) {PromiseNotificationUtil.tryCancel(p, null);}} else {Throwable cause = future.cause();for (Promise<? super V> p : promises) {PromiseNotificationUtil.tryFailure(p, cause, null);}}}
}

在這個類中,我們可以看到 Promise 被用于通知多個監聽器操作的結果,并且可以根據操作的狀態調用不同的方法來設置 Promise 的狀態。

綜上所述,FuturePromise 在 Netty 中都是非常重要的組件,它們分別用于處理異步操作的不同方面。通過合理使用 FuturePromise,可以有效地處理異步操作的結果,提高代碼的可讀性和可維護性。

處理多個順序依賴的異步操作

假設我們需要完成一個包含三個步驟的操作流程:

  1. 連接到服務器
  2. 發送認證請求并等待認證成功
  3. 發送業務數據并接收響應

這三個步驟必須按順序執行,后一個步驟依賴于前一個步驟的成功完成。以下是實現這種依賴關系的代碼示例:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class ChannelPromiseDependencyExample {private static final String SERVER_HOST = "localhost";private static final int SERVER_PORT = 8080;public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder(),new StringEncoder(),new ClientHandler());}});// 創建主 Promise,用于跟蹤整個操作流程的完成狀態ChannelPromise mainPromise = bootstrap.config().group().next().newPromise();// 開始執行依賴操作鏈connectAndProcess(bootstrap, mainPromise);// 等待整個操作流程完成mainPromise.await();if (mainPromise.isSuccess()) {System.out.println("所有操作成功完成");} else {System.out.println("操作失敗: " + mainPromise.cause());}} finally {group.shutdownGracefully();}}private static void connectAndProcess(Bootstrap bootstrap, ChannelPromise mainPromise) {// 步驟 1: 連接到服務器ChannelFuture connectFuture = bootstrap.connect(SERVER_HOST, SERVER_PORT);// 為連接操作添加監聽器connectFuture.addListener((ChannelFuture future) -> {if (future.isSuccess()) {Channel channel = future.channel();System.out.println("成功連接到服務器");// 步驟 2: 發送認證請求ChannelPromise authPromise = channel.newPromise();sendAuthRequest(channel, authPromise);// 為認證操作添加監聽器authPromise.addListener((ChannelFuture authFuture) -> {if (authFuture.isSuccess()) {System.out.println("認證成功");// 步驟 3: 發送業務數據ChannelPromise businessPromise = channel.newPromise();sendBusinessData(channel, businessPromise);// 為業務操作添加監聽器businessPromise.addListener((ChannelFuture businessFuture) -> {if (businessFuture.isSuccess()) {System.out.println("業務數據處理成功");mainPromise.setSuccess(); // 標記整個操作成功} else {mainPromise.setFailure(businessFuture.cause()); // 標記整個操作失敗}channel.close(); // 關閉連接});} else {mainPromise.setFailure(authFuture.cause()); // 標記整個操作失敗channel.close(); // 關閉連接}});} else {mainPromise.setFailure(future.cause()); // 標記整個操作失敗}});}private static void sendAuthRequest(Channel channel, ChannelPromise authPromise) {// 發送認證請求channel.writeAndFlush("AUTH username password").addListener(future -> {if (future.isSuccess()) {System.out.println("認證請求已發送");// 認證結果將在 ChannelHandler 中處理} else {authPromise.setFailure(future.cause()); // 認證請求發送失敗}});}private static void sendBusinessData(Channel channel, ChannelPromise businessPromise) {// 發送業務數據channel.writeAndFlush("DATA some_business_data").addListener(future -> {if (future.isSuccess()) {System.out.println("業務數據已發送");// 業務響應將在 ChannelHandler 中處理} else {businessPromise.setFailure(future.cause()); // 業務數據發送失敗}});}static class ClientHandler extends SimpleChannelInboundHandler<String> {private ChannelPromise authPromise;private ChannelPromise businessPromise;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 通道激活時,可以獲取外部的 Promise 實例// 實際應用中可能需要通過構造函數或其他方式傳遞}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("收到服務器響應: " + msg);// 根據響應內容判斷操作結果if (msg.startsWith("AUTH_SUCCESS")) {if (authPromise != null) {authPromise.setSuccess(); // 認證成功}} else if (msg.startsWith("AUTH_FAILURE")) {if (authPromise != null) {authPromise.setFailure(new Exception("認證失敗: " + msg)); // 認證失敗}} else if (msg.startsWith("DATA_SUCCESS")) {if (businessPromise != null) {businessPromise.setSuccess(); // 業務數據處理成功}} else if (msg.startsWith("DATA_FAILURE")) {if (businessPromise != null) {businessPromise.setFailure(new Exception("業務數據處理失敗: " + msg)); // 業務數據處理失敗}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();// 設置所有未完成的 Promise 為失敗狀態if (authPromise != null && !authPromise.isDone()) {authPromise.setFailure(cause);}if (businessPromise != null && !businessPromise.isDone()) {businessPromise.setFailure(cause);}ctx.close();}}
}

關鍵點解析

  1. 創建和使用 ChannelPromise
    • 通過 EventLoop.newPromise()Channel.newPromise() 創建 ChannelPromise 實例。
    • mainPromise 用于跟蹤整個操作流程的完成狀態。
  2. 處理依賴關系
    • 使用 addListener() 方法為每個異步操作添加監聽器。
    • 在前一個操作的監聽器中檢查操作結果,只有成功時才繼續執行下一個操作。
    • 如果某個操作失敗,立即設置主 Promise 為失敗狀態并終止后續操作。
  3. 在 ChannelHandler 中處理響應
    • ClientHandler 中接收服務器響應,并根據響應內容設置相應的 Promise 狀態。
    • 這樣可以將異步響應與對應的操作關聯起來。
  4. 異常處理
    • exceptionCaught() 方法中捕獲異常,并設置所有未完成的 Promise 為失敗狀態。

更復雜的依賴關系處理

對于更復雜的依賴關系,可以使用 PromiseCombiner 來組合多個 Promise,并在所有 Promise 都成功完成后執行后續操作。以下是一個使用 PromiseCombiner 的示例:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class PromiseCombinerExample {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder(),new StringEncoder(),new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("收到消息: " + msg);}});}});// 連接到多個服務器ChannelFuture future1 = bootstrap.connect("server1.example.com", 8080);ChannelFuture future2 = bootstrap.connect("server2.example.com", 8080);ChannelFuture future3 = bootstrap.connect("server3.example.com", 8080);// 創建 PromiseCombiner 來組合多個 FuturePromiseCombiner combiner = new PromiseCombiner(group.next());combiner.add(future1);combiner.add(future2);combiner.add(future3);// 創建一個 Promise 來接收組合結果ChannelPromise allConnectedPromise = group.next().newPromise();combiner.finish(allConnectedPromise);// 為組合結果添加監聽器allConnectedPromise.addListener(future -> {if (future.isSuccess()) {System.out.println("所有連接都已成功建立");// 執行后續操作} else {System.out.println("至少有一個連接失敗: " + future.cause());}});// 等待所有操作完成allConnectedPromise.await();} finally {group.shutdownGracefully();}}
}

通過 ChannelPromise 和相關工具,我們可以在 Netty 中靈活處理多個異步操作的依賴關系:

  1. 順序依賴:通過在前一個操作的監聽器中啟動下一個操作,實現順序執行。
  2. 并行依賴:使用 PromiseCombiner 等工具組合多個并行操作,等待所有操作完成后執行后續邏輯。
  3. 異常處理:在每個步驟中正確處理異常,并傳播給主 Promise
  4. 狀態管理:使用 Promise 跟蹤每個操作的狀態,確保操作按預期完成。

這種方式使得異步代碼更加清晰和易于維護,避免了回調地獄,提高了代碼的可讀性和可維護性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/92822.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/92822.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/92822.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

微算法科技(NASDAQ:MLGO)采用分布式哈希表優化區塊鏈索引結構,提高區塊鏈檢索效率

隨著區塊鏈技術的快速發展&#xff0c;其在各個領域的應用越來越廣泛。然而&#xff0c;區塊鏈數據的存儲和檢索效率問題一直是制約其發展的瓶頸之一。為了解決這一問題&#xff0c;微算法科技(NASDAQ&#xff1a;MLGO)采用了分布式哈希表&#xff08;DHT&#xff09;技術來優化…

Jmeter的元件使用介紹:(三)配置元件詳解01

Jmeter的配置元件有非常多&#xff0c;常用的有&#xff1a;信息頭管理器、Cookie管理器、用戶定義的變量、Http請求默認值、JDBC Connection Configuration、CSV 數據文件設置、計數器等&#xff0c;本文會對這些常用的配置元件一一介紹&#xff0c;還有其他很多配置元件&…

git 連接GitHub倉庫

一、安裝 git 包在官網下載 git 包二、通過SSH密鑰與GitHub遠程倉庫連接1. 檢查本地 SSH 密鑰是否存在ls -al ~/.ssh如果看到 id_rsa 和 id_rsa.pub&#xff0c;說明已有密鑰。2.如果沒有&#xff0c;生成新的 SSH 密鑰&#xff1a;ssh-keygen -t ed25519 -C "your_email…

如何通過AI掃描代碼中的問題

代碼質量其實在需求高壓&#xff0c;業務快速迭代的場景下往往容易被人忽視的問題&#xff0c;大家的編碼習慣和規范也經常會各有喜好&#xff0c;短期之內獲取看不出來什么問題&#xff0c;但長此以往就會發現&#xff0c;屎山逐步成型了&#xff0c;而線上代碼跑著往往就不想…

Java 大視界 -- Java 大數據機器學習模型在金融衍生品市場波動特征挖掘與交易策略創新中的應用(363)

Java 大視界 -- Java 大數據機器學習模型在金融衍生品市場波動特征挖掘與交易策略創新中的應用&#xff08;363&#xff09;引言&#xff1a;正文&#xff1a;一、Java 構建的金融數據處理架構1.1 多源異構數據實時融合1.2 新聞輿情與市場沖擊建模二、Java 驅動的波動特征挖掘與…

Cartographer安裝測試與模塊開發(三)--Cartographer在Gazebo仿真環境下的建圖以及建圖與定位階段問題(實車也可參考)

參數介紹之所以要首先介紹參數而不是實操&#xff0c;是因為大部分建圖失敗、漂移基本上都是參數設置錯誤引起的&#xff0c;或者說大部分都是TF存在問題&#xff0c;主要是坐標系Frame之間有沖突或者對不上等原因導致的&#xff0c;因此把參數放在前面介紹&#xff0c;了解了參…

uniapp nvue開發App 橫豎屏切換丟失上下文導致 setTimeout和clearTimeout報錯

報錯內容如下 [JS Framework] Failed to find taskCenter (35). [JS Framework] Failed to execute the callback function:TypeError: c.clearTimeout is not a function reportJSException >>>> exception function:__WEEX_CALL_JAVASCRIPT__, exception:JavaSc…

Mirauge3D 賦能:全自動建模,讓城市規劃與建筑設計擁有高分辨率實景三維模型

在數字化浪潮席卷各行各業的當下&#xff0c;高精度、多元化的空間數據已成為基礎測繪、智慧城市建設、自然資源管理等領域高質量發展的核心支撐。從城市交通網絡的智能規劃到國土空間的優化配置&#xff0c;從災害監測的精準預警到生態環境保護的科學決策&#xff0c;空間數據…

Javaweb————學習javaweb的預備知識

??????一.javase,javaweb,javaee的區別和聯系 &#x1f499;&#x1f499;&#x1f499;javase: 通俗的來講就是java技術棧&#xff0c;做java相關開發的基礎&#xff0c;比如javaweb&#xff0c;javaee開發都是必備javase的基礎的&#xff0c;包括java語言基礎&#xff…

zabbix服務自動發現、自動注冊及配置釘釘告警(小白的“升級打怪”成長之路)

目錄 一、自動發現及自動注冊 1、自動發現 2、自動注冊規則 二、監控告警并發送電子郵件 1、設定發郵件的地址 2、設定發郵件的用戶 3、設定監控及觸發的條件 4、開始告警并設置觸發發郵件 三、釘釘告警 1、配置zabbix-server 2、配置監控及觸發 3、web頁面操作 4、…

OSPF多區域

OSPF多區域劃分的必要性 OSPF單區域存在的問題 LSDB 龐大&#xff0c;占用內存大&#xff0c;SPF計算開銷大。 LSA洪泛范圍大&#xff0c;拓撲變化影響范圍大。 路由不能被匯總&#xff0c;路由表龐大&#xff0c;查找路由開銷大 解決辦法 劃分區域可以解決上述問題 每個區域獨…

質數、因數、最大公約數經典問題整理

1、計數質數 MX 5000000 is_prime [1] * MX is_prime[0] is_prime[1] 0 for i in range(2, MX):if is_prime[i]:for j in range(i * i, MX, i):is_prime[j] 0class Solution:def countPrimes(self, n: int) -> int:return sum(is_prime[:n]) 2、序列中不同最大公約數的…

Java NIO FileChannel在大文件傳輸中的性能優化實踐指南

Java NIO FileChannel在大文件傳輸中的性能優化實踐指南 在現代分布式系統中&#xff0c;海量數據的存儲與傳輸成為常見需求。Java NIO引入的FileChannel提供了高效的文件讀寫能力&#xff0c;尤其適合大文件傳輸場景。本文從原理深度解析出發&#xff0c;結合生產環境實戰經驗…

SQLite Insert 語句詳解

SQLite Insert 語句詳解 SQLite 是一種輕量級的數據庫管理系統,它以其簡潔的設計、強大的功能和易于使用而聞名。在 SQLite 中,INSERT 語句用于向數據庫表中添加新數據。本文將詳細介紹 SQLite 的 INSERT 語句,包括其基本語法、使用方法以及一些高級特性。 基本語法 SQLi…

git更新內核補丁完整指南

Git操作完整指南 ?? 目錄 項目概述 Git基礎配置 日常操作流程 補丁更新操作 分支管理 沖突解決 常見問題 最佳實踐 命令速查表 ?? 項目概述 </

關于回歸決策樹CART生成算法中的最優化算法詳解

首先&#xff0c;一共比如有M個特征&#xff0c;N個樣本&#xff0c;對于每一個特征j&#xff0c;遍歷其中的N個樣本&#xff0c;得到N個值中&#xff0c;最小的值&#xff0c;作為這個特征的最優切分點&#xff0c;而其中的c1&#xff0c;c2是可以直接得到的。然后&#xff0c…

Ubuntu 環境下創建并啟動一個 MediaMTX 的 systemd 服務

文章目錄一、簡介二、安裝及使用三、創建系統服務小結一、簡介 MediaMTX 是一個現代、高性能、跨平臺的 流媒體服務器&#xff0c;主要用于接收、轉發、轉碼和分發 音視頻流&#xff0c;支持多種協議。它的前身是 rtsp-simple-server&#xff0c;后來重命名為 MediaMTX&#x…

在React中,函數式組件和類組件各有優缺點

函數式組件&#xff1a;無this&#xff0c;無生命周期&#xff0c;配合使用useEffect&#xff0c; 可使用Hooks。 類組件&#xff1a;有生命周期&#xff0c;狀態管理&#xff0c;無Hooks&#xff0c;適用于需要明確生命周期方法和實例方法的場景。 函數式組件 優點&#xff1a…

【SketchUp插件推薦】Profile Builder 4.0 中文版下載安裝使用教程(含語言設置圖解)

一、插件簡介 Profile Builder 4.0 是一款適用于 SketchUp 2017-2024 的高效參數化建模插件&#xff0c;中文名稱為「參數化造型建模工具」。該插件基于參數化設計原理&#xff0c;允許用戶通過簡單的路徑定義和參數設定&#xff0c;快速生成智能模型&#xff0c;從而大幅提高…

【小沐學GIS】基于Unity3d繪制三維數字地球Earth(Unity3d、OpenGL、GIS)

&#x1f37a;三維數字地球GIS系列相關文章如下&#x1f37a;&#xff1a;1【小沐學GIS】基于C繪制三維數字地球Earth&#xff08;OpenGL、glfw、glut&#xff09;第一期2【小沐學GIS】基于C繪制三維數字地球Earth&#xff08;OpenGL、glfw、glut&#xff09;第二期3【小沐學GI…