vLLM - EngineCoreClient

EngineCoreClient是與EngineCore進行交互的基類:

  • API定義了同步和異步兩個版本。
class EngineCoreClient(ABC):@abstractmethoddef shutdown(self):...def get_output(self) -> EngineCoreOutputs:raise NotImplementedErrordef add_request(self, request: EngineCoreRequest) -> None:raise NotImplementedError...def collective_rpc(self,method: Union[str, Callable[..., _R]],timeout: Optional[float] = None,args: tuple = (),kwargs: Optional[dict[str, Any]] = None) -> list[_R]:raise NotImplementedErrordef dp_engines_running(self) -> bool:raise NotImplementedErrorasync def scale_elastic_ep(self, new_data_parallel_size: int) -> None:raise NotImplementedErrorasync def get_output_async(self) -> EngineCoreOutputs:raise NotImplementedErrorasync def add_request_async(self, request: EngineCoreRequest) -> None:raise NotImplementedError...async def collective_rpc_async(self,method: Union[str, Callable[..., _R]],timeout: Optional[float] = None,args: tuple = (),kwargs: Optional[dict[str, Any]] = None) -> list[_R]:raise NotImplementedError

InprocClient

InprocClient是EngineCoreClient的單進程子類,主要用于V0版本:

  • 在同一個進程中直接調用 EngineCore 的方法,而不需要通過IPC。
  • 避免了通信開銷,但會阻塞主線程。
class InprocClient(EngineCoreClient):def __init__(self, *args, **kwargs):self.engine_core = EngineCore(*args, **kwargs)def get_output(self) -> EngineCoreOutputs:outputs, _ = self.engine_core.step()return outputs.get(0) or EngineCoreOutputs()def add_request(self, request: EngineCoreRequest) -> None:self.engine_core.add_request(request)...

MPClient

MPClient是EngineCoreClient的多進程子類:

  • 用一個后臺進程(Background Process)來執行EngineCore
  • 使用input_socket來Push EngineCoreRequests
  • 使用output_socket來Pull EngineCoreOutputs

MPClient.__init__

MPClient.__init__完成:

  • 創建encoder和decoder,用于序列化EngineCoreRequests和反序列化EngineCoreOutputs
  • 創建zmq.Context/zmq.asyncio.Context:EngineCoreClient和EngineCore之間的交互Context。
  • 創建BackgroundResources用于釋放資源(如zmp.Context等)
  • 調用weakref.finalize將BackgroundResources綁定到self,這樣在self對象銷毀時,會自動調用BackgroundResources.__call__。
  • 如果配置了client_addresses,則使用外部創建好的EngineCore,否則則調用launch_core_engines創建EngineCore:engine_manager和coordinator。
  • 調用make_zmq_socket創建EngineCoreClient Push/Pull EngineCore的socket。
  • 根據engine_ranks為每一個RANK分配一個ZMQ identity。
  • 用input_socket接收engine_ranks發送的初始化消息,全部接收完成后說明所有的EngineCore都被正確初始化。
  • 創建pending_messages用于跟蹤socket的消息發送情況,以確保數據發送完成后,再釋放對應的資源(如Pytorch的Tensor內存)
class MPClient(EngineCoreClient):def __init__(self,asyncio_mode: bool,vllm_config: VllmConfig,executor_class: type[Executor],log_stats: bool,client_addresses: Optional[dict[str, str]] = None,):self.vllm_config = vllm_configself.encoder = MsgpackEncoder()self.decoder = MsgpackDecoder(EngineCoreOutputs)sync_ctx = zmq.Context(io_threads=2)self.ctx = zmq.asyncio.Context(sync_ctx) if asyncio_mode else sync_ctxself.resources = BackgroundResources(ctx=sync_ctx)self._finalizer = weakref.finalize(self, self.resources)success = Falsetry:self.engines_running = Falseself.stats_update_address: Optional[str] = Noneif client_addresses is not None:# Engines are managed externally to this client.input_address = client_addresses["input_address"]output_address = client_addresses["output_address"]self.stats_update_address = client_addresses.get("stats_update_address")else:with launch_core_engines(vllm_config, executor_class,log_stats) as (engine_manager,coordinator,addresses):self.resources.coordinator = coordinatorself.resources.engine_manager = engine_manager(input_address, ) = addresses.inputs(output_address, ) = addresses.outputsself.stats_update_address = (addresses.frontend_stats_publish_address)if coordinator is not None:assert self.stats_update_address == (coordinator.get_stats_publish_address())self.input_socket = self.resources.input_socket = make_zmq_socket(self.ctx, input_address, zmq.ROUTER, bind=True)self.resources.output_socket = make_zmq_socket(self.ctx, output_address, zmq.PULL)...engine_ranks = [dp_rank] if (offline_modeor external_dp_lb) else range(dp_size)self.core_engines: list[EngineIdentity] = [index.to_bytes(2, "little") for index in engine_ranks]identities = set(self.core_engines)sync_input_socket = zmq.Socket.shadow(self.input_socket)while identities:if not sync_input_socket.poll(timeout=600_000):raise TimeoutError("Timed out waiting for engines to send""initial message on input socket.")identity, _ = sync_input_socket.recv_multipart()identities.remove(identity)self.core_engine: EngineIdentity = self.core_engines[0]self.utility_results: dict[int, AnyFuture] = {}self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()success = Truefinally:if not success:self._finalizer()

