flink如何基于Pekko實現RPC調用

摘要

通過閱讀flink源碼,了解flink是如何基于Pekko實現遠程RPC調用的

Pekko實現遠程調用

Flink 的 RPC 框架底層是構建在 Pekko 的 actor 模型之上的,了解Pekko如何使用,對后續源碼的閱讀有幫助。

Apache Pekko(原為 Akka 的一個分支)是一個強大的工具包,用于構建并發、分布式和可擴展的系統。它基于經典的 Actor 模型,提供了一種事件驅動、非阻塞的編程范式,使開發者能夠更輕松地構建容錯性強、模塊化清晰的分布式應用。

引入依賴

確保你使用的是 Apache Pekko 的 Maven 依賴:

<dependencies><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-actor_2.13</artifactId><version>1.0.2</version></dependency><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-remote_2.13</artifactId><version>1.0.2</version></dependency>
</dependencies>

定義消息類(RPC 通信協議)

public class HelloRequest implements java.io.Serializable {  public final String message;  public HelloRequest(String message) {  this.message = message;  }  
}
public class HelloResponse implements java.io.Serializable {  public final String reply;  public HelloResponse(String reply) {  this.reply = reply;  }  @Override  public String toString() {  return reply;  }  
}

HelloRequestHelloResponse 是在使用 Pekko遠程通信 時的消息協議類,也就是你定義的“請求消息”和“響應消息”。它們是通過網絡在客戶端與服務端之間傳輸的,所以必須滿足可序列化(Serializable)的要求。

服務端代碼(遠程服務)

HelloActor.java

