基于Python實現生產者—消費者分布式消息隊列:構建高可用異步通信系統

深入剖析分布式消息隊列的核心原理與Python實現,附完整架構設計和代碼實現

引言:分布式系統的通信基石

在微服務架構和云原生應用普及的今天,服務間的異步通信成為系統設計的核心挑戰。當單體應用拆分為數十個微服務后,服務間通信呈現出新的特征:

  • 網絡不可靠:節點故障、網絡分區成為常態

  • 流量波動:突發流量可能壓垮接收方

  • 數據一致性:跨服務事務難以保證原子性

  • 服務解耦:生產者不應依賴消費者可用性

分布式消息隊列(Distributed Message Queue)正是解決這些挑戰的利器。它通過異步通信持久化存儲實現了服務間的松耦合,為現代分布式系統提供可靠的數據傳輸通道。

一、分布式消息隊列核心架構

1.1 基本組成元素

1.2 核心組件詳解
組件職責關鍵特性
生產者消息創建和發布負載均衡、失敗重試
消息代理消息路由和存儲高可用、持久化、分區
消費者組消息處理并行消費、負載均衡
注冊中心服務發現心跳檢測、元數據管理
  1. 生產者:消息創建方,需實現負載均衡和重試機制

  2. 消息代理:核心路由層,負責消息存儲/分發/持久化

  3. 消費者組:消息處理單元,支持并行消費

  4. 注冊中心:服務發現樞紐,管理動態節點狀態
    表格詳細對比消息隊列的五大核心特性,強調順序性、持久化和容錯機制的設計必要性。

1.3 消息隊列核心特性
  1. 消息有序性:分區內順序保證

  2. 持久化存儲:磁盤+副本機制

  3. 至少一次投遞:ACK確認機制

  4. 消費進度跟蹤:Offset管理

  5. 死信隊列:處理失敗消息

二、Python實現分布式消息隊列

我們使用Python構建一個輕量級分布式消息隊列,包含以下模塊:

project/
├── broker/          # 消息代理
│   ├── server.py     # 主服務
│   ├── partition.py  # 分區管理
│   └── storage.py    # 存儲引擎
├── client/          # 客戶端SDK
│   ├── producer.py   # 生產者
│   └── consumer.py   # 消費者
├── registry/        # 注冊中心
│   └── zookeeper.py  # 服務發現
└── tests/           # 測試用例

完整的Python實現方案:

  1. 項目結構:采用模塊化設計(Broker/Client/Registry)

  2. 網絡層:基于ZeroMQ的ROUTER/DEALER模式實現高性能通信

  3. 分區管理:一致性哈希算法解決數據分布問題

  4. 存儲引擎:WAL日志實現消息持久化
    代碼片段展示關鍵實現邏輯,如Broker的消息路由、虛擬節點環構建、日志追加操作等核心功能。

2.1 網絡通信層:ZeroMQ實現
# broker/server.py
import zmq
import threadingclass BrokerServer:def __init__(self, host='localhost', port=5555):self.context = zmq.Context()self.socket = self.context.socket(zmq.ROUTER)self.socket.bind(f"tcp://{host}:{port}")self.workers = {}self.worker_lock = threading.Lock()def start(self):poller = zmq.Poller()poller.register(self.socket, zmq.POLLIN)while True:socks = dict(poller.poll(1000))if self.socket in socks:msg = self.socket.recv_multipart()identity = msg[0]command = msg[1].decode()if command == "REGISTER":with self.worker_lock:self.workers[identity] = msg[2]self.socket.send_multipart([identity, b"ACK"])elif command == "PRODUCE":topic = msg[2].decode()message = msg[3]# 存儲消息邏輯...self.socket.send_multipart([identity, b"ACK"])elif command == "CONSUME":# 消費邏輯...pass
2.2 分區管理:一致性哈希算法
# broker/partition.py
import hashlib
from bisect import bisectclass PartitionManager:def __init__(self, partitions=3, replicas=3):self.partitions = partitionsself.replicas = replicasself.ring = []self.nodes = {}self._build_ring()def _build_ring(self):for partition in range(self.partitions):for replica in range(self.replicas):key = f"partition-{partition}-replica-{replica}"hash_val = self._hash(key)self.ring.append(hash_val)self.nodes[hash_val] = (partition, replica)self.ring.sort()def _hash(self, key):return int(hashlib.md5(key.encode()).hexdigest()[:8], 16)def get_partition(self, key):if not self.ring:return Nonehash_key = self._hash(key)pos = bisect(self.ring, hash_key)if pos == len(self.ring):pos = 0return self.nodes[self.ring[pos]]
2.3 存儲引擎:WAL日志實現
# broker/storage.py
import os
import structclass WriteAheadLog:def __init__(self, data_dir):self.data_dir = data_diros.makedirs(data_dir, exist_ok=True)self.segments = {}self.current_segment = Nonedef _get_segment_file(self, partition, offset):return os.path.join(self.data_dir, f"partition-{partition}-{offset}.log")def append(self, partition, message):if partition not in self.segments:self.segments[partition] = []self._create_segment(partition, 0)seg_file = self.current_segment[partition]with open(seg_file, 'ab') as f:# 消息格式:長度(4字節) + 消息體msg_bytes = message.encode()f.write(struct.pack(">I", len(msg_bytes)))f.write(msg_bytes)return self.current_offset[partition]def read(self, partition, offset, max_bytes=1024*1024):# 實現消息讀取邏輯pass

