使用asyncio包編寫服務器
演示 TCP 服務器時通常使用回顯服務器。我們要構建更好玩一點的示
例服務器,用于查找 Unicode 字符,分別使用簡單的 TCP 協議和 HTTP
協議實現。這兩個服務器的作用是,讓客戶端使用 4.8 節討論過的
unicodedata 模塊,通過規范名稱查找 Unicode 字符。圖 18-2 展示了
在一個 Telnet 會話中訪問 TCP 版字符查找服務器所做的兩次查詢,一次
查詢國際象棋棋子字符,一次查詢名稱中包含“sun”的字符。
圖 18-2:在一個 Telnet 會話中訪問 tcp_charfinder.py 服務器——查
詢“chess black”和“sun”
使用asyncio包編寫TCP服務器
下面幾個示例的大多數邏輯在 charfinder.py 模塊中,這個模塊沒有任何
并發。你可以在命令行中使用 charfinder.py 腳本查找字符,不過這個腳
本更為重要的作用是為使用 asyncio 包編寫的服務器提供支持。
charfinder.py 腳本的代碼在本書的代碼倉庫中
(https://github.com/fluentpython/example-code)。
charfinder 模塊讀取 Python 內建的 Unicode 數據庫,為每個字符名稱
中的每個單詞建立索引,然后倒排索引,存進一個字典。例如,在倒排
索引中,‘SUN’ 鍵對應的條目是一個集合(set),里面是名稱中包含
‘SUN’ 這個詞的 10 個 Unicode 字符。 倒排索引保存在本地一個名為
charfinder_index.pickle 的文件中。如果查詢多個單詞,charfinder 會
計算從索引中所得集合的交集。
下面我們把注意力集中在響應圖 18-2 中那兩個查詢的 tcp_charfinder.py
腳本上。我要對這個腳本中的代碼做大量說明,因此把它分為兩部分,
分別在示例 18-14 和示例 18-15 中列出。
示例 18-14 tcp_charfinder.py:使用 asyncio.start_server 函數
實現的簡易 TCP 服務器;這個模塊余下的代碼在示例 18-15 中
import sys
import asyncio
from charfinder import UnicodeNameIndex ?
CRLF = b'\r\n'
PROMPT = b'?> '
index = UnicodeNameIndex() ?
@asyncio.coroutine
def handle_queries(reader, writer): ?while True: ?writer.write(PROMPT) # 不能使用yield from! ?yield from writer.drain() # 必須使用yield from! ?data = yield from reader.readline() ?try:query = data.decode().strip()except UnicodeDecodeError: ?query = '\x00'client = writer.get_extra_info('peername') ?print('Received from {}: {!r}'.format(client, query)) ?if query:if ord(query[:1]) < 32: ?breaklines = list(index.find_description_strs(query)) ?if lines:writer.writelines(line.encode() + CRLF for line in lines) ?writer.write(index.status(query, len(lines)).encode() + CRLF) ?yield from writer.drain() ?print('Sent {} results'.format(len(lines))) ?print('Close the client socket') ?
writer.close()
? UnicodeNameIndex 類用于構建名稱索引,提供查詢方法。
? 實例化 UnicodeNameIndex 類時,它會使用 charfinder_index.pickle
文件(如果有的話),或者構建這個文件,因此第一次運行時可能要等
幾秒鐘服務器才能啟動。
? 這個協程要傳給 asyncio.start_server 函數,接收的兩個參數是
asyncio.StreamReader 對象和 asyncio.StreamWriter 對象。
? 這個循環處理會話,直到從客戶端收到控制字符后退出。
? StreamWriter.write 方法不是協程,只是普通的函數;這一行代
碼發送 ?> 提示符。
? StreamWriter.drain 方法刷新 writer 緩沖;因為它是協程,所以
必須使用 yield from 調用。
? StreamReader.readline 方法是協程,返回一個 bytes 對象。
? Telnet 客戶端發送控制字符時,可能會拋出 UnicodeDecodeError
異常;遇到這種情況時,為了簡單起見,假裝發送的是空字符。
? 返回與套接字連接的遠程地址。
? 在服務器的控制臺中記錄查詢。
? 如果收到控制字符或者空字符,退出循環。
? 返回一個生成器,產出包含 Unicode 碼位、真正的字符和字符名稱的
字符串(例如, U+0039\t9\tDIGIT NINE);為了簡單起見,我從中
構建了一個列表。
? 使用默認的 UTF-8 編碼把 lines 轉換成 bytes 對象,并在每一行末
尾添加回車符和換行符;注意,參數是一個生成器表達式。
? 輸出狀態,例如 627 matches for ‘digit’。
? 刷新輸出緩沖。
? 在服務器的控制臺中記錄響應。
? 在服務器的控制臺中記錄會話結束。
? 關閉 StreamWriter 流。
handle_queries 協程的名稱是復數,因為它啟動交互式會話后能處理
各個客戶端發來的多次請求。
注意,示例 18-14 中所有的 I/O 操作都使用 bytes 格式。因此,我們要
解碼從網絡中收到的字符串,還要編碼發出的字符串。Python 3 默認使
用的編碼是 UTF-8,這里就隱式使用了這個編碼。
注意一點,有些 I/O 方法是協程,必須由 yield from 驅動,而另一些則是普通的函數。例如,StreamWriter.write 是普通的函數,我們假
定它大多數時候都不會阻塞,因為它把數據寫入緩沖;而刷新緩沖并真
正執行 I/O 操作的 StreamWriter.drain 是協
程,StreamReader.readline 也是協程。寫作本書時,asyncio 包的
API 文檔有重大的改進,明確標識出了哪些方法是協程。
示例 18-15 接續示例 18-14,列出這個模塊的 main 函數。
示例 18-15 tcp_charfinder.py(接續示例 18-14):main 函數創建
并銷毀事件循環和套接字服務器
def main(address='127.0.0.1', port=2323): ?port = int(port)loop = asyncio.get_event_loop()server_coro = asyncio.start_server(handle_queries, address, port,
loop=loop) ?server = loop.run_until_complete(server_coro) ?host = server.sockets[0].getsockname() ?print('Serving on {}. Hit CTRL-C to stop.'.format(host)) ?try:loop.run_forever() ?except KeyboardInterrupt: # 按CTRL-C鍵passprint('Server shutting down.')server.close() ?loop.run_until_complete(server.wait_closed()) ?loop.close() ?
if __name__ == '__main__':main(*sys.argv[1:]) ?
? 調用 main 函數時可以不傳入參數。
? asyncio.start_server 協程運行結束后,返回的協程對象返回一
個 asyncio.Server 實例,即一個 TCP 套接字服務器。
? 驅動 server_coro 協程,啟動服務器(server)。
? 獲取這個服務器的第一個套接字的地址和端口,然后……
? ……在服務器的控制臺中顯示出來。這是這個腳本在服務器的控制
臺中顯示的第一個輸出。
? 運行事件循環;main 函數在這里阻塞,直到在服務器的控制臺中按
CTRL-C 鍵才會關閉。
? 關閉服務器。
? server.wait_closed() 方法返回一個期物;調用
loop.run_until_complete 方法,運行期物。
? 終止事件循環。
? 這是處理可選的命令行參數的簡便方式:展開 sys.argv[1:],傳給
main 函數,未指定的參數使用相應的默認值。
注意,run_until_complete 方法的參數是一個協程(start_server
方法返回的結果)或一個 Future 對象(server.wait_closed 方法返
回的結果)。如果傳給 run_until_complete 方法的參數是協程,會
把協程包裝在 Task 對象中。
仔細查看 tcp_charfinder.py 腳本在服務器控制臺中生成的輸出(如示例
18-16),更易于理解腳本中控制權的流動。
示例 18-16 tcp_charfinder.py:這是圖 18-2 所示會話在服務器端的
輸出
$ python3 tcp_charfinder.py
Serving on ('127.0.0.1', 2323). Hit CTRL-C to stop. ?
Received from ('127.0.0.1', 62910): 'chess black' ?
Sent 6 results
Received from ('127.0.0.1', 62910): 'sun' ?
Sent 10 results
Received from ('127.0.0.1', 62910): '\x00' ?
Close the client socket ?
? 這是 main 函數的輸出。
? handle_queries 協程中那個 while 循環第一次迭代的輸出。
? 那個 while 循環第二次迭代的輸出。
? 用戶按下 CTRL-C 鍵;服務器收到控制字符,關閉會話。
? 客戶端套接字關閉了,但是服務器仍在運行,準備為其他客戶端提
供服務。
注意,main 函數幾乎會立即顯示 Serving on… 消息,然后在調用
loop.run_forever() 方法時阻塞。在那一點,控制權流動到事件循環
中,而且一直待在那里,不過偶爾會回到 handle_queries 協程,這個
協程需要等待網絡發送或接收數據時,控制權又交還事件循環。在事件
循環運行期間,只要有新客戶端連接服務器就會啟動一個
handle_queries 協程實例。因此,這個簡單的服務器可以并發處理多
個客戶端。出現 KeyboardInterrupt 異常,或者操作系統把進程殺
死,服務器會關閉。
tcp_charfinder.py 腳本利用 asyncio 包提供的高層流
API(https://docs.python.org/3/library/asyncio-stream.html),有現成的服
務器可用,所以我們只需實現一個處理程序(普通的回調或協程)。此
外,asyncio 包受 Twisted 框架中抽象的傳送和協議啟發,還提供了低
層傳送和協議 API。詳情請參見 asyncio 包的文檔
(https://docs.python.org/3/library/asyncio-protocol.html),里面有一個使
用低層 API 實現的 TCP 回顯服務器。
使用aiohttp包編寫Web服務器
asyncio 版國旗下載示例使用的 aiohttp 庫也支持服務器端 HTTP,我
就使用這個庫實現了 http_charfinder.py 腳本。圖 18-3 是這個簡易服務器
的 Web 界面,顯示搜索“cat face”表情符號得到的結果。
圖 18-3:瀏覽器窗口中顯示在 http_charfinder.py 服務器中搜索“cat
face”得到的結果
有些瀏覽器顯示 Unicode 字符的效果比其他瀏覽器好。圖
18-3 中的截圖在 OS X 版 Firefox 瀏覽器中截取,我在 Safari 中也得
到了相同的結果。但是,運行在同一臺設備中的最新版 Chrome 和
Opera 卻不能顯示貓臉等表情符號。不過其他搜索結果(例
如“chess”)正常,因此這可能是 OS X 版 Chrome 和 Opera 的字體
問題。
我們先分析 http_charfinder.py 腳本中最重要的后半部分:啟動和關閉事
件循環與 HTTP 服務器。參見示例 18-17。
示例 18-17 http_charfinder.py:main 和 init 函數
@asyncio.coroutine
def init(loop, address, port): ?app = web.Application(loop=loop) ?app.router.add_route('GET', '/', home) ?handler = app.make_handler() ?server = yield from loop.create_server(handler,
address, port) ?return server.sockets[0].getsockname() ?
def main(address="127.0.0.1", port=8888):port = int(port)loop = asyncio.get_event_loop()host = loop.run_until_complete(init(loop, address, port)) ?print('Serving on {}. Hit CTRL-C to stop.'.format(host))try:loop.run_forever() ?except KeyboardInterrupt: # 按CTRL-C鍵passprint('Server shutting down.')loop.close() ?
if __name__ == '__main__':main(*sys.argv[1:])
? init 協程產出一個服務器,交給事件循環驅動。
? aiohttp.web.Application 類表示 Web 應用……
? ……通過路由把 URL 模式映射到處理函數上;這里,把 GET / 路由
映射到 home 函數上(參見示例 18-18)。
? app.make_handler 方法返回一個 aiohttp.web.RequestHandler
實例,根據 app 對象設置的路由處理 HTTP 請求。
? create_server 方法創建服務器,以 handler 為協議處理程序,并
把服務器綁定在指定的地址(address)和端口(port)上。
? 返回第一個服務器套接字的地址和端口。
? 運行 init 函數,啟動服務器,獲取服務器的地址和端口。
? 運行事件循環;控制權在事件循環手上時,main 函數會在這里阻
塞。
? 關閉事件循環。
我們已經熟悉了 asyncio 包的 API,現在可以對比一下示例 18-17 與前面的 TCP 示例(見示例 18-15),看它們創建服務器的方式有何不同。
在前面的 TCP 示例中,服務器通過 main 函數中的下面兩行代碼創建并
排定運行時間:
server_coro = asyncio.start_server(handle_queries, address, port,
loop=loop)
server = loop.run_until_complete(server_coro)
在這個 HTTP 示例中,init 函數通過下述方式創建服務器:
server = yield from loop.create_server(handler,
address, port)
但是 init 是協程,驅動它運行的是 main 函數中的這一行:
host = loop.run_until_complete(init(loop, address, port))
asyncio.start_server 函數和 loop.create_server 方法都是協
程,返回的結果都是 asyncio.Server 對象。為了啟動服務器并返回服
務器的引用,這兩個協程都要由他人驅動,完成運行。在 TCP 示例
中,做法是調用 loop.run_until_complete(server_coro),其中
server_coro 是 asyncio.start_server 函數返回的結果。在 HTTP
示例中,create_server 方法在 init 協程中的一個 yield from 表達
式里調用,而 init 協程則由 main 函數中的
loop.run_until_complete(init(…)) 調用驅動。
我提到這一點是為了強調之前討論過的一個基本事實:只有驅動協程,
協程才能做事,而驅動 asyncio.coroutine 裝飾的協程有兩種方法,
要么使用 yield from,要么傳給 asyncio 包中某個參數為協程或期物
的函數,例如 run_until_complete。
示例 18-18 列出 home 函數。根據這個 HTTP 服務器的配置,home 函數
用于處理 /(根)URL。
示例 18-18 http_charfinder.py:home 函數
def home(request): ?query = request.GET.get('query', '').strip() ?print('Query: {!r}'.format(query)) ?if query: ?descriptions = list(index.find_descriptions(query))res = '\n'.join(ROW_TPL.format(**vars(descr))for descr in descriptions)msg = index.status(query, len(descriptions))else:descriptions = []res = ''msg = 'Enter words describing characters.'html = template.format(query=query, result=res, ?
message=msg)print('Sending {} results'.format(len(descriptions))) ?return web.Response(content_type=CONTENT_TYPE, text=html) ?
? 一個路由處理函數,參數是一個 aiohttp.web.Request 實例。
? 獲取查詢字符串,去掉首尾的空白。
? 在服務器的控制臺中記錄查詢。
? 如果有查詢字符串,從索引(index)中找到結果,使用 HTML 表格
中的行渲染結果,把結果賦值給 res 變量,再把狀態消息賦值給 msg
變量。
? 渲染 HTML 頁面。
? 在服務器的控制臺中記錄響應。
? 構建 Response 對象,將其返回。
注意,home 不是協程,既然定義體中沒有 yield from 表達式,也沒
必要是協程。在 aiohttp 包的文檔中,add_route 方法的條目
(http://aiohttp.readthedocs.org/en/v0.14.4/web_reference.html#aiohttp.web.UrlDispatcher.下面說道,“如果處理程序是普通的函數,在內部會將其轉換成協程”。
示例 18-18 中的 home 函數雖然簡單,卻有一個缺點。home 是普通的函
數,而不是協程,這一事實預示著一個更大的問題:我們需要重新思考
如何實現 Web 應用,以獲得高并發。下面來分析這個問題。
更好地支持并發的智能客戶端
示例 18-18 中的 home 函數很像是 Django 或 Flask 中的視圖函數,實現
方式完全沒有考慮異步:獲取請求,從數據庫中讀取數據,然后構建響
應,渲染完整的 HTML 頁面。在這個示例中,存儲在內存中的
UnicodeNameIndex 對象是“數據庫”。但是,對真正的數據庫來說,應
該異步訪問,否則在等待數據庫查詢結果的過程中,事件循環會阻塞。
例如,aiopg 包(https://aiopg.readthedocs.org/en/stable/)提供了一個異
步 PostgreSQL 驅動,與 asyncio 包兼容;這個包支持使用 yield
from 發送查詢和獲取結果,因此視圖函數的表現與真正的協程一樣。
除了防止阻塞調用之外,高并發的系統還必須把復雜的工作分成多步,
以保持敏捷。http_charfinder.py 服務器表明了這一點:如果搜索“cjk”,
得到的結果是 75 821 個中文、日文和韓文象形文字。 此時,home 函
數會返回一個 5.3MB 的 HTML 文檔,顯示一個有 75 821 行的表格。
我在自己的設備中使用命令行 HTTP 客戶端 curl 訪問架設在本地的
http_charfinder.py 服務器,查詢“cjk”,2 秒鐘后獲得響應。瀏覽器要布
局包含這么大一個表格的頁面,用的時間會更長。當然,大多數查詢返
回的響應要小得多:查詢“braille”返回 256 行結果,頁面大小為 19KB,
在我的設備中用時 0.017 秒。可是,如果服務器要用 2 秒鐘處理“cjk”查
詢,那么其他所有客戶端都至少要等 2 秒——這是不可接受的。
避免響應時間太長的方法是實現分頁:首次至多返回(比如說)200
行,用戶點擊鏈接或滾動頁面時再獲取更多結果。如果查看本書代碼倉
庫(https://github.com/fluentpython/example-code)中的 charfinder.py 模
塊,你會發現 UnicodeNameIndex.find_descriptions 方法有兩個可
選的參數——start 和 stop,這是偏移值,用于支持分頁。因此,我
們可以返回前 200 個結果,當用戶想查看更多結果時,再使用 AJAX 或
WebSockets 發送下一批結果。
實現分批發送結果所需的大多數代碼都在瀏覽器這一端,因此 Google
和所有大型互聯網公司都大量依賴客戶端代碼構建服務:智能的異步客
戶端能更好地使用服務器資源。
雖然智能的客戶端甚至對老式 Django 應用也有幫助,但是要想真正為
這種客戶端服務,我們需要全方位支持異步編程的框架,從處理 HTTP
請求和響應到訪問數據庫,全都支持異步。如果想實現實時服務,例如
游戲和以 WebSockets 支持的媒體流,那就尤其應該這么做。
這里留一個練習給讀者:改進 http_charfinder.py 腳本,添加下載進度
條。此外還有一個附加題:實現 Twitter 那樣的“無限滾動”。做完這個
練習后,我們對如何使用 asyncio 包做異步編程的討論就結束了。