launch_core_engines

launch_core_engines完成EnginCore和DPCoordinator進程的創建:

  • 創建EngineZmqAddresses:
@dataclass
class EngineZmqAddresses:inputs: list[str] # LLMEngine Push Request給EngineCoreClient的ZMQ Socket地址outputs: list[str] # LLMEngine Pull EngineCoreClient的Response的ZMQ Socket地址coordinator_input: Optional[str] = None # DPCoordinator發送EngineCoreRequest(START_DP_WAVE)給EngineCore的ZMQ Socket地址coordinator_output: Optional[str] = None # EngineCore發送EngineCoreOutputs給DPCoordinator的ZMQ Socket地址frontend_stats_publish_address: Optional[str] = None # DPCoordinator接收EngineCoreClient發布消息(如SCALE_ELASTIC_EP)的ZMQ Socket地址
  • 創建DPCoordinator:在DP(dp_size > 1)和Online模式下,在DP_RANK=0的機器上,會啟動一個DPCoordinator進程,用于DP域多機資源管理與調度。
  • 根據data_parallel_backend配置來創建對應的EngineCoreManager:如果backend是ray,則創建CoreEngineActorManager,否則創建CoreEngineProcManager。
  • 調用wait_for_engine_startup等待EnginCoreClient和EnginCore完成握手,表明EnginCoreClient代理的所有EngineCore都啟動完成。
@contextlib.contextmanager
def launch_core_engines(vllm_config: VllmConfig,executor_class: type[Executor],log_stats: bool,num_api_servers: int = 1,
) -> Iterator[tuple[Optional[Union[CoreEngineProcManager, CoreEngineActorManager]],Optional[DPCoordinator],EngineZmqAddresses,
]]:...# Set up input and output addresses.addresses = EngineZmqAddresses(inputs=[get_engine_client_zmq_addr(client_local_only, host)for _ in range(num_api_servers)],outputs=[get_engine_client_zmq_addr(client_local_only, host)for _ in range(num_api_servers)],)run_coordinator = dp_size > 1 and not offline_mode and dp_rank == 0if run_coordinator:coordinator = DPCoordinator(parallel_config)addresses.coordinator_input, addresses.coordinator_output = (coordinator.get_engine_socket_addresses())addresses.frontend_stats_publish_address = (coordinator.get_stats_publish_address())else:coordinator = Noneif parallel_config.data_parallel_backend == "ray":engine_actor_manager = CoreEngineActorManager(vllm_config=vllm_config,addresses=addresses,executor_class=executor_class,log_stats=log_stats,)yield engine_actor_manager, coordinator, addressesreturn...with zmq_socket_ctx(local_handshake_address, zmq.ROUTER,bind=True) as handshake_socket:from vllm.v1.engine.core import EngineCoreProcif local_engine_count:local_engine_manager = CoreEngineProcManager(EngineCoreProc.run_engine_core,vllm_config=vllm_config,executor_class=executor_class,log_stats=log_stats,handshake_address=handshake_address,client_handshake_address=client_handshake_address,local_client=True,local_engine_count=local_engine_count,start_index=dp_rank,local_start_index=local_start_index or 0)else:local_engine_manager = Noneyield local_engine_manager, coordinator, addresses# Now wait for engines to start.wait_for_engine_startup(handshake_socket,addresses,engines_to_handshake,parallel_config,vllm_config.cache_config,local_engine_manager,coordinator.proc if coordinator else None,)

