說明
這個用來替代nginx。
最初是希望用nginx進行故障檢測和負載均衡,花了很多時間,大致的結論是:nginx可以實現,但是是在商業版里。非得要找替代肯定可以搞出來,但是太麻煩了(即使是nginx本身的配置也很煩)。更重要的是,我突然意識到其實我對這方面的需求大幅提升了,nginx無法實現我最核心的功能。
于是我打算自己做一個網關。以下是兩者的差別:
我并不是要做一個新的產品來替代nginx,只是需要對我的應用服務進行非常精細的控制,然后我問了下大模型,是否可行:
使用 Tornado 構建自定義應用網關 是一個非常有挑戰性但也非常靈活的選擇。通過 Tornado 的異步特性和強大的編程能力,你可以實現非常精細的控制,滿足特定的業務需求。以下是實現這一目標的思路和建議:
1. Tornado 作為應用網關的核心功能
你可以利用 Tornado 實現以下網關功能:
反向代理
- 使用 Tornado 的
AsyncHTTPClient
或httpclient
模塊,將客戶端請求轉發到后端服務。 - 示例:
from tornado.httpclient import AsyncHTTPClientclass ProxyHandler(tornado.web.RequestHandler):async def get(self):http_client = AsyncHTTPClient()response = await http_client.fetch("http://backend-service" + self.request.uri)self.write(response.body)
負載均衡
- 實現自定義的負載均衡算法(如輪詢、加權輪詢、最小連接數等)。
- 示例:
backend_servers = ["http://backend1", "http://backend2", "http://backend3"] current_server = 0class LoadBalancerHandler(tornado.web.RequestHandler):async def get(self):global current_serverbackend_url = backend_servers[current_server]current_server = (current_server + 1) % len(backend_servers)http_client = AsyncHTTPClient()response = await http_client.fetch(backend_url + self.request.uri)self.write(response.body)
動態路由
- 根據請求的路徑、頭部或其他條件,將請求路由到不同的后端服務。
- 示例:
class RouterHandler(tornado.web.RequestHandler):async def get(self):if self.request.path.startswith("/api/v1"):backend_url = "http://api-v1-service"elif self.request.path.startswith("/api/v2"):backend_url = "http://api-v2-service"else:backend_url = "http://default-service"http_client = AsyncHTTPClient()response = await http_client.fetch(backend_url + self.request.uri)self.write(response.body)
限流和限速
- 使用 Tornado 的異步特性實現請求速率限制。
- 示例:
from tornado.ioloop import IOLoop from tornado.web import RequestHandler from tornado.queues import Queuerequest_queue = Queue(maxsize=100) # 限制并發請求數class RateLimitHandler(RequestHandler):async def get(self):await request_queue.put(None)try:# 處理請求self.write("Hello, world!")finally:request_queue.get_nowait()
身份驗證和授權
- 在網關層實現統一的身份驗證(如 JWT 驗證、OAuth 等)。
- 示例:
import jwtclass AuthHandler(tornado.web.RequestHandler):def prepare(self):token = self.request.headers.get("Authorization")if not token:self.set_status(401)self.finish("Unauthorized")try:payload = jwt.decode(token, "secret_key", algorithms=["HS256"])self.current_user = payload["user"]except jwt.InvalidTokenError:self.set_status(401)self.finish("Invalid token")
日志和監控
- 在網關層記錄請求日志,并集成監控工具(如 Prometheus)。
- 示例:
class LoggingHandler(tornado.web.RequestHandler):def on_finish(self):print(f"Request: {self.request.method} {self.request.uri} - Status: {self.get_status()}")
2. Tornado 網關的優勢
- 高度定制化:你可以完全控制網關的行為,實現特定的業務邏輯。
- 靈活性:Tornado 的異步特性使其能夠高效處理高并發請求。
- 與 Python 生態無縫集成:可以輕松集成其他 Python 庫(如數據庫驅動、消息隊列等)。
3. Tornado 網關的挑戰
- 開發成本高:需要從頭實現許多功能(如負載均衡、健康檢查、限流等)。
- 性能瓶頸:Tornado 是 Python 編寫的,性能可能不如 Nginx 這樣的 C 語言實現。
- 維護復雜性:自定義網關需要更多的測試和維護工作。
4. 建議的架構
如果你決定使用 Tornado 構建網關,建議采用以下架構:
-
Nginx 作為前置網關:
- 處理 SSL/TLS 終止、靜態文件緩存、DDoS 防護等。
- 將動態請求轉發到 Tornado 網關。
-
Tornado 作為應用網關:
- 實現精細的流量控制、動態路由、身份驗證等功能。
- 將請求轉發到后端的微服務或應用服務器。
-
監控和日志:
- 集成 Prometheus、Grafana 等工具,實時監控網關性能。
- 使用 ELK(Elasticsearch、Logstash、Kibana)堆棧進行日志分析。
5. 示例代碼
以下是一個簡單的 Tornado 網關示例:
import tornado.ioloop
import tornado.web
from tornado.httpclient import AsyncHTTPClientclass ProxyHandler(tornado.web.RequestHandler):async def get(self):backend_url = "http://backend-service" + self.request.urihttp_client = AsyncHTTPClient()response = await http_client.fetch(backend_url)self.write(response.body)def make_app():return tornado.web.Application([(r"/.*", ProxyHandler),])if __name__ == "__main__":app = make_app()app.listen(8888)tornado.ioloop.IOLoop.current().start()
總結
使用 Tornado 構建自定義應用網關是可行的,尤其適合需要高度定制化的場景。你可以實現反向代理、負載均衡、動態路由、限流、身份驗證等功能,但需要注意開發成本和性能瓶頸。如果可能,建議將 Nginx 作為前置網關,Tornado 作為應用網關,結合兩者的優勢,構建高性能、高可用的系統。
看看性能是不是ok:
所以,總體上tornado在性能上可能會比nginx低一個量級:
- 1 簡單的基礎服務可以繼續走nginx,或者直連,這部分假設無問題。
- 2 復雜的、核心的服務走smartlink。這部分的處理本身就非常“昂貴”,比如用大模型處理數據,天然的TPS就比較低,smartlink額外的開銷比起來不算啥,是可行的(或者說現在的計算資源太多了,費點沒啥)。即使是較低配的生產機也應該有8個核,花1個核做網關在我看來是值得的。
smartlink 可以帶來什么:
- 1 權限控制。過去我一般不去設置權限,這樣開發和使用比較方便,但這一步肯定是要做的。通過smartlink,僅開放一個對外端口,訪問其他服務時必須提供對應口令,在smartlink完成鑒權。
- 2 流量及質量。現在有一類服務要求比較高,按照固定周期統計可能會有些滯后。通過smartlink(結合redis), 可以實時的對流量進行控制,獲取請求的響應頭,將成功(2xx)、失敗(5xx)等不同類型的錯誤進行實時統計和通知。另外可以統計實時的影響速度,由別的算法應用讀取,來進行分發優化。
- 3 優化與強化。這個問題是最為核心且重要的,當我們花費成千上萬的人民幣在進行數據的處理時,我們知道做的不夠好。通常來說,算法很難在第一版達到完美。然而當服務進入生產狀態時,我們又很難去停下來再次改進;即使我們離線改好了新版算法,又擔心上線時出bug。這樣的最終結果是:不出問題不改進。smartlink在增加cpu開銷時,也給了足夠強大的控制邏輯,可以解決這些問題。 這里不討論細節,只看機制上的可行性。當請求到達時,smarklink會按照實時加載的策略(通過redis)決定請求是否可以多發。比如除了將請求實時轉向對應的穩定版服務,也發到kafka。收到請求的處理結果后,除了返回用戶,也發一份到kafka。這個過程會增加2次寫kafka的操作,對于秒級級的請求來說是可以接受的(假設請求30秒超時)。數據到了kafka之后,可以有多個消費者準實時的執行新版本服務的請求,然后和當前服務的結果一并進行效果評估與比對。首先會排除的問題是bug引起的請求失敗,然后新版本服務可以準實時的進行評估、打標(RLHF)。評估結束后,可以采用手動/自動的方法,修改redis中的數據,從而使請求平滑切換到新的服務。這個服務是經過了效果評估和上線測試的,不會有低級錯誤。
內容
本次的目標先實現幾個基本功能
功能:
- 1 轉發
- 2 health-check
- 3 負載均衡
1 測試樣例
- 1 轉發
在一臺主機上,啟動一個微服務(實體識別),通過smartlink轉發服務到此位置
- 2 health-check
當請求報連接錯誤時,smartlink會將對應服務記為不可用;當服務恢復時,會將其標記為可用。
- 3 負載均衡(least time)
再啟動一個服務,加入并在服務中進行時間擾動。要求smartlink在將服務轉發時會依據其響應時間動態分配。
2 實驗
我發現今天可能寫不完全部了,內容有點多,我先把完成的有部分整理上。
2.1 基礎配置部分 base_config.py
這里先手動配置了一個測試服務。假設每個服務的可用資源是一個字典, 并且認為一定有一個默認的服務版本(base)
...
base_config.server_dict = {}
base_config.server_dict['test_server'] = {}
base_config.server_dict['test_server']['base'] = "http://IP:PORT/get_lng_lat/"
2.2 數據模型 data_model.py
約定(建議性質)了模型的輸入和輸出。本來是想強制的,但那樣難度太高了,而且作為一個網關,目前的主要任務還是能夠進行有效轉發。未來可以盡量往這個標準靠。
# 服務輸入
class StandardInput(BaseModel):server_name : str # 要請求的服務名params : dict = {} # 請求參數data : list = []# 請求數據 - 一般以lod的方式輸入# 服務輸出
class StandardOutput(BaseModel):server_name: str = 'default'status : bool = Truemsg : str = 'ok'data : list = [] # 返回數據 - 以lod的方式返回duration : int = 0 # ms
2.3 服務 server.py
2.3.1 基本部分導入部分
import tornado.httpserver # http服務器
import tornado.ioloop # ?
import tornado.options # 指定服務端口和路徑解析
import tornado.web # web模塊
from tornado.options import define, options
import os.path # 獲取和生成template文件路徑
import tornado.httpclient
from Basefuncs import *
from base_config import base_config
from data_model import *
# 配置日志
logger = get_base_logger()import json
from json import JSONEncoder
class MyEncoder(JSONEncoder):def default(self, obj):if isinstance(obj, np.integer):return int(obj)elif isinstance(obj, np.floating):return float(obj)elif isinstance(obj, np.ndarray):return obj.tolist()if isinstance(obj, datetime):return obj.__str__()if isinstance(obj, dd.timedelta):return obj.__str__()else:return super(MyEncoder, self).default(obj)
2.3.2 基礎轉發
# 基礎轉發處理器
class BaseForwardHandler(tornado.web.RequestHandler):def set_default_headers(self):self.set_header("Access-Control-Allow-Origin", "*")self.set_header("Access-Control-Allow-Headers", "Content-Type")self.set_header("Access-Control-Allow-Methods", "POST, OPTIONS")def options(self):self.set_status(204)self.finish()# 被轉發的請求async def forward_request(self, target_url, body=None, headers=None):# 獲取原始請求的 body 和 headers# 如果沒有傳遞 body 和 headers,則使用原始請求的 body 和 headersif body is None:body = self.request.bodyif headers is None:headers = self.request.headers# 使用 Tornado 的異步 HTTP 客戶端轉發請求http_client = tornado.httpclient.AsyncHTTPClient()try:logger.info(f"Forwarding request to {target_url}")response = await http_client.fetch(target_url,method="POST",headers=headers,body=body)self.write(response.body)except tornado.httpclient.HTTPError as e:logger.error(f"HTTP Error: {e.code} - {str(e)}")self.set_status(e.code)self.write(json.dumps({"error": str(e)}))# 觸發告警通知await self.trigger_alert(e)except Exception as e:logger.error(f"Unexpected error: {str(e)}")self.set_status(500)self.write(json.dumps({"error": "Internal Server Error"}))# 觸發告警通知await self.trigger_alert(e)async def trigger_alert(self, error):"""觸發告警通知的邏輯"""alert_message = f"Service Error: {str(error)}"logger.error(f"Sending alert: {alert_message}")# 示例:發送告警到內部系統(如郵件、Slack、Webhook 等)try:# 假設我們有一個發送告警的函數await self.send_alert_to_internal_system(alert_message)except Exception as e:logger.error(f"Failed to send alert: {str(e)}")async def send_alert_to_internal_system(self, message):"""發送告警到內部系統的邏輯"""# 示例:使用 HTTP 客戶端發送告警到 Webhookhttp_client = tornado.httpclient.AsyncHTTPClient()alert_url = "http://internal-alert-system/endpoint"body = json.dumps({"message": message})try:await http_client.fetch(alert_url,method="POST",headers={"Content-Type": "application/json"},body=body)logger.info("Alert sent successfully")except Exception as e:logger.error(f"Failed to send alert: {str(e)}")raise e
2.3.3 應用的轉發邏輯
import time
class AIServer(BaseForwardHandler):print('AIServer Working ', get_time_str1())async def post(self):standard_output = StandardOutput()print('解析請求體')# 1. 解析請求體try:request_data = json.loads(self.request.body)print("Parsed request data:", request_data)except json.JSONDecodeError:self.set_status(400)standard_output.status = False standard_output.msg = "Invalid JSON format"self.write(json.dumps(standard_output.dict()))return# 2. 驗證請求格式try:standard_input = StandardInput(**request_data)except (TypeError, ValueError) as e:# 如果 request_data 不符合 StandardInput 的要求,捕獲 TypeError 或 ValueErrorself.set_status(400)standard_output.status = False standard_output.msg = f"Invalid input data: {str(e)}"self.write(json.dumps(standard_output.dict()))return# 3.轉發請求target_server_dict = base_config.server_dict.get(standard_input.server_name)if target_server_dict is None:self.set_status(400)standard_output.status = False standard_output.msg = f"Invalid input data: No Such Service {standard_input.server_name}"self.write(json.dumps(standard_output.dict()))return# 這里控制版本await self.forward_request(target_server_dict['base'])
然后進行調用
請求端:
import httpx
data_dict = {'server_name':'test_server', 'address': '上海九六廣場'}
resp = httpx.post('http://localhost:9001/ai_server/' ,json = data_dict)
In [31]: resp.json()
Out[31]:
{'name': 'get_lng_lat','status': True,'msg': 'ok','duration': 86,'data': {'status': 0,'lng': 121.5312514401146,'lat': 31.23340608690243,'precise': 1,'confidence': 75,'comprehension': 100,'level': '購物'}}
服務端:
...
解析請求體
Parsed request data: {'server_name': 'test_server', 'address': '上海九六廣場'}
2025-02-02 01:09:04 - my_logger - INFO - Forwarding request to http://IP/get_lng_lat/
2025-02-02 01:09:04 - tornado.access - INFO - 200 POST /ai_server/ (127.0.0.1) 158.67ms
...
可以看到轉發成功了,原始的請求大約80ms,因為轉發帶來的額外延時大約也是80ms。所以如果對于實時性要求特別高的可能還需要注意。后續再加上其他的額外處理,單個請求最終的延時可能到達200ms,甚至500ms。對于類似大模型之類的調用來說,一般會花費數秒到數十秒來處理一個請求,這種開銷就可以接受。