协程(Coroutine)是允许执行被挂起、恢复、以及取消的程序。Python 3 中最初是使用 @asyncio.coroutine 装饰器和 yield from 关键字组合来实现协程。单词 yield 在这里并非在生成器(Generator)中所表示的“产出”,而是交通标志中所表达的“让步”之意。其实在生成器中也包含“让步”的意思,即把执行权交给调用者,生成器暂缓执行,等待调用者对生成的结果处理完成,再恢复生成器的执行。在异步程序中,yield 则是把当前的执行权交给事件循环中的其它协程。Python 3.5 开始 async/await 被引入,Python 3.7 开始成为保留关键字,让协程的使用更加方便和直观。本文使用 Python 3.8。
使用 async def 定义一个协程:
async def main():print('hello coroutine')协程具体可以分为使用 async def 定义的协程函数和调用协程函数返回的协程对象。调用一个协程函数并不会执行协程中的程序,而是只返回一个协程对象。
asyncio 提供了三种方式来执行协程:
1. 使用 asyncio.run()
run() 函数接收一个协程对象,在执行时,总会创建一个新的事件循环,并在结束后关闭循环。理想情况下,run() 函数应当被作为程序的总入口,并且只会被调用一次。如果同一线程中还有其它事件循环在运行,则此方法不能被调用。
async def main():print('hello coroutine')asyncio.run(main())2. 使用 await 等待一个协程
await 的作用和 yield from 相同,即让出当前的执行权,等待的对象有结果返回时,再重新获得可以被继续执行的权利。
只有可等待对象(Awaitable object)才能被 await。除协程(Coroutine)外,asyncio 还提供了两种可等待对象:任务(Task)和期货(Future)。
async def main():print('hello')await asyncio.sleep(1)await asyncio.sleep(2)print('coroutine')上述程序在打印出 hello 后会等待 3 秒再打印出 coroutine。存在多个 await 时,会依次顺序执行,因为有两条 sleep() 语句,分别等待 1 秒和 2 秒,所以一共用时 3 秒。
3. 使用 asyncio.create_task() 创建 Task
create_task() 会把一个协程打包成一个任务(Task),并立即排入日程准备执行,函数返回值是打包完成的 Task 对象。
async def foo(n):await asyncio.sleep(n)async def main():task1 = asyncio.create_task(foo(1))task2 = asyncio.create_task(foo(2))print('hello')await task1await task2print('coroutine')当使用 create_task() 时,创建的任务立即被加入到事件循环中,并不会阻塞当前的程序,所以上述程序在打印出 hello 后只需等待 2 秒就打印出 coroutine。
如上面所介绍,使用 create_task() 可以并发执行程序。asyncio 同时提供了几个函数用于方便地实现多任务并发执行:
1. asyncio.gather(*aws, return_exceptions=False)
gather() 函数接受传入多个可等待对象 aws,如果某个可等待对象是协程,则会被自动打包成 Task。gather() 返回结果是和 aws 传入顺序一致的列表。gather() 同时可以传入参数 return_exceptions 来处理异常,默认值为 False。如果为 False 时,执行过程中引发的首个异常会立即返回给等待 gather() 的任务,await 会直接结束等待并抛出异常,但是其它正常执行的 Task 不会被取消,这种情况适用于确保任务尽可能被执行完成,但是不关心返回结果,因为如果有任何一个任务出现异常,返回结果列表就不会顺利生成。如果把 return_exceptions 设为 True,异常会和正常结果一同被聚合进最终结果列表,适用于对结果有需求应用场景。
async def foo():return 'foo'async def bar():raise RuntimeError('fake runtime error')async def main():task1 = asyncio.create_task(foo())task2 = asyncio.create_task(bar())# return_exceptions=Trueresults = await asyncio.gather(task1, task2, return_exceptions=True)# 输出: ['foo', RuntimeError('fake runtime error')]print(results)# 返回结果的顺序和传参顺序一致assert isinstance(results[1], RuntimeError)# return_exceptions=Falsetry:results = await asyncio.gather(task1, task2, return_exceptions=False)# 此处打印并不会被执行, results 也未被赋值print(results)except RuntimeError as runtime_err:# 捕获异常并打印: fake runtime errorprint(runtime_err)2. asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
wait() 接受 aws 任务集合传入, 然后并发执行。参数 return_when 用来控制返回条件,当 return_when=ALL_COMPLETED 时,会在所有任务完成后返回结果;当 return_when=FIRST_COMPLETED 时,任务集合中有一个任务完成就立即返回结果。timeout 参数用来控制超时时间,可以是整数或浮点数,以秒为单位。wait() 的返回值是 (done, pending) 元组,done 中包含运行完成的任务,pending 中包含未完成被挂起的任务。
async def foo():await asyncio.sleep(3)return 'foo'async def bar():await asyncio.sleep(1)return 'bar'async def main():# 有一个任务执行完成即返回, 总共耗时 1 秒done, pending = await asyncio.wait({foo(), bar()}, return_when=asyncio.FIRST_COMPLETED)# done 集合里包含打包成 Task 的 bar()print(f'done: {done}')# pendding 集合里包含打包成 Task 的 foo()print(f'pending: {pending}')# 所有任务执行完成后返回, 总共耗时 3 秒done, pending = await asyncio.wait({foo(), bar()}, return_when=asyncio.ALL_COMPLETED)# done 集合里包含被带打包成 Task 的 foo() 和 bar()print(f'done: {done}')# pending 集合为空print(f'pending: {pending}')# 所有任务执行完成, 但运行时间不能超 2 秒后返回, 总共耗时 2 秒done, pending = await asyncio.wait({foo(), bar()}, timeout=2, return_when=asyncio.ALL_COMPLETED)# done 集合里包含打包成 Task 的 bar()print(f'done: {done}')# pendding 集合里包含打包成 Task 的 foo()print(f'pending: {pending}')3. asyncio.as_completed(aws)
as_completed() 接受 aws 集合,然后返回一个 Future 迭代器,遍历这个迭代器会依次遍历剩余可等待对象集合中最早完成的结果。
async def foo():await asyncio.sleep(2)return 'foo'async def bar():await asyncio.sleep(1)return 'bar'async def main():for fut in asyncio.as_completed({foo(), bar()}):earliest_result = await fut# 会依次打印 bar 和 foo, 因为 bar() 会更早执行完毕print(earliest_result)上面介绍多任务并发时引入了超时的概念,超时也可以被应用在单独的一个任务中,使用 asyncio.wait_for(aw, timeout) 函数,该函数接受一个任务 aw 和超时时间 timeout,如果在限制时间内完成,则会正常返回,否则会被取消并抛出 asyncio.TimeoutError 异常。
为了防止任务被取消,可以使用 asyncio.shield(aw) 进行保护。shield() 会屏蔽外部取消操作,如果外部任务被取消,其内部正在执行的任务不会被取消,在内部看来取消操作并没有发生,由于内部仍正常执行,执行完毕后会触发 asyncio.CancelledError 异常,如果确保程序能忽略异常继续执行,需要在外部使用 try-except 捕获异常。如果在任务内部取消,则会被成功取消。