SyncMPClient

SyncMPClient是MPClient的同步IO子類:

  • 實現了EngineCoreClient中定義的所有同步IO的API。
  • 使用queue.Queue隊列,實現同步IO。

SyncMPClient.__init__:

  • 創建一個線程,接收EngineCore發送到output_socket的消息,并加入到self.outputs_queue。
class SyncMPClient(MPClient):def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],log_stats: bool):super().__init__(asyncio_mode=False,vllm_config=vllm_config,executor_class=executor_class,log_stats=log_stats,)...self.outputs_queue = queue.Queue[Union[EngineCoreOutputs, Exception]]()ctx = self.ctxout_socket = self.resources.output_socketdecoder = self.decoderutility_results = self.utility_resultsoutputs_queue = self.outputs_queueshutdown_path = get_open_zmq_inproc_path()resources = self.resourcesresources.shutdown_path = shutdown_pathdef process_outputs_socket():shutdown_socket = ctx.socket(zmq.PAIR)try:shutdown_socket.bind(shutdown_path)poller = zmq.Poller()poller.register(shutdown_socket, zmq.POLLIN)poller.register(out_socket, zmq.POLLIN)while True:socks = poller.poll()...frames = out_socket.recv_multipart(copy=False)resources.validate_alive(frames)outputs: EngineCoreOutputs = decoder.decode(frames)if outputs.utility_output:_process_utility_output(outputs.utility_output,utility_results)else:outputs_queue.put_nowait(outputs)except Exception as e:...self.output_queue_thread = Thread(target=process_outputs_socket,name="EngineCoreOutputQueueThread",daemon=True)self.output_queue_thread.start()

SyncMPClient和EngineCore的交互主要有3類:

  • add_request:添加EngineCoreRequest
  • call_utility:遠程調用EngineCore方法
  • get_output:獲取EngineCoreOutput

SyncMPClient.add_request

  • 序列化EngineCoreRequest,會同時序列化里面的Auxiliary Buffers(比如Pytorch Tensor等)。
  • 使用input_socke發送消息給EngineCore:(Identity, RequestType, EngineCoreRequest(Serialized), Auxiliary Buffers)
  • 如果存在Auxiliary Buffers,會將消息添加到self.pending_messages,用于確保消息發送完成后,再釋放對應的內存(如Pytorch Tensor等)
class SyncMPClient(MPClient):def add_request(self, request: EngineCoreRequest) -> None:if self.is_dp:self.engines_running = Trueself._send_input(EngineCoreRequestType.ADD, request) def _send_input(self, request_type: EngineCoreRequestType, request: Any):self.ensure_alive()self.free_pending_messages()# (Identity, RequestType, SerializedRequest)msg = (self.core_engine, request_type.value,*self.encoder.encode(request))if len(msg) <= 3:# No auxiliary buffers => no tensor backing buffers in request.self.input_socket.send_multipart(msg, copy=False)returntracker = self.input_socket.send_multipart(msg, copy=False, track=True)self.add_pending_message(tracker, request)

SyncMPClient.call_utility

以profile為例:

  • 生成一個call_id
  • 使用input_socke發送消息給EngineCore:(Identity, RequestType, (0, call_id, method, args)(Serialized))
class SyncMPClient(MPClient):def profile(self, is_start: bool = True) -> None:self.call_utility("profile", is_start)def call_utility(self, method: str, *args) -> Any:call_id = uuid.uuid1().int >> 64future: Future[Any] = Future()self.utility_results[call_id] = futureself._send_input(EngineCoreRequestType.UTILITY,(0, call_id, method, args))return future.result()

SyncMPClient.get_output

在SyncMPClient.__init__中,已經獨立創建了一個線程,用于接收并解碼output_socket的消息,并放入self.outputs_queue(queue.Queue類型)。
所以SyncMPClient.get_output只需要調用self.outputs_queue.get()即可以實現同步IO:在未收到數據時,阻塞主線程。

AsyncMPClient

TODO:待補充

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/99146.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/99146.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/99146.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

幾種排序算法(2)