三、核心機制深入解析

3.1 消息生產流程
  • 生產流程:注冊中心發現→消息路由→持久化存儲→ACK確認

3.2 消息消費流程
  • 消費流程:分區分配→消息拉取→處理確認→偏移量更新
    重點解析消費者組負載均衡算法,展示如何通過rebalance()方法動態分配分區,確保消費能力最大化。

3.3 消費者組負載均衡
# client/consumer.py
import random
from collections import defaultdictclass ConsumerGroup:def __init__(self, group_id, registry):self.group_id = group_idself.registry = registryself.members = {}self.assignment = defaultdict(list)def join(self, consumer_id):self.members[consumer_id] = time.time()self.rebalance()def leave(self, consumer_id):if consumer_id in self.members:del self.members[consumer_id]self.rebalance()def rebalance(self):# 獲取所有分區partitions = self.registry.get_partitions()# 排序消費者和分區sorted_consumers = sorted(self.members.keys())sorted_partitions = sorted(partitions)# 平均分配分區self.assignment = defaultdict(list)for i, partition in enumerate(sorted_partitions):consumer_idx = i % len(sorted_consumers)consumer_id = sorted_consumers[consumer_idx]self.assignment[consumer_id].append(partition)# 通知所有消費者新的分配方案self._notify_assignment()

四、高可用性實現方案

解決分布式環境的核心問題——高可用:

  1. 主從復制:基于Raft思想實現Leader/Follower數據同步

  2. 故障轉移:流程圖展示心跳檢測→選舉→數據恢復的全過程
    Python代碼實現復制狀態機(commit_index維護)和選舉觸發機制,確保單點故障時服務秒級切換。

4.1 主從復制機制
# broker/replication.py
import logging
from threading import Threadclass PartitionReplica:def __init__(self, partition_id, role='follower'):self.partition_id = partition_idself.role = roleself.leader = Noneself.followers = []self.log = []self.commit_index = 0self.last_applied = 0def start_replication(self):if self.role == 'leader':Thread(target=self._leader_loop).start()else:Thread(target=self._follower_loop).start()def _leader_loop(self):while True:# 發送心跳給所有followerfor follower in self.followers:try:follower.send_heartbeat(self.commit_index)except NetworkError:logging.warning("Follower unreachable")# 等待新消息time.sleep(0.5)def _follower_loop(self):while True:# 等待leader心跳if self._heartbeat_timeout():self.start_election()# 處理復制請求time.sleep(0.1)
4.2 故障轉移流程

五、消息可靠性保障

可靠性是消息隊列的生命線,本節實現:

  1. ACK機制:通過MessageTracker跟蹤未確認消息,實現超時重試

  2. 死信隊列:對多次重試失敗的消息隔離存儲
    代碼展示消息跟蹤器的環形緩沖區設計和死信存儲策略,確保消息至少投遞一次(at-least-once)。

