定義與概念
- Future:表示一個異步操作的結果。它是只讀的,意味著你只能查看操作是否完成、是否成功、獲取結果或者異常等信息,但不能主動設置操作的結果。
- Promise:是
Future
的可寫擴展。它不僅可以像Future
一樣查看操作結果,還能主動設置操作的成功、失敗或者取消狀態,并且通知所有的監聽器。
用法示例
Future 的用法
Future
通常用于獲取異步操作的結果,并且可以添加監聽器來處理操作完成后的邏輯。以下是一個簡單的示例,展示了如何使用 Future
來處理 DNS 解析結果:
dnsNameResolver.resolve(host).addListener(new FutureListener<InetAddress>() {@Overridepublic void operationComplete(Future<InetAddress> future) throws Exception {if (future.isSuccess()) {InetAddress hostAddress = future.get();// 處理解析成功的結果} else {// 處理解析失敗的情況}}
});
在這個示例中,dnsNameResolver.resolve(host)
方法返回一個 Future<InetAddress>
對象,我們通過添加 FutureListener
來監聽解析操作的完成狀態。當操作完成后,會調用 operationComplete
方法,我們可以在這個方法中處理解析結果。
Promise 的用法
Promise
主要用于主動設置異步操作的結果,并且可以通知所有的監聽器。以下是一個示例,展示了如何使用 Promise
來處理 OCSP 查詢結果:
final Promise<OCSPResp> responsePromise = eventLoop.newPromise();// 異步操作
dnsNameResolver.resolve(host).addListener(new FutureListener<InetAddress>() {@Overridepublic void operationComplete(Future<InetAddress> future) throws Exception {if (future.isSuccess()) {// 處理解析成功的結果InetAddress hostAddress = future.get();final ChannelFuture channelFuture = bootstrap.connect(hostAddress, port);channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {if (future.isSuccess()) {// 處理連接成功的結果responsePromise.trySuccess(result); // 設置操作成功的結果} else {responsePromise.tryFailure(new IllegalStateException("Connection to OCSP Responder Failed", future.cause())); // 設置操作失敗的結果}}});} else {responsePromise.tryFailure(future.cause()); // 設置操作失敗的結果}}
});// 添加監聽器來處理操作結果
responsePromise.addListener(new FutureListener<OCSPResp>() {@Overridepublic void operationComplete(Future<OCSPResp> future) throws Exception {if (future.isSuccess()) {OCSPResp resp = future.get();// 處理操作成功的結果} else {// 處理操作失敗的情況}}
});
在這個示例中,我們首先創建了一個 Promise<OCSPResp>
對象 responsePromise
,然后在異步操作完成后,根據操作結果調用 trySuccess
或 tryFailure
方法來設置 Promise
的狀態。最后,我們添加了一個 FutureListener
來監聽 Promise
的完成狀態,并處理操作結果。
區別總結
- 可寫性:
Future
是只讀的,只能查看異步操作的結果,不能主動設置操作的狀態。Promise
是可寫的,可以主動設置操作的成功、失敗或者取消狀態。
- 用途:
Future
主要用于獲取異步操作的結果,并且可以添加監聽器來處理操作完成后的邏輯。Promise
主要用于在異步操作完成后,主動設置操作的結果,并且通知所有的監聽器。
- 方法差異:
Future
提供了一些方法來查看操作的狀態,如isDone()
、isSuccess()
、cause()
等。Promise
除了繼承了Future
的方法外,還提供了一些方法來設置操作的結果,如setSuccess()
、trySuccess()
、setFailure()
、tryFailure()
等。
代碼中的體現
在提供的代碼片段中,InflightNameResolver
類的 resolve
方法使用了 Promise
來處理 DNS 解析結果:
private <U> Promise<U> resolve(final ConcurrentMap<String, Promise<U>> resolveMap,final String inetHost, final Promise<U> promise, boolean resolveAll) {// ...if (resolveAll) {@SuppressWarnings("unchecked")final Promise<List<T>> castPromise = (Promise<List<T>>) promise; // U is List<T>delegate.resolveAll(inetHost, castPromise);} else {@SuppressWarnings("unchecked")final Promise<T> castPromise = (Promise<T>) promise; // U is Tdelegate.resolve(inetHost, castPromise);}// ...return promise;
}
在這個方法中,我們可以看到 Promise
被用于傳遞異步操作的結果,并且可以在操作完成后主動設置操作的狀態。
另外,PromiseNotifier
類展示了如何使用 Promise
來通知多個監聽器:
public class PromiseNotifier<V, F extends Future<V>> implements GenericFutureListener<F> {private final Promise<? super V>[] promises;public PromiseNotifier(Promise<? super V>... promises) {this.promises = promises;}@Overridepublic void operationComplete(F future) throws Exception {if (future.isSuccess()) {V result = future.get();for (Promise<? super V> p : promises) {PromiseNotificationUtil.trySuccess(p, result, null);}} else if (future.isCancelled()) {for (Promise<? super V> p : promises) {PromiseNotificationUtil.tryCancel(p, null);}} else {Throwable cause = future.cause();for (Promise<? super V> p : promises) {PromiseNotificationUtil.tryFailure(p, cause, null);}}}
}
在這個類中,我們可以看到 Promise
被用于通知多個監聽器操作的結果,并且可以根據操作的狀態調用不同的方法來設置 Promise
的狀態。
綜上所述,Future
和 Promise
在 Netty 中都是非常重要的組件,它們分別用于處理異步操作的不同方面。通過合理使用 Future
和 Promise
,可以有效地處理異步操作的結果,提高代碼的可讀性和可維護性。
處理多個順序依賴的異步操作
假設我們需要完成一個包含三個步驟的操作流程:
- 連接到服務器
- 發送認證請求并等待認證成功
- 發送業務數據并接收響應
這三個步驟必須按順序執行,后一個步驟依賴于前一個步驟的成功完成。以下是實現這種依賴關系的代碼示例:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class ChannelPromiseDependencyExample {private static final String SERVER_HOST = "localhost";private static final int SERVER_PORT = 8080;public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder(),new StringEncoder(),new ClientHandler());}});// 創建主 Promise,用于跟蹤整個操作流程的完成狀態ChannelPromise mainPromise = bootstrap.config().group().next().newPromise();// 開始執行依賴操作鏈connectAndProcess(bootstrap, mainPromise);// 等待整個操作流程完成mainPromise.await();if (mainPromise.isSuccess()) {System.out.println("所有操作成功完成");} else {System.out.println("操作失敗: " + mainPromise.cause());}} finally {group.shutdownGracefully();}}private static void connectAndProcess(Bootstrap bootstrap, ChannelPromise mainPromise) {// 步驟 1: 連接到服務器ChannelFuture connectFuture = bootstrap.connect(SERVER_HOST, SERVER_PORT);// 為連接操作添加監聽器connectFuture.addListener((ChannelFuture future) -> {if (future.isSuccess()) {Channel channel = future.channel();System.out.println("成功連接到服務器");// 步驟 2: 發送認證請求ChannelPromise authPromise = channel.newPromise();sendAuthRequest(channel, authPromise);// 為認證操作添加監聽器authPromise.addListener((ChannelFuture authFuture) -> {if (authFuture.isSuccess()) {System.out.println("認證成功");// 步驟 3: 發送業務數據ChannelPromise businessPromise = channel.newPromise();sendBusinessData(channel, businessPromise);// 為業務操作添加監聽器businessPromise.addListener((ChannelFuture businessFuture) -> {if (businessFuture.isSuccess()) {System.out.println("業務數據處理成功");mainPromise.setSuccess(); // 標記整個操作成功} else {mainPromise.setFailure(businessFuture.cause()); // 標記整個操作失敗}channel.close(); // 關閉連接});} else {mainPromise.setFailure(authFuture.cause()); // 標記整個操作失敗channel.close(); // 關閉連接}});} else {mainPromise.setFailure(future.cause()); // 標記整個操作失敗}});}private static void sendAuthRequest(Channel channel, ChannelPromise authPromise) {// 發送認證請求channel.writeAndFlush("AUTH username password").addListener(future -> {if (future.isSuccess()) {System.out.println("認證請求已發送");// 認證結果將在 ChannelHandler 中處理} else {authPromise.setFailure(future.cause()); // 認證請求發送失敗}});}private static void sendBusinessData(Channel channel, ChannelPromise businessPromise) {// 發送業務數據channel.writeAndFlush("DATA some_business_data").addListener(future -> {if (future.isSuccess()) {System.out.println("業務數據已發送");// 業務響應將在 ChannelHandler 中處理} else {businessPromise.setFailure(future.cause()); // 業務數據發送失敗}});}static class ClientHandler extends SimpleChannelInboundHandler<String> {private ChannelPromise authPromise;private ChannelPromise businessPromise;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 通道激活時,可以獲取外部的 Promise 實例// 實際應用中可能需要通過構造函數或其他方式傳遞}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("收到服務器響應: " + msg);// 根據響應內容判斷操作結果if (msg.startsWith("AUTH_SUCCESS")) {if (authPromise != null) {authPromise.setSuccess(); // 認證成功}} else if (msg.startsWith("AUTH_FAILURE")) {if (authPromise != null) {authPromise.setFailure(new Exception("認證失敗: " + msg)); // 認證失敗}} else if (msg.startsWith("DATA_SUCCESS")) {if (businessPromise != null) {businessPromise.setSuccess(); // 業務數據處理成功}} else if (msg.startsWith("DATA_FAILURE")) {if (businessPromise != null) {businessPromise.setFailure(new Exception("業務數據處理失敗: " + msg)); // 業務數據處理失敗}}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();// 設置所有未完成的 Promise 為失敗狀態if (authPromise != null && !authPromise.isDone()) {authPromise.setFailure(cause);}if (businessPromise != null && !businessPromise.isDone()) {businessPromise.setFailure(cause);}ctx.close();}}
}
關鍵點解析
- 創建和使用 ChannelPromise:
- 通過
EventLoop.newPromise()
或Channel.newPromise()
創建ChannelPromise
實例。 mainPromise
用于跟蹤整個操作流程的完成狀態。
- 通過
- 處理依賴關系:
- 使用
addListener()
方法為每個異步操作添加監聽器。 - 在前一個操作的監聽器中檢查操作結果,只有成功時才繼續執行下一個操作。
- 如果某個操作失敗,立即設置主
Promise
為失敗狀態并終止后續操作。
- 使用
- 在 ChannelHandler 中處理響應:
- 在
ClientHandler
中接收服務器響應,并根據響應內容設置相應的Promise
狀態。 - 這樣可以將異步響應與對應的操作關聯起來。
- 在
- 異常處理:
- 在
exceptionCaught()
方法中捕獲異常,并設置所有未完成的Promise
為失敗狀態。
- 在
更復雜的依賴關系處理
對于更復雜的依賴關系,可以使用 PromiseCombiner
來組合多個 Promise
,并在所有 Promise
都成功完成后執行后續操作。以下是一個使用 PromiseCombiner
的示例:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class PromiseCombinerExample {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new StringDecoder(),new StringEncoder(),new SimpleChannelInboundHandler<String>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println("收到消息: " + msg);}});}});// 連接到多個服務器ChannelFuture future1 = bootstrap.connect("server1.example.com", 8080);ChannelFuture future2 = bootstrap.connect("server2.example.com", 8080);ChannelFuture future3 = bootstrap.connect("server3.example.com", 8080);// 創建 PromiseCombiner 來組合多個 FuturePromiseCombiner combiner = new PromiseCombiner(group.next());combiner.add(future1);combiner.add(future2);combiner.add(future3);// 創建一個 Promise 來接收組合結果ChannelPromise allConnectedPromise = group.next().newPromise();combiner.finish(allConnectedPromise);// 為組合結果添加監聽器allConnectedPromise.addListener(future -> {if (future.isSuccess()) {System.out.println("所有連接都已成功建立");// 執行后續操作} else {System.out.println("至少有一個連接失敗: " + future.cause());}});// 等待所有操作完成allConnectedPromise.await();} finally {group.shutdownGracefully();}}
}
通過 ChannelPromise
和相關工具,我們可以在 Netty 中靈活處理多個異步操作的依賴關系:
- 順序依賴:通過在前一個操作的監聽器中啟動下一個操作,實現順序執行。
- 并行依賴:使用
PromiseCombiner
等工具組合多個并行操作,等待所有操作完成后執行后續邏輯。 - 異常處理:在每個步驟中正確處理異常,并傳播給主
Promise
。 - 狀態管理:使用
Promise
跟蹤每個操作的狀態,確保操作按預期完成。
這種方式使得異步代碼更加清晰和易于維護,避免了回調地獄,提高了代碼的可讀性和可維護性。