目錄
1. 簡單的異步程序
2.?協程函數和協程對象
3. 事件循環
4. 任務對象Task及Future對象
? ? ? ? 4.1 Task與Future的關系
? ? ? ? 4.2 Future對象
? ? ? ? 4.3 全局對象和循環事件對象
5. await關鍵字
6. 異步上下文管理
7.異步迭代器
8.?asyncio的常用函數
? ? ? ? 8.1 asyncio.run
????????8.2 asyncio.get_event_loop
? ? ? ? 8.3 asyncio.wait
? ? ? ? 8.4?asynco.gather
? ? ? ? 8.5?asyncio.wait_for
? ? ? ? 8.6?asyncio.create_task和loop.create_task
? ? ? ? 8.7?eventloop.call_later
? ? ? ? 8.8?eventloop.stop與eventloop.close
? ? ? ? 8.9 event.wait和event.set
????????協程(coroutine):類似一種輕量級的多線程程序,是在一個線程里面,實現多任務并行執行的編程方式。相比多線程,它在切換任務時,CPU開銷更低,速度更快。
? ? ? ? 協程實現多任務并行執行實際是通過各任務異步執行來實現的。
????????asyncio?是python異步代碼庫,是幾乎所有python異步實現的基礎,因此,使用python的協程功能,得先掌握asyncio庫的使用。
1. 簡單的異步程序
import asyncio
async def fun1():print('a')await asyncio.sleep(.5)print('b')async def fun2():print('c')await asyncio.sleep(.5)print('d')asyncio.run(asyncio.wait([fun1(),fun2()]))
? ? ? ? 運行結果:?
? ? ? ? 可以看到,雖然fun1和fun2是依次放進事件列表中的,可實際運行結果,并不是abcd,而是cadb,先執行fun2后執行fun1,這里與列表在函數取參時LIFO(后入先出)有關。
? ? ? ? 在執行過程中,fun2先打印字符c后,遇到sleep,并不是原地阻塞0.5秒,而是跳出來,執行fun1,打印a,在fun1中又遇到sleep,這里又跳出來執行fun2,待fun2的掛起時間到了后,執行fun2后續代碼,打印出字符d,fun2結束后又去fun1執行,打印出字符b。
????????這就是異步執行,它并不是順序執行,而是可以暫停當前執行的流程,轉到另一個程序中去執行,之后又回到暫停位置繼續執行。
2.?協程函數和協程對象
????????由async聲明定義的函數,即是協程函數,它直接返回的就是協程對象。
? ? ? ? 協程函數要被執行,須將協程對象交給事件循環來處理,或者通過await關鍵字執行,不能直接執行協程函數。
????????注意,協程函數直接返回協程對象,指函數名()返回,并不是執行結果return返回:
import asyncio
async def fun():await asyncio.sleep(.5)print("This is a coroutine function")return "Hello world"async def main():t = fun()r = await tprint(f"Result:{r}")asyncio.run(main())##運行結果:
This is a coroutine function
Result:Hello world
? ? ? ? 示例中函數名fun()返回的是一個協程對象,它被賦值給t,通過await執行后,fun最后的返回結果放在r中,fun的return返回Hello world,因此最后打印出的r就是Hello world?
3. 事件循環
? ? ? ? 事件循環是asyncio運行的核心,是由循環調度器_event_loop_policy調度加入到Task隊列中的Task對象進行執行。
????????在上面調用asyncio.run函數的時候,該函數實際是創建一個循環調度器,并將將可Task對象放進循環中,然后等待對象執行完成,再退出。它相當于:
eventloop = asyncio.new_event_loop()
asyncio.set_event_loop(eventloop)
eventloop.run_until_complete(obj)
? ? ? ? asyncio.new_event_loop創建一個新的循環調度器,set_event_loop啟用該調度器,run_until_complete將對象obj放入調度器的Tasks隊列中,等待執行完成。
????????事件循環的本質是將所有待執行的協程對象包裝成任務對象(Task對象),將Task對象放入到事件循環隊列中,調度器執行事件循環,當某個對象遇到阻塞,就掛起轉而執行隊列中的其它對象,當掛起時間到了,又回到該對象掛起位置繼續執行。
? ? ? ? run_until_complete函數最后實際是執行loop.run_forever,run_forever是事件循環具體實現代碼,最核心的代碼是:
while True:
? ? ? ? _run_once()
? ? ? ? if _stopping:
? ? ? ? ? ? ? ? break
? ? ? ? 可見,它是一個while循環,直到_stopping置True退出,因此,實際是可以提前退出事件循環的,EventLoop提供了一個API,eventloop.stop()直接置_stopping為True提前退出事件循環?
4. 任務對象Task及Future對象
? ? ? ? 4.1 Task與Future的關系
????????asyncio.Future類是一個基類,asyncio.Task類繼承自Future。事件循環調度器中執行的對象就是任務對象(Task對象),只有Future對象(或Task對象)才能放入進事件循環隊列tasks中。
? ? ? ? 因此,上面示例中,run_until_complete接收的參數應該是Future對象,當run_until_complete傳入一個協程對象后,會先判斷是否為Future對象,如果是協程,則被wrap成Task對象(Task(coro,eventloop)),再將對象注冊(_register_task)到隊列中。
? ? ? ? 4.2 Future對象
????????它表示一個還未完成的工作結果,它有四個基本狀態:
- Pending
- Running
- Done
- Cancelled
? ? ? ? 事件循環調度器會跟著它的狀態,以判斷它是否被執行完成。Task對象中,當等待Future.result()時,也即任務結整返回,最后執行Future.set_result(value):
import asyncioasync def fun(fut:asyncio.Future):print("This is a coroutine")fut.set_result('ABCDE')async def main():eventloop = asyncio.get_event_loop()fut = eventloop.create_future()eventloop.create_task(fun(fut))r = await futprint(f'Result:{r}')asyncio.run(main())##運行結果:
This is a coroutine
Result:ABCDE
? ? ? ? 分析:asyncio.get_event_loop返回當前執行的EventLoop對象,它由asyncio.run創建。eventloop.create_future()創建一個Future對象,它什么也不做,它被當作參數傳入到協程中;evenloop.create_task()將協程wrap成Task對象,加入到事件循環調度器的任務隊列Tasks中,await fut等待fut有結果,并將結果保存在r中。
????????當協程執行完后,調用fut.set_result就會在Future對象中產生一個值(結果),這時協程返回,await fut就因為fut有結果了結束,將結果返回給r,即ABCD。
? ? ? ? 4.3 全局對象和循環事件對象
t1 = asyncio.create_task(coro)
t2 = eventloop.create_task(coro)
fut1 = asyncio.create_future()
fut2 = eventloop.create_future()
? ? ? ? ayncio和eventloop都能創建Task對象,前者是個全局Task對象,不依賴特定的事件循環,而后者則依賴當前的eventloop事件循環
5. await關鍵字
? ? ? ? await 后面跟可等待對象(awaitable,包括協程對象,任務對象,Future對象),這些對象最終都會被wrap成Task對象執行。await主要作用是暫停當前協程,等待對象執行完成,返回執行結果(函數最后的return),才回到當前協程。
import asyncioasync def fun1():print("This fun1")await asyncio.sleep(.5)print("Fun1 over")return "A fun return."
async def fun2():print("This fun2")await asyncio.sleep(.5)print("Fun2 over")
async def fun3():print("This fun3")await asyncio.sleep(.5)print("Fun3 over")async def main():eventloop = asyncio.get_event_loop()a = eventloop.create_task(fun1())b = asyncio.create_task(fun2())print(f'{await a}')await fun3()await basyncio.run(main())##運行結果:
This fun1
This fun2
Fun1 over
Fun2 over
A fun return.
This fun3
Fun3 over
? ? ? ? 示例分析:eventloop.create_task返回的是協程wrap后的Task對象,分別賦值給變量a和b,await可以作用Task對象,因此,await a得以執行,它等待a執行,直到a返回結果(字符串A fun return,通過print打印出來)。
? ? ? ? 從運行結果看,實際上await a返回時,fun2已經結束了,也就是b對象已經執行結束了,所以,await fun3執行結果并不會出現在a,b對象執行的過程中,后面的await b也不會出現執行結果,因為前面已經執行完了。
? ? ? ? 一個特別的函數:
? ? ? ? await asyncio.sleep(n)
? ? ? ? 它表示暫停當前任務的事件循環控制權,轉而執行其它異步任務,時間到了后,再回到當前協程繼續執行。
6. 異步上下文管理
? ? ? ? python中上下文管理關鍵字with,它可以自動創建資源,并在結束后自動回收資源。with作用于定義了__enter__和__exit__方法的對象。
????????__enter__():定義了進入with代碼塊執行的動作,返回值默認為None,也可以指定某個返回值,返回的接收者就是as后面的變量;
????????__exit__(exc_type, exc_value, traceback):定義了退出with代碼塊執行的動作? ? ??
class Person:def __init__(self):self.name:str = "YSZ"self.score:float = 93.7def __enter__(self):print("Person enter")return selfdef __exit__(self, exc_type, exc_value, traceback):print("Person exit.")with Person() as p:print(f"name:{p.name}")print(f"score:{p.score}")##運行結果:
Person enter
name:YSZ
score:93.7
Person exit.
? ? ? ? 異步上下文管理器是在with前面增加async關鍵字的管理器,類似的,這里作用的對象是定義了__aenter__和__aexit__兩個異步函數的對象:
import asyncioclass Person:def __init__(self):self.name:str = "YSZ"self.score:float = 93.7async def __aenter__(self):print("Person enter")return selfasync def __aexit__(self, exc_type, exc_value, traceback):print("Person exit.")async def main():async with Person() as p:print(f"name:{p.name}")print(f"score:{p.score}")asyncio.run(main())##運行結果
Person enter
name:YSZ
score:93.7
Person exit.
7.異步迭代器
? ? ? ? 一般情況下,可以用for in迭代遍歷的對象都是定義了__iter__,__next__函數的對象,其中:
????????__iter__返回一個可迭代對象iterator
????????__next__返回操作對象,賦給for后面的變量(如下例的i),for不停的迭代,直到引發一個StopIteration異常:
class Person:def __init__(self):self.index = 0self.names:tuple = ("YSZ","TMS","JYN","MOD","GXT","QES")self.name:str = Nonedef __iter__(self):return selfdef __next__(self):self.name = self.names[self.index]self.index += 1if self.index > 5:raise StopIterationreturn selffor i in Person():print(f"name:{i.name} index:{i.index}")##運行結果:
name:YSZ index:1
name:TMS index:2
name:JYN index:3
name:MOD index:4
name:GXT index:5
? ? ? ? 類似的,異步迭代器的對象是定義了__aiter__和__anext__函數的對象,其中:
????????__aiter__函數不能是異步的,它返回一個異步可迭代對象asynchronous iterator,賦給in后面的變量
????????__anext__返回對象必須是可等待(awaitable)對象,因此它必須是一個協程函數。async for不停的迭代,直到引發一個StopAsyncIteration異常停止:?
class Person:def __init__(self):self.index = 0self.names:tuple = ("YSZ","TMS","JYN","MOD","GXT","QES")self.name:str = Nonedef __aiter__(self):return selfasync def __anext__(self):self.name = self.names[self.index]self.index += 1if self.index > 5:raise StopAsyncIterationawait asyncio.sleep(.2)return self.nameasync def main():person = Person()async for i in person:print(f"name:{i}")asyncio.run(main())##運行結果:
name:YSZ
name:TMS
name:JYN
name:MOD
name:GXT
? ? ? ? 注意:async for只能在協程函數中使用
8.?asyncio的常用函數
? ? ? ? 8.1 asyncio.run
? ? ? ? 在第3點已經說明了,它實際是創建了一個新的事件循環調度器,并啟動這個調度器,然后等待傳給它的對象執行完成,它的參數接收可等待對象(協程對象,Task對象,Future對象)
????????8.2 asyncio.get_event_loop
? ? ? ? 返回:事件循環調度器
????????它會獲取當前正在運行的事件循環調度器,相當于asyncio.get_running_loop();
????????如果當前沒有正在運行的事件循環,則創建一個新的事件循環,相當于loop = asyncio.new_event_loop,同時運行這個事件循環:asyncio.set_event_loop(loop),并將這個事件循環調度器返回。
? ? ? ? 8.3 asyncio.wait
? ? ? ? 參數:一組可等待對象列表(list)
????????返回:Future對象
????????作用:等待一組可等待對象完成,最后返回任務結束的結果
? ? ? ? 例如:asyncio.run(asyncio.wait(fun1(),fun2(),fun3()),其中fun1()~fun3()是協程對象
? ? ? ? 8.4?asynco.gather
? ? ? ? 參數:一組可等待對象列表(list)
? ? ? ? 返回:函數名直接返回是Future對象,被await后,返回的是所有對象的執行結果的列表(list)
? ? ? ? 作用:將所有對象幾乎同時全部啟動,實現所有對象高并發運行,通常需要asyncio.Semphore控制并發量,以免資源被占用完:
import asyncioasync def fun(sem,n):async with sem: #每個任務執行前必須獲取信號量print(f"Fun {n} start")await asyncio.sleep(2.0)print(f"Fun {n} over")return nasync def main():sem = asyncio.Semaphore(3)tasks = [fun(sem,n) for n in range(20)]r = await asyncio.gather(*tasks)print(f"Result:{r}")asyncio.run(main())##運行結果:
Fun 0 start
Fun 1 start
Fun 2 start
Fun 0 over
Fun 1 over
Fun 2 over
Fun 3 start
...
Fun 18 start
Fun 19 start
Fun 18 over
Fun 19 over
Result:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
? ? ? ? 示例中gather了20個任務,但是由于限制最高并發量為3,所以,總是三個任務完成后,再進行后面三個,最后的運行結果存放在r中,它是每個任務返回的值組成的列表(list)。
? ? ? ? 特別注意,每個任務在開始執行前都必須要獲取信號,因為實際事件循環是每個任務都去訪問的,但是,規定哪些任務可以執行,就需要這個信號了。
? ? ? ? 8.5?asyncio.wait_for
? ? ? ? 參數:可等待對象,超時的時間timeout
? ? ? ? 返回:可等待對象執行結束后的返回(return)
? ? ? ? 作用:通常和關鍵字await連用,表示對象必須在設定時間內執行完畢,否則拋出一個超時異常asyncio.TimeoutError
import asyncioasync def fun():print("Step1")await asyncio.sleep(.5)print("Step2")await asyncio.sleep(.5)return "Fun back"async def main():try:r = await asyncio.wait_for(fun(),timeout=0.8)print(f"Result:{r}")except asyncio.TimeoutError:print("Object execute timeout.")asyncio.run(main())##運行結果:
Step1
Step2
Object execute timeout.
? ? ? ? 示例中規定了0.8秒內要完成fun()對象,但是里面花了1.0秒,固而拋出超時異常
? ? ? ? 8.6?asyncio.create_task和loop.create_task
? ? ? ? 參數:協程對象
? ? ? ? 返回:Task對象
? ? ? ? 作用:創建一個Task對象,第4.3已經說明了它們的區別,其中loop.create_task同時將創建的對象注冊到任務隊列中了,事件循環會執行它,而asyncio.create_task則是創建一個全局對象,并沒有加到某個事件循環的隊列中,需要await執行
? ? ? ? 8.7?eventloop.call_later
? ? ? ? 參數:時間,回調函數,函數的參數
? ? ? ? 返回:TimerHandle
? ? ? ? 作用:布置一個任務,過了“時間”之后被執行
import asynciodef fun(name:str):print(f"name:{name} Hello world")async def main():eventloop = asyncio.get_event_loop()eventloop.call_later(1,fun,'YSZ')await asyncio.sleep(3)print("Over!")asyncio.run(main())##運行結果:
name:YSZ Hello world
Over!
? ? ? ? 示例中布置了一個任務,注意,并非是協程,而是普通函數,并要求在1秒后執行,需要await asyncio.sleep(3)等待,否則循環事件結束了,這個回調還沒執行。?
? ? ? ? 8.8?eventloop.stop與eventloop.close
? ? ? ? 作用:stop停止事件循環,close關閉事件循環。
? ? ? ? 在關閉事件循環之前,需要先檢查事件循環是否正在運行eventloop.is_running(),否則關不了,會觸發一個RuntimeError異常。在正在運行的事件循環中,需要先stop,再close。
? ? ? ? 8.9 event.wait和event.set
? ? ? ? 這兩個函數都是asyncio.Event成員函數,因此調用時,需要:
? ? ? ? event = asyncio.Event()
? ? ? ? event.wait()
? ? ? ? event.set()
? ? ? ? 作用:在有些時候,需要各協程間進行同步,比如有的協程初始化某些外設時間長,有的短,協程初始化外設完成時間可能不一致,而協程接下來的流程又可能用到別的協程外設資源,這時,就需要同步。
? ? ? ? 在需要同步的地方,執行event.wait()阻塞等待,當所有協程都到event.wait()后,執行一次event.set(),這時所有協程都立即從event.wait()返回,繼續后面的流程,這就實現了同步。
? ? ? ? 當需要再次event.wait()阻塞等待時,調用event.clear(),清除set()的標志,然后調用event.wait()即可再次阻塞。
import asyncioasync def fun1(event):print("來這里等著1")await event.wait()print("繼續執行1")async def fun2(event):print("來這里等著2")await event.wait()print("繼續執行2")async def fun3(event):print("模擬初始化3")await asyncio.sleep(2)print("同步其它協程3")event.set()print("繼續執行3")async def main():event = asyncio.Event()event.clear()await asyncio.gather(fun1(event),fun2(event),fun3(event))asyncio.run(main())##運行結果
來這里等著1
來這里等著2
模擬初始化3
同步其它協程3
繼續執行3
繼續執行1
繼續執行2