5.1 消息確認機制
# broker/message_tracker.py
import time
from collections import defaultdictclass MessageTracker:def __init__(self, ack_timeout=30):self.pending_acks = defaultdict(dict)self.ack_timeout = ack_timeoutself.cleaner_thread = Thread(target=self._clean_expired)self.cleaner_thread.daemon = Trueself.cleaner_thread.start()def add_message(self, partition, offset, message_id):self.pending_acks[partition][offset] = {'message_id': message_id,'timestamp': time.time(),'retries': 0}def ack_message(self, partition, offset):if partition in self.pending_acks and offset in self.pending_acks[partition]:del self.pending_acks[partition][offset]return Truereturn Falsedef _clean_expired(self):while True:current_time = time.time()for partition, messages in list(self.pending_acks.items()):for offset, data in list(messages.items()):if current_time - data['timestamp'] > self.ack_timeout:if data['retries'] < 3:# 重試邏輯self._retry_message(partition, offset, data)else:# 移入死信隊列self._move_to_dlq(partition, offset, data)time.sleep(5)
5.2 死信隊列實現
# broker/dlq_manager.py
import json
from datetime import datetimeclass DeadLetterQueue:def __init__(self, storage_path):self.storage_path = storage_pathos.makedirs(storage_path, exist_ok=True)def add_message(self, message, reason):dlq_entry = {'original': message,'reason': reason,'timestamp': datetime.utcnow().isoformat(),'retries': message.get('retries', 0)}# 文件名格式: DLQ_<topic>_<partition>_<timestamp>.jsonfilename = f"DLQ_{message['topic']}_{message['partition']}_{int(time.time())}.json"with open(os.path.join(self.storage_path, filename), 'w') as f:json.dump(dlq_entry, f)logging.error(f"消息移入死信隊列: {reason}")

六、分布式協調服務

可靠性是消息隊列的生命線,本節實現:

  1. ACK機制:通過MessageTracker跟蹤未確認消息,實現超時重試

  2. 死信隊列:對多次重試失敗的消息隔離存儲
    代碼展示消息跟蹤器的環形緩沖區設計和死信存儲策略,確保消息至少投遞一次(at-least-once)。

6.1 基于ZooKeeper的服務發現
# registry/zookeeper.py
from kazoo.client import KazooClientclass ServiceRegistry:def __init__(self, hosts='127.0.0.1:2181'):self.zk = KazooClient(hosts=hosts)self.zk.start()self._setup_paths()def _setup_paths(self):base_path = "/pyqueue"self.zk.ensure_path(f"{base_path}/brokers")self.zk.ensure_path(f"{base_path}/topics")self.zk.ensure_path(f"{base_path}/consumers")def register_broker(self, broker_id, endpoint):path = f"/pyqueue/brokers/{broker_id}"self.zk.create(path, endpoint.encode(), ephemeral=True, makepath=True)def get_brokers(self):brokers = {}broker_ids = self.zk.get_children("/pyqueue/brokers")for bid in broker_ids:data, _ = self.zk.get(f"/pyqueue/brokers/{bid}")brokers[bid] = data.decode()return brokersdef watch_brokers(self, callback):@self.zk.ChildrenWatch("/pyqueue/brokers")def watch_children(children):callback(self.get_brokers())

七、性能優化策略

總結實戰經驗如下:

  1. 消息規范:標準化消息格式(唯一ID/時間戳/版本控制)

  2. 冪等設計:通過processed_ids集合避免重復消費

  3. 監控指標:跟蹤隊列深度/消費延遲/錯誤率
    代碼示例展示訂單處理的冪等實現,解決分布式場景的重復消費難題。

7.1 零拷貝傳輸
# 使用memoryview減少數據拷貝
def send_large_message(socket, data):# 創建內存視圖buffer = memoryview(data)total_size = len(buffer)sent = 0# 分塊發送while sent < total_size:# 每次發送最多64KBchunk_size = min(64 * 1024, total_size - sent)socket.send(buffer[sent:sent+chunk_size])sent += chunk_size
7.2 批量消息處理
# producer.py
class BatchProducer:def __init__(self, broker, batch_size=1024, linger_ms=100):self.broker = brokerself.batch_size = batch_sizeself.linger_ms = linger_msself.batch = []self.batch_lock = threading.Lock()self.flush_thread = threading.Thread(target=self._auto_flush)self.flush_thread.daemon = Trueself.flush_thread.start()def send(self, topic, message):with self.batch_lock:self.batch.append((topic, message))if len(self.batch) >= self.batch_size:self._flush()def _auto_flush(self):while True:time.sleep(self.linger_ms / 1000.0)with self.batch_lock:if self.batch:self._flush()def _flush(self):# 序列化批處理消息batch_data = self._serialize_batch()self.broker.send_batch(batch_data)self.batch = []