幾種排序算法&#xff08;2&#xff09;1冒泡排序2.快速排序2.1hoare版本找基準值2.2lomuto前后指針3.非遞歸版本快速排序4.遞歸排序5.排序算法復雜度及穩定性分析我們已經詳解了插入排序和選擇排序&#xff0c;不了解的可以翻看我上一篇博客。1冒泡排序 void BubbleSort(int*…

Excel甘特圖

1. 創建表格&#xff08;Excel2021&#xff09;只有天數是使用公式計算的選中表格按Ctrl T&#xff0c;將表格設置為超級表格2. 創建堆積條形圖3. 添加設置圖例項3.1 添加開始時間3.2 修改圖例項順序 3.3 編輯軸標簽3.4 Y軸逆序類別 3.5 添加開始時間數據標簽選擇 所用橘色圖&…

基于OpenCV的答題卡自動識別與評分系統

引言 在教育考試場景中&#xff0c;手動批改答題卡效率低下且容易出錯。本文將介紹如何使用Python和OpenCV實現一個答題卡自動識別與評分系統&#xff0c;通過計算機視覺技術完成答題卡的透視校正、選項識別和得分計算。該系統可廣泛應用于學校考試、培訓測評等場景&#xff0c…

LLaMA-MoE v2:基于后訓練混合專家模型的稀疏性探索與技術突破

重新定義大型語言模型的效率邊界在人工智能飛速發展的今天&#xff0c;大型語言模型&#xff08;LLMs&#xff09;已成為推動技術進步的核心力量。然而&#xff0c;模型規模的不斷擴大帶來了驚人的計算成本和高昂的部署門檻&#xff0c;使得眾多研究機構和中小型企業難以承擔。…

R geo 然后讀取數據的時候 make.names(vnames, unique = TRUE): invalid multibyte string 9

setwd("K:/download/geo") # 替換為實際工作目錄 # 修改get_geo_data_local函數中的讀取部分 #file_path <- "K:/download/geo/raw_data/GEO/GSE32967_series_matrix_fixed.txt" file_path <- "K:/download/geo/data/GSE32967_series_matrix.t…

深入理解 Spring @Async 注解:原理、實現與實踐

在現代 Java 應用開發中&#xff0c;異步編程是提升系統吞吐量和響應速度的關鍵技術之一。Spring 框架提供的Async注解極大簡化了異步方法的實現&#xff0c;讓開發者無需手動管理線程即可輕松實現異步操作。本文將從底層原理到實際應用&#xff0c;全面解析Async注解的工作機制…

linux C 語言開發 (七) 文件 IO 和標準 IO

文章的目的為了記錄使用C語言進行linux 開發學習的經歷。開發流程和要點有些記憶模糊&#xff0c;趕緊記錄&#xff0c;防止忘記。 相關鏈接&#xff1a; linux C 語言開發 (一) Window下用gcc編譯和gdb調試 linux C 語言開發 (二) VsCode遠程開發 linux linux C 語言開發 (…

maven , mvn 運行 項目

提示&#xff1a;環境搭建 文章目錄前言一、使用步驟1. 以構建含有 pom.xml 的項目2.mvn 運行具體項目3.mvn 指定模塊>運行具體項目前言 提示&#xff1a;版本 spirngboot 3.2 jdk 21 mvn 3.9 提示&#xff1a;以下是本篇文章正文內容&#xff0c;下面案例可供參考 一、使…

JVM垃圾回收的時機是什么時候(深入理解 JVM 垃圾回收時機:什么時候會觸發 GC?)

深入理解 JVM 垃圾回收時機&#xff1a;什么時候會觸發 GC&#xff1f;在 Java 開發中&#xff0c;我們常聽說 “JVM 會自動進行垃圾回收”&#xff0c;但很少有人能說清&#xff1a;GC 究竟在什么情況下會被觸發&#xff1f;是到固定時間就執行&#xff1f;還是內存滿了才會啟…

在Vue項目中Axios發起請求時的小知識

在Vue項目中Axios發起請求時的小知識 在Vue項目開發中&#xff0c;Axios作為基于Promise的HTTP客戶端&#xff0c;憑借其簡潔的API設計和強大的功能&#xff08;如請求/響應攔截、自動JSON轉換、取消請求等&#xff09;&#xff0c;已成為前端與后端通信的主流選擇。本文將深入…

