@DubboService
public class AsyncOrderFacadeImpl implements AsyncOrderFacade {private Logger logger = LoggerFactory.getLogger(AsyncOrderFacadeImpl.class);// 構建線程池ThreadPoolExecutor threadPoolExecutor= new ThreadPoolExecutor(1000, 1000, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));@Overridepublic OrderInfo queryOrderById(String id) {// 開啟異步化操作模式,標識異步化模式開啟AsyncContext asyncContext = RpcContext.startAsync();threadPoolExecutor.submit(() -> {// 同步queryOrderById方法所在的線程上下文信息到當前的子線程中asyncContext.signalContextSwitch();try {TimeUnit.MILLISECONDS.sleep(50);} catch (InterruptedException e) {throw new RuntimeException(e);}// 使用asyncContext將結果返回asyncContext.write(new OrderInfo("GeekDubbo", "服務方異步方式之RpcContext.startAsync#" + id, new BigDecimal(2000)));logger.info("-------------- end --------------");});return null;}
}
org.apache.dubbo.rpc.RpcContext#startAsync
public static AsyncContext startAsync() throws IllegalStateException {// 獲取上下文對象RpcContext currentContext = getContext();// AsyncContext 是Dubbo服務提供者端實現的異步處理的核心接口,用于將同步調用轉為異步執行if (currentContext.asyncContext == null) {// 獲取Dubbo上下文currentContext.asyncContext = new AsyncContextImpl();}// 創建CompletableFuture實例currentContext.asyncContext.start();return currentContext.asyncContext;
}
org.apache.dubbo.rpc.AsyncContextImpl
/*** 初始化上下文實例*/
public AsyncContextImpl() {// 獲取當前線程綁定的RpcContext實例,包含請求和響應雙向信息// 跨服務參數透傳:消費端設置隱式參數,提供端通過getAttachment獲取// 異步調用管理:通過getCompletableFuture()處理異步結果(消費端)或延遲寫入響應(提供端)this.storedContext = RpcContext.getContext();// 獲取服務端特有的響應上下文,用于回傳數據(dubbo3.x中廢棄,使用getServiceContext())]// 服務端響應附件元數據:服務端處理完成后,通過setAttachment回傳簽名,加密令牌等數據this.storedServerContext = RpcContext.getServerContext();
}
org.apache.dubbo.rpc.AsyncContextImpl#start
public void start() {if (this.started.compareAndSet(false, true)) {// 創建CompletableFuture實例// 該對象用于非阻塞式任務編排,支持鏈式調用和組合操作,簡化多線程開發復雜度this.future = new CompletableFuture<>();}
}
org.apache.dubbo.rpc.AsyncContextImpl#signalContextSwitch
/*** 將主線程中的上下文對象同步到當前線程上下文中*/
@Override
public void signalContextSwitch() {RpcContext.restoreContext(storedContext);RpcContext.restoreServerContext(storedServerContext);// Restore any other contexts in here if necessary.
}
org.apache.dubbo.rpc.AsyncContextImpl#write
@Overridepublic void write(Object value) {if (isAsyncStarted() && stop()) {if (value instanceof Throwable) {Throwable bizExe = (Throwable) value;future.completeExceptionally(bizExe);} else {// 將結果寫回future.complete(value);}} else {throw new IllegalStateException("The async response has probably been wrote back by another thread, or the asyncContext has been closed.");}}