八、部署架構與容災方案

設計企業級部署方案:

  1. 多機房部署:通過ZK集群跨機房同步元數據

  2. 數據復制:跨機房異步復制保證數據安全

  3. 災備切換:五步故障恢復流程(檢測→選舉→同步→重定向→恢復)
    架構圖展示多機房容災部署模型,確保RPO<5秒,RTO<30秒。

8.1 多機房部署架構

8.2 災備切換流程
  1. 故障檢測:ZooKeeper心跳超時(>15秒)

  2. 主節點切換:選舉新Leader

  3. 數據同步:從副本恢復未同步數據

  4. 客戶端重定向:更新Broker列表

  5. 服務恢復:繼續處理積壓消息

九、生產環境最佳實踐

總結實戰經驗:

  1. 消息規范:標準化消息格式(唯一ID/時間戳/版本控制)

  2. 冪等設計:通過processed_ids集合避免重復消費

  3. 監控指標:跟蹤隊列深度/消費延遲/錯誤率
    代碼示例展示訂單處理的冪等實現,解決分布式場景的重復消費難題。

9.1 消息設計規范
# 推薦消息格式
{"id": "msg_1234567890",  # 唯一ID"timestamp": 1672531200.000,  # 精確到毫秒"source": "order_service","type": "OrderCreated","version": "v1","payload": {  # 實際業務數據"order_id": 1001,"amount": 99.99},"headers": {  # 擴展元數據"retry_count": 0,"trace_id": "abc123"}
}
9.2 消費者冪等處理
class OrderProcessor:def __init__(self, db):self.db = dbself.processed_ids = set()self.lock = threading.Lock()def process_message(self, message):msg_id = message['id']# 檢查是否已處理with self.lock:if msg_id in self.processed_ids:return True  # 冪等跳過# 開始處理try:result = self._create_order(message['payload'])# 記錄已處理self.processed_ids.add(msg_id)self.db.save_processed(msg_id)return Trueexcept Exception as e:logging.error(f"訂單處理失敗: {e}")return False

十、與開源方案對比

特性自實現隊列RabbitMQKafkaRedis Stream
協議支持自定義AMQP自定義RESP
吞吐量中等中等
延遲
持久化WAL日志磁盤磁盤可選
開發復雜度
Python集成完美

通過特性對比表幫助技術選型:

  1. 吞吐量:Kafka > Redis Stream > 自實現 > RabbitMQ

  2. 延遲:自實現/Redis < RabbitMQ < Kafka

  3. 適用場景

    • 自實現:定制化需求

    • Kafka:日志處理

    • RabbitMQ:事務消息

    • Redis:實時流處理
      指出自實現的優勢在于靈活性和學習價值,但生產環境推薦使用成熟方案。

結語:消息隊列的設計哲學

分布式消息隊列的本質是時空解耦器,它通過三個核心機制解決分布式系統通信問題:

  1. 時間解耦:生產者消費者無需同時在線

  2. 空間解耦:服務間不直接依賴

  3. 流量削峰:緩沖突發流量

在Python中實現分布式消息隊列需要平衡:

  • 性能:零拷貝、批處理、異步IO

  • 可靠性:持久化、復制、ACK機制

  • 擴展性:分區、負載均衡、無狀態設計

雖然已有RabbitMQ、Kafka等成熟方案,但理解其底層實現原理:

  1. 有助于根據業務需求選擇合適的消息中間件

  2. 能在特殊場景下進行針對性優化

  3. 為開發分布式系統提供基礎架構能力

分布式系統的通信藝術:好的消息隊列設計如同精密的郵遞系統——生產者是寄件人,Broker是郵局網絡,消費者是收件人。只有每個環節都可靠高效,信息才能跨越時空的阻隔準確送達。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/917441.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/917441.shtml
英文地址,請注明出處:http://en.pswp.cn/news/917441.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【大模型核心技術】Agent 理論與實戰

