EngineCoreClient is the base class for interacting with EngineCore:
- The API is defined in both a synchronous and an asynchronous version.
class EngineCoreClient(ABC):

    @abstractmethod
    def shutdown(self):
        ...

    def get_output(self) -> EngineCoreOutputs:
        raise NotImplementedError

    def add_request(self, request: EngineCoreRequest) -> None:
        raise NotImplementedError

    ...

    def collective_rpc(self,
                       method: Union[str, Callable[..., _R]],
                       timeout: Optional[float] = None,
                       args: tuple = (),
                       kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
        raise NotImplementedError

    def dp_engines_running(self) -> bool:
        raise NotImplementedError

    async def scale_elastic_ep(self, new_data_parallel_size: int) -> None:
        raise NotImplementedError

    async def get_output_async(self) -> EngineCoreOutputs:
        raise NotImplementedError

    async def add_request_async(self, request: EngineCoreRequest) -> None:
        raise NotImplementedError

    ...

    async def collective_rpc_async(
            self,
            method: Union[str, Callable[..., _R]],
            timeout: Optional[float] = None,
            args: tuple = (),
            kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
        raise NotImplementedError
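For orientation, here is a hedged sketch of how a caller might drive the two API flavours. The client objects and the request are placeholders supplied elsewhere (by one of the concrete subclasses described below); they are not part of the API shown above.

def sync_generate(client, request) -> None:
    # Blocking flavour: add_request / get_output as defined on EngineCoreClient.
    client.add_request(request)
    outputs = client.get_output()
    print(outputs)

async def async_generate(async_client, request) -> None:
    # Asyncio flavour: the *_async counterparts of the same operations.
    await async_client.add_request_async(request)
    outputs = await async_client.get_output_async()
    print(outputs)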
InprocClient
InprocClient is the single-process subclass of EngineCoreClient, mainly used for the V0 mode:
- It calls EngineCore methods directly within the same process, with no IPC in between.
- This avoids communication overhead, but the calls block the main thread.
class InprocClient(EngineCoreClient):

    def __init__(self, *args, **kwargs):
        self.engine_core = EngineCore(*args, **kwargs)

    def get_output(self) -> EngineCoreOutputs:
        outputs, _ = self.engine_core.step()
        return outputs.get(0) or EngineCoreOutputs()

    def add_request(self, request: EngineCoreRequest) -> None:
        self.engine_core.add_request(request)

    ...
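In vLLM the concrete client is picked by a factory (EngineCoreClient.make_client in recent versions). The sketch below only illustrates the dispatch idea; its name and signature are assumptions, so check the installed version for the exact API.

# Simplified dispatch sketch; the real factory lives in
# vllm/v1/engine/core_client.py and may differ in signature and checks.
from vllm.v1.engine.core_client import AsyncMPClient, InprocClient, SyncMPClient

def make_client_sketch(multiprocess_mode: bool, asyncio_mode: bool, *args, **kwargs):
    if not multiprocess_mode:
        # Same-process mode: EngineCore is called directly, no IPC.
        return InprocClient(*args, **kwargs)
    if asyncio_mode:
        return AsyncMPClient(*args, **kwargs)   # asyncio front-end (see below)
    return SyncMPClient(*args, **kwargs)        # blocking front-end (see below)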
MPClient
MPClient is the multi-process subclass of EngineCoreClient:
- It runs EngineCore in a background process.
- It pushes EngineCoreRequests over the input_socket.
- It pulls EngineCoreOutputs over the output_socket (see the toy sketch after this list).
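To make the socket topology concrete, here is a self-contained toy sketch (not vLLM code) of the same pattern: the client binds a ROUTER socket for requests and a PULL socket for outputs, while the "engine" connects a DEALER socket with a fixed identity plus a PUSH socket. The inproc addresses are made up for this sketch.

import threading
import zmq

INPUT_ADDR = "inproc://toy-input"     # hypothetical addresses, just for this sketch
OUTPUT_ADDR = "inproc://toy-output"

ctx = zmq.Context.instance()

# Client side: ROUTER bound for requests, PULL bound for outputs.
client_in = ctx.socket(zmq.ROUTER)
client_in.bind(INPUT_ADDR)
client_out = ctx.socket(zmq.PULL)
client_out.bind(OUTPUT_ADDR)

def engine(identity: bytes):
    # "Engine" side: DEALER with a fixed identity for requests, PUSH for outputs.
    in_sock = ctx.socket(zmq.DEALER)
    in_sock.setsockopt(zmq.IDENTITY, identity)
    in_sock.connect(INPUT_ADDR)
    out_sock = ctx.socket(zmq.PUSH)
    out_sock.connect(OUTPUT_ADDR)

    in_sock.send(b"")                     # "I'm ready" handshake frame
    request = in_sock.recv()              # wait for a request from the client
    out_sock.send(b"echo: " + request)    # push an "output" back

ident = (0).to_bytes(2, "little")         # same identity scheme as core_engines below
threading.Thread(target=engine, args=(ident,), daemon=True).start()

handshake_ident, _ = client_in.recv_multipart()   # wait for the engine handshake
assert handshake_ident == ident
client_in.send_multipart([ident, b"hello"])       # push a request to that engine
print(client_out.recv())                          # pull the engine's output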
MPClient.__init__
MPClient.__init__ does the following:
- Creates the encoder and decoder used to serialize EngineCoreRequests and deserialize EngineCoreOutputs.
- Creates a zmq.Context / zmq.asyncio.Context: the ZMQ context for the interaction between EngineCoreClient and EngineCore.
- Creates BackgroundResources, which is responsible for releasing resources (such as the zmq.Context).
- Calls weakref.finalize to bind BackgroundResources to self, so that BackgroundResources.__call__ runs automatically when the self object is destroyed (see the sketch after this list).
- If client_addresses is configured, uses the externally created EngineCores; otherwise calls launch_core_engines to create the EngineCores, together with the engine_manager and the coordinator.
- Calls make_zmq_socket to create the sockets through which the EngineCoreClient pushes to and pulls from EngineCore.
- Assigns a ZMQ identity to each rank according to engine_ranks.
- Receives the initialization message from each engine rank on the input_socket; once all of them have arrived, every EngineCore is known to be initialized correctly.
- Creates pending_messages to track in-flight socket sends, ensuring that the associated resources (such as PyTorch tensor memory) are released only after the data has actually been sent.
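The finalizer arrangement mentioned above is a general Python pattern. Here is a minimal, self-contained sketch of it with toy classes (not vLLM code), before looking at the actual __init__ below: a callable resources object is registered with weakref.finalize so that it runs when the owner is garbage collected, and it can also be invoked explicitly when __init__ fails.

import weakref

import zmq

class ToyResources:
    """Stands in for BackgroundResources: owns things that must be released."""

    def __init__(self, ctx: zmq.Context):
        self.ctx = ctx

    def __call__(self):
        # Close everything that must not outlive the owning client.
        self.ctx.destroy(linger=0)
        print("resources released")

class ToyClient:
    def __init__(self):
        self.ctx = zmq.Context(io_threads=1)
        self.resources = ToyResources(self.ctx)
        # The finalizer holds only `self.resources`, not `self`, so it does not
        # keep the client alive; it fires when the client is garbage collected.
        self._finalizer = weakref.finalize(self, self.resources)

client = ToyClient()
del client  # ToyResources.__call__ runs here (CPython refcounting)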
class MPClient(EngineCoreClient):

    def __init__(
        self,
        asyncio_mode: bool,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        client_addresses: Optional[dict[str, str]] = None,
    ):
        self.vllm_config = vllm_config
        self.encoder = MsgpackEncoder()
        self.decoder = MsgpackDecoder(EngineCoreOutputs)

        sync_ctx = zmq.Context(io_threads=2)
        self.ctx = zmq.asyncio.Context(sync_ctx) if asyncio_mode else sync_ctx

        self.resources = BackgroundResources(ctx=sync_ctx)
        self._finalizer = weakref.finalize(self, self.resources)
        success = False
        try:
            self.engines_running = False

            self.stats_update_address: Optional[str] = None
            if client_addresses is not None:
                # Engines are managed externally to this client.
                input_address = client_addresses["input_address"]
                output_address = client_addresses["output_address"]
                self.stats_update_address = client_addresses.get(
                    "stats_update_address")
            else:
                with launch_core_engines(vllm_config, executor_class,
                                         log_stats) as (engine_manager,
                                                        coordinator,
                                                        addresses):
                    self.resources.coordinator = coordinator
                    self.resources.engine_manager = engine_manager

                (input_address, ) = addresses.inputs
                (output_address, ) = addresses.outputs
                self.stats_update_address = (
                    addresses.frontend_stats_publish_address)
                if coordinator is not None:
                    assert self.stats_update_address == (
                        coordinator.get_stats_publish_address())

            self.input_socket = self.resources.input_socket = make_zmq_socket(
                self.ctx, input_address, zmq.ROUTER, bind=True)
            self.resources.output_socket = make_zmq_socket(
                self.ctx, output_address, zmq.PULL)

            ...

            engine_ranks = [dp_rank] if (offline_mode
                                         or external_dp_lb) else range(dp_size)
            self.core_engines: list[EngineIdentity] = [
                index.to_bytes(2, "little") for index in engine_ranks
            ]

            identities = set(self.core_engines)
            sync_input_socket = zmq.Socket.shadow(self.input_socket)
            while identities:
                if not sync_input_socket.poll(timeout=600_000):
                    raise TimeoutError("Timed out waiting for engines to send"
                                       "initial message on input socket.")
                identity, _ = sync_input_socket.recv_multipart()
                identities.remove(identity)

            self.core_engine: EngineIdentity = self.core_engines[0]
            self.utility_results: dict[int, AnyFuture] = {}

            self.pending_messages = deque[tuple[zmq.MessageTracker, Any]]()

            success = True
        finally:
            if not success:
                self._finalizer()
launch_core_engines
launch_core_engines handles creating the EngineCore and DPCoordinator processes:
- Creates the EngineZmqAddresses:
@dataclass
class EngineZmqAddresses:
    # ZMQ socket addresses through which the EngineCoreClient pushes EngineCoreRequests to the EngineCores
    inputs: list[str]
    # ZMQ socket addresses from which the EngineCoreClient pulls EngineCoreOutputs
    outputs: list[str]
    # ZMQ socket address through which the DPCoordinator sends requests (e.g. START_DP_WAVE) to the EngineCores
    coordinator_input: Optional[str] = None
    # ZMQ socket address through which the EngineCores send EngineCoreOutputs to the DPCoordinator
    coordinator_output: Optional[str] = None
    # ZMQ socket address on which the DPCoordinator receives messages published by the EngineCoreClient (e.g. SCALE_ELASTIC_EP)
    frontend_stats_publish_address: Optional[str] = None
- Creates the DPCoordinator: with data parallelism (dp_size > 1) in online mode, a DPCoordinator process is started on the DP_RANK=0 node to manage and schedule resources across the DP group.
- Creates the matching engine manager based on the data_parallel_backend setting: CoreEngineActorManager if the backend is ray, otherwise CoreEngineProcManager.
- Calls wait_for_engine_startup to wait for the handshake between the EngineCoreClient and the EngineCores to finish, which indicates that all EngineCores proxied by this EngineCoreClient have started.
@contextlib.contextmanager
def launch_core_engines(
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    num_api_servers: int = 1,
) -> Iterator[tuple[
        Optional[Union[CoreEngineProcManager, CoreEngineActorManager]],
        Optional[DPCoordinator],
        EngineZmqAddresses,
]]:
    ...

    # Set up input and output addresses.
    addresses = EngineZmqAddresses(
        inputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
        outputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
    )

    run_coordinator = dp_size > 1 and not offline_mode and dp_rank == 0

    if run_coordinator:
        coordinator = DPCoordinator(parallel_config)
        addresses.coordinator_input, addresses.coordinator_output = (
            coordinator.get_engine_socket_addresses())
        addresses.frontend_stats_publish_address = (
            coordinator.get_stats_publish_address())
    else:
        coordinator = None

    if parallel_config.data_parallel_backend == "ray":
        engine_actor_manager = CoreEngineActorManager(
            vllm_config=vllm_config,
            addresses=addresses,
            executor_class=executor_class,
            log_stats=log_stats,
        )
        yield engine_actor_manager, coordinator, addresses
        return

    ...

    with zmq_socket_ctx(local_handshake_address, zmq.ROUTER,
                        bind=True) as handshake_socket:

        from vllm.v1.engine.core import EngineCoreProc

        if local_engine_count:
            local_engine_manager = CoreEngineProcManager(
                EngineCoreProc.run_engine_core,
                vllm_config=vllm_config,
                executor_class=executor_class,
                log_stats=log_stats,
                handshake_address=handshake_address,
                client_handshake_address=client_handshake_address,
                local_client=True,
                local_engine_count=local_engine_count,
                start_index=dp_rank,
                local_start_index=local_start_index or 0)
        else:
            local_engine_manager = None

        yield local_engine_manager, coordinator, addresses

        # Now wait for engines to start.
        wait_for_engine_startup(
            handshake_socket,
            addresses,
            engines_to_handshake,
            parallel_config,
            vllm_config.cache_config,
            local_engine_manager,
            coordinator.proc if coordinator else None,
        )
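One detail of the code above deserves emphasis: the context manager yields before the engines are fully up, and wait_for_engine_startup only runs after the caller's with-body finishes (which is where MPClient.__init__ creates its sockets, so the handshake has something to complete against). Below is a minimal sketch of that yield-first, wait-after shape, with hypothetical helpers standing in for the real process launch and startup wait.

import contextlib
import time

def start_background_work() -> dict:
    # Hypothetical stand-in for launching the engine processes.
    return {"started_at": time.time()}

def wait_for_startup(manager: dict) -> None:
    # Hypothetical stand-in for wait_for_engine_startup.
    print(f"startup confirmed after {time.time() - manager['started_at']:.3f}s")

@contextlib.contextmanager
def launch_and_wait():
    manager = start_background_work()
    yield manager               # the caller wires up its sockets inside the with-body
    wait_for_startup(manager)   # runs after the with-body, like wait_for_engine_startup

with launch_and_wait() as mgr:
    # In MPClient.__init__ this is where the input/output sockets are created,
    # so the engines can complete their handshake with the client.
    pass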
SyncMPClient
SyncMPClient is the synchronous-IO subclass of MPClient:
- It implements all of the synchronous APIs defined on EngineCoreClient.
- It uses a queue.Queue to implement the synchronous IO.
SyncMPClient.__init__ does the following:
- Creates a thread that receives the messages EngineCore sends to the output_socket and puts them into self.outputs_queue.
class SyncMPClient(MPClient):

    def __init__(self, vllm_config: VllmConfig, executor_class: type[Executor],
                 log_stats: bool):
        super().__init__(
            asyncio_mode=False,
            vllm_config=vllm_config,
            executor_class=executor_class,
            log_stats=log_stats,
        )

        ...

        self.outputs_queue = queue.Queue[Union[EngineCoreOutputs, Exception]]()

        ctx = self.ctx
        out_socket = self.resources.output_socket
        decoder = self.decoder
        utility_results = self.utility_results
        outputs_queue = self.outputs_queue

        shutdown_path = get_open_zmq_inproc_path()
        resources = self.resources
        resources.shutdown_path = shutdown_path

        def process_outputs_socket():
            shutdown_socket = ctx.socket(zmq.PAIR)
            try:
                shutdown_socket.bind(shutdown_path)
                poller = zmq.Poller()
                poller.register(shutdown_socket, zmq.POLLIN)
                poller.register(out_socket, zmq.POLLIN)
                while True:
                    socks = poller.poll()
                    ...
                    frames = out_socket.recv_multipart(copy=False)
                    resources.validate_alive(frames)
                    outputs: EngineCoreOutputs = decoder.decode(frames)
                    if outputs.utility_output:
                        _process_utility_output(outputs.utility_output,
                                                utility_results)
                    else:
                        outputs_queue.put_nowait(outputs)
            except Exception as e:
                ...

        self.output_queue_thread = Thread(target=process_outputs_socket,
                                          name="EngineCoreOutputQueueThread",
                                          daemon=True)
        self.output_queue_thread.start()
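The shutdown_path / zmq.PAIR arrangement in process_outputs_socket is a common ZMQ idiom: an inproc PAIR socket is registered in the same poller as the data socket, so another thread can wake the loop and let it exit cleanly. A self-contained toy sketch of just that idiom (the inproc path is made up; in vLLM it comes from get_open_zmq_inproc_path()):

import threading
import zmq

ctx = zmq.Context.instance()
shutdown_path = "inproc://toy-shutdown"   # hypothetical path, for this sketch only

bound = threading.Event()

def output_thread():
    shutdown_socket = ctx.socket(zmq.PAIR)
    shutdown_socket.bind(shutdown_path)
    bound.set()
    poller = zmq.Poller()
    poller.register(shutdown_socket, zmq.POLLIN)
    # (the real thread also registers the output socket and decodes its frames)
    while True:
        socks = dict(poller.poll())
        if shutdown_socket in socks:
            break                          # woken only to shut down
    shutdown_socket.close(linger=0)

t = threading.Thread(target=output_thread, daemon=True)
t.start()

bound.wait()                               # make sure the bind happened before connecting
notifier = ctx.socket(zmq.PAIR)
notifier.connect(shutdown_path)
notifier.send(b"")                         # anything on the PAIR socket wakes the poller
t.join()
notifier.close(linger=0)
print("output thread exited cleanly")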
SyncMPClient interacts with EngineCore in three main ways:
- add_request: submit an EngineCoreRequest
- call_utility: invoke an EngineCore method remotely
- get_output: fetch EngineCoreOutputs
SyncMPClient.add_request
- Serializes the EngineCoreRequest, including any auxiliary buffers inside it (such as PyTorch tensors).
- Sends the message to EngineCore over the input_socket: (Identity, RequestType, EngineCoreRequest (serialized), auxiliary buffers).
- If there are auxiliary buffers, the message is added to self.pending_messages so that the backing memory (e.g. PyTorch tensors) is only released after the send has completed.
class SyncMPClient(MPClient):

    def add_request(self, request: EngineCoreRequest) -> None:
        if self.is_dp:
            self.engines_running = True
        self._send_input(EngineCoreRequestType.ADD, request)

    def _send_input(self, request_type: EngineCoreRequestType, request: Any):
        self.ensure_alive()
        self.free_pending_messages()

        # (Identity, RequestType, SerializedRequest)
        msg = (self.core_engine, request_type.value,
               *self.encoder.encode(request))

        if len(msg) <= 3:
            # No auxiliary buffers => no tensor backing buffers in request.
            self.input_socket.send_multipart(msg, copy=False)
            return

        tracker = self.input_socket.send_multipart(msg, copy=False, track=True)
        self.add_pending_message(tracker, request)
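The tracked send relies on pyzmq's MessageTracker: send_multipart(..., copy=False, track=True) returns a tracker whose done flag turns true once ZMQ no longer needs the frame memory. A standalone sketch of that bookkeeping, with a generic socket and backing object (not vLLM code):

from collections import deque
from typing import Any

import zmq

pending_messages: deque[tuple[zmq.MessageTracker, Any]] = deque()

def send_tracked(sock: zmq.Socket, frames: list, backing: Any) -> None:
    # Zero-copy send: ZMQ may still be reading `frames` after this call returns.
    tracker = sock.send_multipart(frames, copy=False, track=True)
    if not tracker.done:
        # Keep a reference to the backing buffers until the send finishes.
        pending_messages.append((tracker, backing))

def free_pending_messages() -> None:
    while pending_messages and pending_messages[0][0].done:
        # Dropping the tuple releases the last reference to the backing memory.
        pending_messages.popleft()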
SyncMPClient.call_utility
Taking profile as an example:
- Generates a call_id.
- Sends the message to EngineCore over the input_socket: (Identity, RequestType, (0, call_id, method, args) (serialized)).
class SyncMPClient(MPClient):

    def profile(self, is_start: bool = True) -> None:
        self.call_utility("profile", is_start)

    def call_utility(self, method: str, *args) -> Any:
        call_id = uuid.uuid1().int >> 64
        future: Future[Any] = Future()
        self.utility_results[call_id] = future
        self._send_input(EngineCoreRequestType.UTILITY,
                         (0, call_id, method, args))

        return future.result()
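The return value comes back asynchronously: call_utility blocks on a concurrent.futures.Future, and the output-socket thread fulfils it when the UTILITY response carrying the same call_id arrives (via _process_utility_output, elided earlier). A toy sketch of that round trip, with a dummy responder thread standing in for the engine:

import threading
import uuid
from concurrent.futures import Future

utility_results: dict[int, Future] = {}

def call_utility(method: str, *args):
    call_id = uuid.uuid1().int >> 64
    future: Future = Future()
    utility_results[call_id] = future
    # The real client would now _send_input(UTILITY, (0, call_id, method, args)).
    return call_id, future

def on_utility_output(call_id: int, result) -> None:
    # Runs in the output-processing thread (stand-in for _process_utility_output).
    utility_results.pop(call_id).set_result(result)

call_id, fut = call_utility("profile", True)
threading.Thread(target=on_utility_output, args=(call_id, "ok"), daemon=True).start()
print(fut.result())   # blocks the caller until the "engine" replies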
SyncMPClient.get_output
In SyncMPClient.__init__, a dedicated thread has already been created to receive and decode messages from the output_socket and put them into self.outputs_queue (a queue.Queue).
So SyncMPClient.get_output simply calls self.outputs_queue.get() to get synchronous IO: if no data has arrived yet, the call blocks the main thread.
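Stripped of the ZMQ details, get_output is just the classic producer-thread plus queue.Queue consumer pattern; a minimal self-contained sketch (the sleep stands in for waiting on the output_socket):

import queue
import threading
import time

outputs_queue: "queue.Queue[str]" = queue.Queue()

def producer() -> None:
    time.sleep(0.1)   # stands in for waiting on and decoding the output_socket
    outputs_queue.put_nowait("decoded EngineCoreOutputs")

threading.Thread(target=producer, daemon=True).start()

def get_output() -> str:
    # Blocks the calling thread until the producer thread enqueues an item.
    return outputs_queue.get()

print(get_output())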
AsyncMPClient
TODO: to be filled in.