public class HelloActor extends AbstractActor {  @Override  public Receive createReceive() {  return receiveBuilder()  .match(HelloRequest.class, req -> {  System.out.println("服務端收到消息: " + req.message);  // 回復客戶端  getSender().tell(new HelloResponse("你好,客戶端,我收到了:" + req.message), getSelf());  })  .build();  }  public static Props props() {  return Props.create(HelloActor.class);  }  
}

在 Pekko 中,HelloActor 相當于傳統 RPC 框架中的服務實現類,但其處理邏輯是基于 消息驅動模型 而非方法調用。Pekko 的核心設計理念是:Actor 只對接收到的消息做出反應,并保持自身狀態獨立和可并發執行

以下是關鍵點說明:

  • createReceive() 方法 定義了該 Actor 支持的消息類型和對應的處理邏輯。使用 receiveBuilder().match(...).build() 來設置“消息類型 → 響應處理”的映射。
  • getSender().tell(...) 表示將處理結果異步返回給消息發送者,它等價于傳統 RPC 中的“返回值”機制,只不過是通過消息的方式返回。
  • Props.create(...) 返回一個 Props 實例,描述了如何構造該 Actor。這類似于構造函數的封裝工廠。Props 是 Actor 的構造“配方”,用于 ActorSystem.actorOf(...) 創建真正的 Actor 實例。

ServerApp.java

public class ServerApp {  public static void main(String[] args) {  // 使用硬編碼配置啟動遠程 ActorSystem        Config config = ConfigFactory.parseString("""  pekko.actor.provider = remote            pekko.remote.artery.canonical.hostname = "127.0.0.1"            pekko.remote.artery.canonical.port = 25520            pekko.actor.allow-java-serialization = on            pekko.actor.serialize-messages = on            """);  ActorSystem system = ActorSystem.create("ServerSystem", config);  // 啟動 HelloActor,名字是 helloActor,供客戶端遠程訪問  ActorRef actorRef = system.actorOf(HelloActor.props(), "helloActor");  System.out.println("服務端已啟動,等待遠程調用...");  }  
}

代碼說明

  • 用 Java 代碼動態構造 Pekko 配置(替代 application.conf 文件)
  • pekko.actor.serialize-messages = on 強制所有 Actor 之間發送的消息都走序列化流程(即使是本地通信)
  • ActorSystem.create(...) 創建了一個名為 ServerSystem 的遠程 Actor 系統。
  • 指定 IP 和端口為 127.0.0.1:25520,就像傳統 RPC 服務綁定地址。
  • 啟動一個名為 helloActor 的 actor,客戶端稍后通過這個名字進行訪問。

客戶端代碼

ClientApp.java

public class ClientApp {  public static void main(String[] args) throws Exception {  // 使用硬編碼配置啟動客戶端 ActorSystem,端口 0 表示隨機  Config config = ConfigFactory.parseString("""  pekko.actor.provider = remote            pekko.remote.artery.canonical.hostname = "127.0.0.1"            pekko.remote.artery.canonical.port = 0            pekko.actor.allow-java-serialization = on            pekko.actor.serialize-messages = on            """);  ActorSystem system = ActorSystem.create("ClientSystem", config);  // 遠程 actor 路徑,相當于 RPC 服務地址 + 接口名  String remotePath = "pekko://ServerSystem@127.0.0.1:25520/user/helloActor";  // 選擇遠程 actor,相當于創建客戶端 stub        ActorSelection selection = system.actorSelection(remotePath);  // 使用 ask 模式發送消息,并接收響應(模擬同步 RPC 調用)  CompletionStage<Object> future =  Patterns.ask(selection, new HelloRequest("這是來自客戶端的問候"), Duration.ofSeconds(10));  // 等待響應結果(阻塞)  future.thenApply(response -> {  if (response instanceof HelloResponse helloResponse) {  return "客戶端收到回復: " + helloResponse.reply;  } else {  return "收到未知回復: " + response;  }  })  .exceptionally(ex -> "調用失敗: " + ex.getMessage())  .thenAccept(System.out::println).toCompletableFuture().join();  system.terminate();  }  
}

代碼說明:

  • ActorSelection是一種 actor地址定位方式,它類似于 DNS 查詢,可以根據路徑去“找”一個遠程 actor
  • Patterns.ask(...) 就像傳統 RPC 的同步調用,它封裝了發送、等待響應的過程。Duration.ofSeconds(3) 指定超時時間。.get() 阻塞等待結果,實際底層是異步實現。
    Pekko(或 Akka)中,如果你不需要請求-響應(ask),而只是發送消息給 Actorfire-and-forget),你可以直接使用 ActorRef.tell(...) 方法。
// 從 ActorSystem 中選擇一個路徑為 "/user/helloActor" 的 Actor(可能還沒拿到真實引用)
// 注意:這個路徑必須匹配一個已存在的 Actor,否則會 resolve 失敗
ActorSelection selection = actorSystem.actorSelection("/user/helloActor");// 異步解析 selection,嘗試獲取對應 Actor 的真正引用 ActorRef(帶超時)
CompletionStage<ActorRef> futureRef = selection.resolveOne(Duration.ofSeconds(3));// 當成功獲取 ActorRef 后,使用 tell 發送一條消息,不需要返回(fire-and-forget)
futureRef.thenAccept(ref -> ref.tell("你好", ActorRef.noSender()));

flink的RPC框架如何使用

Flink 基于Pekko實現了自己RPC框架。當需要組件間需要使用RPC服務時,只需要定義接口、編寫服務端接口邏輯即可。FlinkRpc框架自己會完成接收遠程請求、調度線程、安全并發、處理生命周期等工作,讓你像寫本地對象一樣寫分布式服務。

本來想直接使用flinkrpc模塊創建一個簡單的demo項目來說明的,但是由于Flink使用了自定義的類加載器(如 SubmoduleClassLoader)來隔離不同模塊(尤其是用戶代碼、插件、RPC 的動態 proxy 等)導致類不可見的問題

org.flink.MyServiceGateway referenced from a method is not visible from class loader: org.apache.flink.core.classloading.SubmoduleClassLoader

所以找了flink其中一個rpc服務來進行說明

Dispatcher組件

Dispatcher集群調度的中樞組件,它的作用相當于一個集群控制器,負責接收作業、分配作業、啟動作業執行組件、以及監控作業生命周期。雖然Dispatcher只是在JobManager內使用,類似
偽分布式一樣,但其創建與使用流程和真正的遠程RPC組件是一樣的。

DIspatcher在集群啟動的時候,通過DispatcherFactory創建,StandaloneSession模式下,工廠實現類為SessionDispatcherFactory

下面以Dispatcher組件為例進行說明如何基于flinkrpc框架實現一個rpc服務。

rpc框架使用流程

使用流程大致如下:

  1. 定義 RpcGateway 接口作為rpc協議
  2. 繼承 RpcEndpoint或者FencedRpcEndpoint 并實現RpcGateWay接口
  3. 使用 RpcService 注冊服務(啟動服務端)
  4. 使用RpcService連接服務端(獲取client)

步驟1.定義 RpcGateway 接口

在這里插入圖片描述

Dispatcher的RPC接口類是DispatcherGateway, FencedRpcGatewayRpcGateway的子接口。 rpc方法的返回值必須是 CompletableFuture<T> 類型,這是 Flink RPC 框架的設計要求

步驟2. 實現服務端

StandaloneSession模式下,Dispatcher的實現類是StandaloneDispatcher,該類是Dispacher的子類。Dispatcher類繼承FencedRpcEndpoint類并實現DispatcherGateway接口
在這里插入圖片描述

RpcEndpointFlink自研RPC`框架中用于實現遠程服務端邏輯的抽象類,它幫你處理 RPC 生命周期、消息分發、線程安全調度等問題,其子類只需專注于“我要提供什么服務”即可。

步驟3. 啟動服務

通過工廠創建了Dispatcher對象后,調用其start()方法啟動服務
在這里插入圖片描述

步驟4. 遠程調用

提交job的時候,會調用dispatchersubmitJob啟動并調度該作業。
在這里插入圖片描述

gateway是一個DispatcherGateway對象,通過下面的代碼獲得到的,相當于Client。
在這里插入圖片描述

通過該對象調用接口方法即可發起遠程調用。由于Dispatcher的客戶端代碼從創建到使用的代碼分的太散了,不方便說明,下面通過一個簡單的示例來描述Client的創建流程。

CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);MyServiceGateway gateway = gatewayFuture.get();gateway.sayHello("Flink").thenAccept(System.out::println);

MyServiceGateway.class就是定義的RpcGateway接口, gateway是一個遠程代理對象了,調用它就等于遠程 RPC 調用!

Client是如何發送消息的

已知flink底層是利用Pekko來實現rpc調用的,再次回顧flink rpc示例代碼中可以想到

CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);
MyServiceGateway gateway = gatewayFuture.get();
gateway.sayHello("Flink").thenAccept(System.out::println);

該gateway對象發起遠程調用,本質上應該是使用了類似下面的代碼來發送消息的

CompletionStage<Object> future =  Patterns.ask(selection, "Flink", Duration.ofSeconds(3));

這個gateway對象是由rpcService.connect返回的. rpcService是一個RpcService接口對象,其實現就4個,排除掉測試用的就剩一個 PekkoRpcService了。

connect方法的源碼
繼續看connect方法的源碼,首先會先調用resolveActorAddress解析入參的rpc地址"akka://flink@127.0.0.1:6123/user/myService"得到一個ActorRef對象

private CompletableFuture<ActorRef> resolveActorAddress(String address) {  final ActorSelection actorSel = actorSystem.actorSelection(address);  return actorSel.resolveOne(configuration.getTimeout())  .toCompletableFuture()  .exceptionally(  error -> {  throw new CompletionException(  new RpcConnectionException(  String.format(  "Could not connect to rpc endpoint under address %s.",  address),  error));  });  
}

獲取到ActorRef后,使用 Java 的 動態代理機制創建一個實現了MyServiceGateway接口的代理對象 proxy
在這里插入圖片描述

既然是動態代理,那就得看Handler方法里面的邏輯了,創建Handler的invocationHandlerFactory代碼如下:
在這里插入圖片描述

查看對應的invoke方法會看到實際發消息的是invokeRpc
在這里插入圖片描述

所以最終actor.tell是在這里被調用的

轉換成rpc參數的邏輯如下,只是將被調用方法所需的參數與信息封裝成MethodInvocation對象
在這里插入圖片描述

Server是接收處理消息的

前面的代碼已經知道了client通過Pekko的actor發送了消息,現在要看Server這邊是怎么處理的了(找到Actor處理RpcInvocation消息)。

服務端需要繼承RpcEndpoint類,并在構建的時候傳遞rpcService對象`

final RpcService rpcService = ...; // 通常通過 PekkoRpcServiceUtils 創建
final String endpointId = "myService";MyServiceEndpoint endpoint = new MyServiceEndpoint(rpcService, endpointId);
endpoint.start();  // 啟動 RPC 服務端

查看RpcEndpoint的構造函數,可以看到利用rpcService對象啟動了一個rpcServer
在這里插入圖片描述

繼續往下看前需要了解Actor是如何處理消息的:
?ActorSystem?? 是Pekko應用的中央控制樞紐,作為單例容器管理所有Actor的層級結構和生命周期。當發送消息給遠程Actor時,ActorSystem會自動將消息序列化并通過網絡傳輸到目標節點,在遠程節點反序列化后放入目標ActorMailbox隊列,最終由目標節點的ActorSystem調度線程執行消息處理,整個過程對開發者完全透明,如同本地調用一般。

可以粗略的認為:一個Actor等同于一個Server端(輕量級),Actor內有一個隊列,當有新的消息從客戶端發送過來就放到該隊列中。然后有一個線程不斷從隊列中取消息,然后調用該 Actor 的 createReceive() 所定義的行為處理消息。

了解的Actor是如何接收信息后,繼續看PekkoRpcServicestartServer方法,其中調用下面的方法,通知另一個Actor來創建本RpcEndpoin對應的Actor
在這里插入圖片描述

那么就要找出負責創建Actor的這個supervisor(Actor)在哪里,才能繼續往下看了。

很容易就可以看到PekkoRpcService對象它的構造函數中調用下面的函數找到對應的Actor的具體類型
在這里插入圖片描述

查看SupervisroActor類的createReceive()就可以看到真正創建actor的邏輯了

@Override  
public Receive createReceive() {  return receiveBuilder()  .match(StartRpcActor.class, this::createStartRpcActorMessage)  .matchAny(this::handleUnknownMessage)  .build();  
}

flink rpc框架中,所有RpcEndpoint對應的Actor的類型都是PekkoRpcActor, 只是名字不一樣而已。在PekkoRpcActorCreateReceive()可以看到與Client發送過來的RPC消息相對應的處理邏輯。
在這里插入圖片描述

在這里插入圖片描述

通過反射調用方法,此處的rpcEndpoint就是我們繼承了RcpEndpoint的對象
在這里插入圖片描述

到此,我們就知道了服務端的業務方法是如何被調用了。

RpcService的作用

在前面介紹 Flink 中 Client 與 Server 如何工作的過程中,我們可以看到其底層是通過 Pekko實現遠程通信的。但在調用流程中,業務代碼中并沒有直接與 ActorSystemActorRef 等 Pekko 原生類打交道。這是因為 Flink 通過一層抽象 RpcService優雅地屏蔽了底層通信實現的細節

// 1. 創建 RpcService(基于 Pekko 實現)
RpcService rpcService = ...;// 2. 實例化 Dispatcher(繼承自 RpcEndpoint)
StandaloneDispatcher dispatcher = new StandaloneDispatcher(rpcService, ...);// 3. 注冊服務端
DispatcherGateway dispatcherGateway = rpcService.startServer(dispatcher);// 4. 客戶端連接(可在其他進程中執行)
rpcService.connect("pekko://flink@host:6123/user/dispatcher", DispatcherGateway.class);

如果沒有 RpcService 這一層抽象,Flink 的組件(如 Dispatcher、JobMaster)之間想要通信,就必須直接操作 Pekko 的底層 API,比如:

  • 使用 ActorSystem 創建 ActorRef
  • 使用 tell()ask() 發送消息;
  • 管理消息序列化和遠程地址;
  • 處理超時、線程調度等復雜細節。

這會導致:

  • Actor 概念侵入業務邏輯,開發就需要學習Actor相關的知識;
  • 接口強耦合通信實現,未來若切換通信框架非常困難;
  • 本地調用與遠程調用流程不統一,維護復雜。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/85723.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/85723.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/85723.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Kafka節點注冊沖突問題分析與解決

一、核心錯誤分析 ERROR Error while creating ephemeral at /brokers/ids/1, node already exists and owner does not match org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode NodeExists問題本質&#xff1a;ZooKeeper中已存在ID為1的broker節…

突破PPO訓練效率瓶頸!字節跳動提出T-PPO,推理LLM訓練速度提升2.5倍

突破PPO訓練效率瓶頸&#xff01;字節跳動提出T-PPO&#xff0c;推理LLM訓練速度提升2.5倍 在大語言模型&#xff08;LLM&#xff09;通過長思維鏈&#xff08;CoT&#xff09;展現出強大推理能力的當下&#xff0c;強化學習&#xff08;RL&#xff09;作為關鍵技術卻面臨訓練…

【Python】dictionary

1 字典功能 字典是可變容器模型&#xff0c;且可存儲任意類型對象&#xff1b; 字典的每個鍵值對 <key: value> 用冒號 : 分割&#xff0c;每個對之間用逗號(,)分割&#xff0c;整個字典包括在花括號 {} 中 ,格式如下所示&#xff1a; d {key1 : value1, key2 : value…

【python】If 語句

1 使用if 進行條件判斷 1.1 檢查字符串是否相等 car bmw car BMW # FALSEcar bmw car.upper() BMW # true # 變小寫用方法&#xff1a;lower1.2 檢查字符串是否不相等 my_car yadeaif my_car ! Audi:print("Buy one! Buy one! Buy one!")1.3 比較數字 answe…

Knife4j 使用詳解

一、概述 Knife4j 是一款基于 Swagger 的開源 API 文檔工具&#xff0c;旨在為 Java 開發者提供更美觀、功能更強大的 API 文檔生成、展示和調試體驗。它是 Swagger-Bootstrap-UI 的升級版&#xff0c;通過增強 UI 界面和擴展功能&#xff0c;解決了原生 Swagger UI 界面簡陋、…

Java excel坐標計算

package com.common.base.util.excel;/*** excel 坐標計算*/ public class UtilExcelPosi {/*** deepseek生成 ExcelProperty(index UtilExcelPosi.pA)*/public final static int pA 0;public final static int pB 1;public final static int pC 2;public final static i…

【JavaWeb】Servlet+JSP 實現分頁功能

文章目錄 思路數據抽出功能設計 功能模塊工具類前端內容用戶端數據處理 思路 數據抽出 需要顯示的數據&#xff0c;查詢的數據抽出&#xff1b;進行分頁顯示&#xff0c;需要統計抽出的件數&#xff0c;然后根據頁面顯示尺寸調整顯示頁面內容&#xff1b; 功能設計 翻頁需要…

SpringBoot-準備工作-工程搭建

目錄 1.創建空項目 2.檢查項目jdk版本 3.檢查Maven的全局配置 4.配置項目的字符集 5.創建SpringBoot工程 1.創建空項目 2.檢查項目jdk版本 3.檢查Maven的全局配置 4.配置項目的字符集 5.創建SpringBoot工程

01、python實現matlab的插值算法,以及驗證

import numpy as np from scipy.interpolate import griddata import sys def griddata_wrapper(x, y, v, xq, yq, method): """ 包裝scipy的griddata函數,支持單個點或多個點的插值 """ try: # 將輸入轉換為numpy數組…

React ahooks——useRequest

目錄 簡介 1. 核心功能 2. 基本用法 3. 高級用法 &#xff08;1&#xff09;輪詢請求&#xff08;Polling&#xff09; &#xff08;2&#xff09;防抖&#xff08;Debounce&#xff09; &#xff08;3&#xff09;依賴刷新&#xff08;refreshDeps&#xff09; &#x…

re正則、Xpath、BeautifulSouplxml 區別

目錄 1. re 正則表達式2. XPath3. BeautifulSoup + lxml4. 功能特性對比5.對比與建議在網頁數據解析中,正則表達式(re)XPath(常結合lxml)BeautifulSoup(常依賴解析器如lxml)是三種主流技術,各有核心差異和適用場景。 1. re 正則表達式 優勢:文本匹配效率高,尤其適用于…

教師辦工專用 資源包|課件+手抄報+PPT模板+常用表格 PDF格式93GB

如果家里親戚或朋友有走上教育之路的人&#xff0c;給他這份整合可以減輕不少工作負擔&#xff0c;更快地適應教育的節奏。也可以發給孩子的老師讓他在平時做個班級活動的參考 《老師教學辦工資源包》包括手抄報大全、教學計劃、工作總結、培訓手冊、課程表等教學、辦公常用資…

算法第37天| 完全背包\518. 零錢兌換 II\377. 組合總和 Ⅳ\57. 爬樓梯

完全背包 完全背包和01背包的區別 純完全背包&#xff0c;遍歷背包和物品的順序是可以對調的&#xff0c;只要求得出最大價值&#xff0c;不要求湊成總和的元素的順序&#xff1b; 01背包&#xff0c;遍歷背包和物品的順序是不可以對調的&#xff08;一維不行&#xff0c;二維…

七彩喜智慧康養平臺:重構銀發生活的數字守護網

隨著社會老齡化程度的不斷加深&#xff0c;如何讓老年人安享幸福晚年成為社會關注的焦點。 在這一背景下&#xff0c;七彩喜智慧康養平臺應運而生&#xff0c;以創新的科技手段和貼心的服務理念&#xff0c;為老年人的生活帶來了諸多好處&#xff0c;發揮著重要作用&#xff0…

【設計模式】用觀察者模式對比事件訂閱(相機舉例)

&#x1f4f7; 用觀察者模式對比事件訂閱(相機舉例) 標簽&#xff1a;WPF、C#、Halcon、設計模式、觀察者模式、事件機制 在日常開發中&#xff0c;我們經常使用 事件機制&#xff08;Event&#xff09; 來訂閱圖像采集信號。然而當系統日益復雜&#xff0c;多個模塊同時需要響…

【數據分析九:Association Rule】關聯分析

一、數據挖掘定義 數據挖掘&#xff1a; 從大量的數據中挖掘那些令人感興趣的、有用的、隱含的、先前未知的 和可能有用的 模式或知識 &#xff0c;并據此更好的服務人們的生活。 二、四類任務 數據分析有哪些任務&#xff1f; 今天我們來講述其中的關聯分析 三、關聯分析 典…

AWS Security Hub郵件告警設置

問題 需要給AWS Security Hub設置郵件告警。 前提 已經啟用AWS Security Hub。 AWS SNS 創建一個AWS Security Hub告警主題SecurityHub-Topic&#xff0c;如下圖&#xff1a; 創建完成后&#xff0c;訂閱該主題。 AWS EventBridge 設置規則名SecurityHubFindings-Rules…

(OSGB轉3DTiles強大工具)ModelSer--強大的實景三維數據分布式管理平臺

1. ModelSer 能幫我們做什么 1.1 最快速的 osgb 發布 3dtiles 服務 測試的速度大于 10G/分鐘&#xff0c;且速度基本是線性的&#xff08;100G10分鐘&#xff0c;1T100分鐘&#xff09;。支持城市級傾斜數據半天內完成服務發布&#xff0c;并支持數據的單塊更新。 1.2 支持所見…

《HTTP權威指南》 第5-6章 Web服務器和代理

基本Web服務器請求的步驟 1、建立連接 接受一個客戶端連接&#xff0c;或者如果不希望與這個客戶端建立連接&#xff0c;就將其關閉。 處理新連接客戶端主機名識別&#xff1a;反向DNS查找&#xff0c;將IP地址轉換為客戶端主機名過ident確定客戶端用戶&#xff1a;客戶端支持…

微信二次開發,對接智能客服邏輯

接口友情鏈接&#xff0c;點擊即可訪問。 ## 設備創建與復用機制 首次調用/login/getLoginQrCode需傳空appId觸發設備創建&#xff0c;響應返回固定設備ID。后續登錄必須復用此ID以避免風控&#xff08;同一微信號綁定固定設備&#xff09;。設備類型可選ipad/mac&#xff0c;當…