一、基本概念 LLM 特性&#xff1a;擅長理解和生成文本&#xff0c;但采用 “一次性” 響應模式&#xff0c;本質上是無記憶的生成模型。Agent 本質&#xff1a;包含 LLM 的系統應用&#xff0c;具備自主規劃、工具調用和環境反饋能力&#xff0c;是將 LLM 從 “聊天機器人” 升…

Maven - 依賴的生命周期詳解

作者&#xff1a;唐叔在學習 專欄&#xff1a;唐叔的Java實踐 標簽&#xff1a;Maven依賴管理、Java項目構建、依賴傳遞性、Spring Boot依賴、Maven最佳實踐、項目構建工具、依賴沖突解決、POM文件詳解 文章目錄一、開篇二、Maven依賴生命周期2.1 依賴聲明階段&#xff1a;POM文…

從零打造大語言模型--處理文本數據

從零打造大語言模型 第 1 章&#xff1a;處理文本數據 章節導讀 在把文本投喂進 Transformer 之前&#xff0c;需要兩步&#xff1a;① 將字符流切分成離散 Token&#xff1b;② 把 Token 映射成連續向量。 1.1 理解詞嵌入&#xff08;Word Embedding&#xff09; 嵌入向量 一…

【Spring】Bean的生命周期,部分源碼解釋

文章目錄Bean 的生命周期執行流程代碼演示執行結果源碼閱讀AbstractAutowireCapableBeanFactorydoCreateBeaninitializeBeanBean 的生命周期 生命周期指的是一個對象從誕生到銷毀的整個生命過程&#xff0c;我們把這個過程就叫做一個對象的聲明周期 Bean 的聲明周期分為以下 …

[spring-cloud: 服務發現]-源碼解析

DiscoveryClient DiscoveryClient 接口定義了常見的服務發現操作&#xff0c;如獲取服務實例、獲取所有服務ID、驗證客戶端可用性等&#xff0c;通常用于 Eureka 或 Consul 等服務發現框架。 public interface DiscoveryClient extends Ordered {/*** Default order of the dis…

QML 基礎語法與對象模型

QML (Qt Meta-Object Language) 是一種聲明式語言&#xff0c;專為創建流暢的用戶界面和應用程序邏輯而設計。作為 Qt 框架的一部分&#xff0c;QML 提供了簡潔、直觀的語法來描述 UI 組件及其交互方式。本文將深入解析 QML 的基礎語法和對象模型。 一、QML 基礎語法 1. 基本對…

HTTPS的概念和工作過程

一.HTTPS是什么HTTPS也是一個應用層協議&#xff0c;是在HTTP協議的基礎上引入了一個加密層&#xff08;SSL&#xff09;HTTP協議內容都是按照文本的方式明文傳輸的&#xff0c;這就導致傳輸過程中可能出現被篡改的情況最著名的就是十多年前網絡剛發展的時期&#xff0c;出現“…

Unity —— Android 應用構建與發布?

文章目錄1 ?Gradle模板??&#xff1a;了解Gradle模板的作用及使用方法&#xff0c;以增強對構建流程的控制。?2 ?Gradle模板變量??&#xff1a;參考文檔——自定義Gradle模板文件中可用的變量列表。2.1 修改Unity應用的Gradle工程文件2.1.1 通過Gradle模板文件2.1.2 導出…

【iOS】strong和copy工作流程探尋、OC屬性關鍵字復習

文章目錄前言strong和copy的區別為什么要用copy&#xff1f;什么時候用什么修飾&#xff1f;strong&#xff08;ARC自動管理&#xff09;strong修飾變量的底層流程圖底層代碼核心實現小結copy底層流程圖對比與strong的關鍵不同之處內部調用關系&#xff08;偽代碼&#xff09;小…

程序代碼篇---多循環串口程序切換

上位機版&#xff08;Python&#xff09;要實現根據串口接收結果高效切換四個 while 循環函數&#xff0c;我們可以采用狀態機模式&#xff0c;配合非阻塞串口讀取來設計程序結構。這種方式可以實現快速切換&#xff0c;避免不必要的資源消耗。下面是一個高效的實現方案&#x…