GeoHash分級索引技術

GeoHash分級索引技術是一種將二維地理坐標轉換為一維字符串的空間索引方法,其核心是通過分級網格劃分和前綴編碼實現高效的空間數據檢索。以下從技術原理、實現細節到工程優化展開詳細解析: 一、編碼原理與分級結構 1. 經緯度二進制化 GeoHash通過遞歸二分地球表面生成網格…

HTML HTML基礎(4)

1.列表 (1).有序列表 概念&#xff1a;有順序或側重順序的列表。 <h2>要把大象放冰箱總共分幾步</h2> <ol> <li>把冰箱門打開</li> <li>把大象放進去</li> <li>把冰箱門關上</li> </ol> (2).無序列表 概念&a…

MySQL中的回表操作

在數據庫查詢&#xff08;尤其是基于 B樹索引 的關系型數據庫&#xff0c;如MySQL、PostgreSQL&#xff09;中&#xff0c;“回表”是一個核心且高頻出現的概念&#xff0c;直接影響查詢性能。要理解回表&#xff0c;需先理清索引結構與數據存儲的關聯&#xff0c;再拆解其發生…

QT子線程與GUI線程安全交互

在Qt應用程序開發中&#xff0c;涉及到多線程處理時&#xff0c;如何安全地從子線程更新UI界面是一個常見的問題。Qt的UI界面并不是線程安全的&#xff0c;意味著你不能直接在子線程中操作UI組件&#xff08;比如按鈕、標簽等&#xff09;。如果不遵循線程安全的規則&#xff0…

RL【10-2】:Actor - Critic

系列文章目錄 Fundamental Tools RL【1】&#xff1a;Basic Concepts RL【2】&#xff1a;Bellman Equation RL【3】&#xff1a;Bellman Optimality Equation Algorithm RL【4】&#xff1a;Value Iteration and Policy Iteration RL【5】&#xff1a;Monte Carlo Learnin…

開源大模型天花板?DeepSeek-V3 6710億參數MoE架構深度拆解

文章目錄認知解構&#xff1a;DeepSeek的定位與核心價值模型概述與發展歷程創立初期與技術奠基&#xff08;2023年7月-2024年11月&#xff09;里程碑一&#xff1a;MoE架構規模化突破&#xff08;2024年12月&#xff09;里程碑二&#xff1a;推理成本革命性優化&#xff08;202…

10 訓練中的一些問題

&#x1f31f; 大背景&#xff1a;訓練神經網絡 下山尋寶 訓練神經網絡就像你蒙著眼在一座大山里&#xff0c;想找最低點&#xff08;最小損失&#xff09;。你只能靠腳下的坡度&#xff08;梯度&#xff09;來決定往哪兒走。 你的位置 模型參數&#xff08;權重 www&#xf…

synchronized鎖升級的過程(從無鎖到偏向鎖,再到輕量級鎖,最后到重量級鎖的一個過程)

鎖升級是 Java 中 synchronized 鎖 的核心優化機制&#xff08;基于 JVM 的 對象頭 Mark Word 實現&#xff09;&#xff0c;指鎖的狀態從 無鎖 → 偏向鎖 → 輕量級鎖 → 重量級鎖 逐步升級的過程。其目的是通過 “按需升級”&#xff0c;在不同并發場景下選擇最優的鎖實現&am…

HOT100--Day25--84. 柱狀圖中最大的矩形,215. 數組中的第K個最大元素,347. 前 K 個高頻元素

HOT100–Day25–84. 柱狀圖中最大的矩形&#xff0c;215. 數組中的第K個最大元素&#xff0c;347. 前 K 個高頻元素 每日刷題系列。今天的題目是《力扣HOT100》題單。 題目類型&#xff1a;棧&#xff0c;堆。 84. 柱狀圖中最大的矩形 思路&#xff1a; class Solution {publ…

基于 Apache Doris 的用戶畫像數據模型設計方案

一、 需求分析與設計目標數據源&#xff1a;用戶基本信息&#xff1a;用戶ID、性別、出生日期、注冊時間、常駐地域&#xff08;省、市、區&#xff09;、職業等。用戶體檢報告&#xff1a;每次體檢的報告ID、體檢時間、各項指標&#xff08;如血壓、血糖、血脂、BMI等&#xf…