rk3568上,實現ota,計算hash,驗證簽名,判斷激活分區,并通過dd命令,寫入對應AB分區

通過自定義升級程序&#xff0c;更直觀的理解ota升級原理。 一、模擬計算hash&#xff0c;驗證簽名&#xff0c;判斷激活分區&#xff0c;并通過dd命令&#xff0c;寫入對應分區 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <u…

數據分析—numpy庫

numpy庫NumPy 庫全面指南NumPy (Numerical Python) 是 Python 科學計算的基礎庫&#xff0c;提供了高性能的多維數組對象和工具。以下是 NumPy 的核心功能和使用方法。一、安裝與基礎1. 安裝 NumPypip install numpy2. 導入 NumPyimport numpy as np # 標準導入方式二、數組創建…

Vue3 setup、ref和reactive函數

一、setup函數1.理解&#xff1a;Vue3.0中一個新的配置項&#xff0c;值為一個函數。2.setup是所有Composition API(組合API)的“表演舞臺”。3.組件中用到的&#xff1a;數據、方法等等&#xff0c;均要配置在setup中。4.setup函數的兩種返回值&#xff1a;(1).若返回一個對象…

python中appium 的NoSuchElementException錯誤 原因以及解決辦法

錯誤收集D:\Program\Util\python.exe "D:/Program/myUtil/PyCharm 2024.3.5/plugins/python-ce/helpers/pycharm/_jb_pytest_runner.py" --target demo.py::TestAppium Testing started at 15:57 ... Launching pytest with arguments demo.py::TestAppium --no-hea…

mybatis-plus從入門到入土(四):持久層接口之BaseMapper和選裝件

大家好&#xff0c;今天繼續更新MybatisPlus從入門到入土系列&#xff0c;上一次的持久層接口還沒講完&#xff0c;只講了IService接口&#xff0c;今天我們繼續來講一下。 BaseMapper BaseMapper中的方法也比較簡單&#xff0c;都是增刪改查的基礎API&#xff0c;不知道大家還…

數組和指針的關系

在 C 語言中&#xff0c;?指針和數組有著非常緊密的聯系&#xff0c;但它們本質上是 ?不同的概念。理解它們的關系是掌握 C 語言內存操作的關鍵。下面我會從多個角度幫你梳理 ?指針和數組的直接聯系&#xff0c;并解釋它們的異同點。1. 數組和指針的本質區別?概念本質存儲方…

AI大模型探索之路-實戰篇:智能化IT領域搜索引擎之github網站在線搜索

系列篇章?? No. 文章 1 AI大模型探索之路-實戰篇:智能化IT領域搜索引擎的構建與初步實踐 2 AI大模型探索之路-實戰篇:智能化IT領域搜索引擎之GLM-4大模型技術的實踐探索 3 AI大模型探索之路-實戰篇:智能化IT領域搜索引擎之知乎網站數據獲取(初步實踐) 4 AI大模型探索之路…

從0到1學PHP(十二):PHP 框架入門與項目實戰

目錄一、主流 PHP 框架介紹1.1 Laravel1.2 ThinkPHP1.3 Yii1.4 框架的優勢二、框架基本使用&#xff08;以 Laravel 為例&#xff09;2.1 框架的安裝與配置2.2 路由定義、控制器創建、視圖渲染2.3 數據庫操作&#xff08;ORM 的使用&#xff09;三、小型項目實戰3.1 項目需求分…

MPLS LSP

一、概述上一章我們已經介紹過,LSP是MPLS報文在MPLS網絡中轉發時經過的路徑,可以看作是由報文傳輸方向節點為對應FEC分配的MPLS入標簽組成的,因為每臺設備上為每個FEC分配的入標簽是唯一 的&#xff0c;并與由下游節點為本地節點上該FEC分配的出標簽建立映射關系&#xff0c; 所…

圖像、視頻、音頻多模態大模型中長上下文token壓縮方法綜述

多模態大模型MLLMs 能夠處理高分辨率圖像、長視頻序列和冗長音頻輸入等復雜上下文&#xff0c;但自注意力機制的二次復雜度使得大量輸入 token 帶來了巨大的計算和內存需求。 如下圖&#xff0c;上&#xff1a;圖像、視頻和音頻數據類型可以在其表示維度上進行擴展&#xff0c;…