2. python协程/异步编程详解

目录

1. 简单的异步程序

2. 协程函数和协程对象

3. 事件循环

4. 任务对象Task及Future对象

        4.1 Task与Future的关系

        4.2 Future对象

        4.3 全局对象和循环事件对象

5. await关键字

6. 异步上下文管理

7.异步迭代器

8. asyncio的常用函数

        8.1 asyncio.run

        8.2 asyncio.get_event_loop

        8.3 asyncio.wait

        8.4 asynco.gather

        8.5 asyncio.wait_for

        8.6 asyncio.create_task和loop.create_task

        8.7 eventloop.call_later

        8.8 eventloop.stop与eventloop.close

        8.9 event.wait和event.set


        协程(coroutine):类似一种轻量级的多线程程序,是在一个线程里面,实现多任务并行执行的编程方式。相比多线程,它在切换任务时,CPU开销更低,速度更快。

        协程实现多任务并行执行实际是通过各任务异步执行来实现的。

        asyncio 是python异步代码库,是几乎所有python异步实现的基础,因此,使用python的协程功能,得先掌握asyncio库的使用。

1. 简单的异步程序

import asyncio
async def fun1():print('a')await asyncio.sleep(.5)print('b')async def fun2():print('c')await asyncio.sleep(.5)print('d')asyncio.run(asyncio.wait([fun1(),fun2()]))

        运行结果: 

        可以看到,虽然fun1和fun2是依次放进事件列表中的,可实际运行结果,并不是abcd,而是cadb,先执行fun2后执行fun1,这里与列表在函数取参时LIFO(后入先出)有关。

        在执行过程中,fun2先打印字符c后,遇到sleep,并不是原地阻塞0.5秒,而是跳出来,执行fun1,打印a,在fun1中又遇到sleep,这里又跳出来执行fun2,待fun2的挂起时间到了后,执行fun2后续代码,打印出字符d,fun2结束后又去fun1执行,打印出字符b。

        这就是异步执行,它并不是顺序执行,而是可以暂停当前执行的流程,转到另一个程序中去执行,之后又回到暂停位置继续执行。

2. 协程函数和协程对象

        由async声明定义的函数,即是协程函数,它直接返回的就是协程对象。

        协程函数要被执行,须将协程对象交给事件循环来处理,或者通过await关键字执行,不能直接执行协程函数。

        注意,协程函数直接返回协程对象,指函数名()返回,并不是执行结果return返回:

import asyncio
async def fun():await asyncio.sleep(.5)print("This is a coroutine function")return "Hello world"async def main():t = fun()r = await tprint(f"Result:{r}")asyncio.run(main())##运行结果:
This is a coroutine function
Result:Hello world

        示例中函数名fun()返回的是一个协程对象,它被赋值给t,通过await执行后,fun最后的返回结果放在r中,fun的return返回Hello world,因此最后打印出的r就是Hello world 

3. 事件循环

        事件循环是asyncio运行的核心,是由循环调度器_event_loop_policy调度加入到Task队列中的Task对象进行执行。

        在上面调用asyncio.run函数的时候,该函数实际是创建一个循环调度器,并将将可Task对象放进循环中,然后等待对象执行完成,再退出。它相当于:

eventloop = asyncio.new_event_loop()
asyncio.set_event_loop(eventloop)
eventloop.run_until_complete(obj)

        asyncio.new_event_loop创建一个新的循环调度器,set_event_loop启用该调度器,run_until_complete将对象obj放入调度器的Tasks队列中,等待执行完成。

        事件循环的本质是将所有待执行的协程对象包装成任务对象(Task对象),将Task对象放入到事件循环队列中,调度器执行事件循环,当某个对象遇到阻塞,就挂起转而执行队列中的其它对象,当挂起时间到了,又回到该对象挂起位置继续执行。

        run_until_complete函数最后实际是执行loop.run_forever,run_forever是事件循环具体实现代码,最核心的代码是:

while True:

        _run_once()

        if _stopping:

                break

        可见,它是一个while循环,直到_stopping置True退出,因此,实际是可以提前退出事件循环的,EventLoop提供了一个API,eventloop.stop()直接置_stopping为True提前退出事件循环 

4. 任务对象Task及Future对象

        4.1 Task与Future的关系

        asyncio.Future类是一个基类,asyncio.Task类继承自Future。事件循环调度器中执行的对象就是任务对象(Task对象),只有Future对象(或Task对象)才能放入进事件循环队列tasks中。

        因此,上面示例中,run_until_complete接收的参数应该是Future对象,当run_until_complete传入一个协程对象后,会先判断是否为Future对象,如果是协程,则被wrap成Task对象(Task(coro,eventloop)),再将对象注册(_register_task)到队列中。

        4.2 Future对象

        它表示一个还未完成的工作结果,它有四个基本状态:

  • Pending
  • Running
  • Done
  • Cancelled

        事件循环调度器会跟着它的状态,以判断它是否被执行完成。Task对象中,当等待Future.result()时,也即任务结整返回,最后执行Future.set_result(value):

import asyncioasync def fun(fut:asyncio.Future):print("This is a coroutine")fut.set_result('ABCDE')async def main():eventloop = asyncio.get_event_loop()fut = eventloop.create_future()eventloop.create_task(fun(fut))r = await futprint(f'Result:{r}')asyncio.run(main())##运行结果:
This is a coroutine
Result:ABCDE

        分析:asyncio.get_event_loop返回当前执行的EventLoop对象,它由asyncio.run创建。eventloop.create_future()创建一个Future对象,它什么也不做,它被当作参数传入到协程中;evenloop.create_task()将协程wrap成Task对象,加入到事件循环调度器的任务队列Tasks中,await fut等待fut有结果,并将结果保存在r中。

        当协程执行完后,调用fut.set_result就会在Future对象中产生一个值(结果),这时协程返回,await fut就因为fut有结果了结束,将结果返回给r,即ABCD。

        4.3 全局对象和循环事件对象
t1 = asyncio.create_task(coro)
t2 = eventloop.create_task(coro)
fut1 = asyncio.create_future()
fut2 = eventloop.create_future()

        ayncio和eventloop都能创建Task对象,前者是个全局Task对象,不依赖特定的事件循环,而后者则依赖当前的eventloop事件循环

5. await关键字

        await 后面跟可等待对象(awaitable,包括协程对象,任务对象,Future对象),这些对象最终都会被wrap成Task对象执行。await主要作用是暂停当前协程,等待对象执行完成,返回执行结果(函数最后的return),才回到当前协程。

import asyncioasync def fun1():print("This fun1")await asyncio.sleep(.5)print("Fun1 over")return "A fun return."
async def fun2():print("This fun2")await asyncio.sleep(.5)print("Fun2 over")
async def fun3():print("This fun3")await asyncio.sleep(.5)print("Fun3 over")async def main():eventloop = asyncio.get_event_loop()a = eventloop.create_task(fun1())b = asyncio.create_task(fun2())print(f'{await a}')await fun3()await basyncio.run(main())##运行结果:
This fun1
This fun2
Fun1 over
Fun2 over
A fun return.
This fun3
Fun3 over

        示例分析:eventloop.create_task返回的是协程wrap后的Task对象,分别赋值给变量a和b,await可以作用Task对象,因此,await a得以执行,它等待a执行,直到a返回结果(字符串A fun return,通过print打印出来)。

        从运行结果看,实际上await a返回时,fun2已经结束了,也就是b对象已经执行结束了,所以,await fun3执行结果并不会出现在a,b对象执行的过程中,后面的await b也不会出现执行结果,因为前面已经执行完了。

        一个特别的函数:

        await asyncio.sleep(n)

        它表示暂停当前任务的事件循环控制权,转而执行其它异步任务,时间到了后,再回到当前协程继续执行。

6. 异步上下文管理

        python中上下文管理关键字with,它可以自动创建资源,并在结束后自动回收资源。with作用于定义了__enter__和__exit__方法的对象。

        __enter__():定义了进入with代码块执行的动作,返回值默认为None,也可以指定某个返回值,返回的接收者就是as后面的变量;

        __exit__(exc_type, exc_value, traceback):定义了退出with代码块执行的动作      

class Person:def __init__(self):self.name:str = "YSZ"self.score:float = 93.7def __enter__(self):print("Person enter")return selfdef __exit__(self, exc_type, exc_value, traceback):print("Person exit.")with Person() as p:print(f"name:{p.name}")print(f"score:{p.score}")##运行结果:
Person enter
name:YSZ
score:93.7
Person exit.

        异步上下文管理器是在with前面增加async关键字的管理器,类似的,这里作用的对象是定义了__aenter__和__aexit__两个异步函数的对象:

import asyncioclass Person:def __init__(self):self.name:str = "YSZ"self.score:float = 93.7async def __aenter__(self):print("Person enter")return selfasync def __aexit__(self, exc_type, exc_value, traceback):print("Person exit.")async def main():async with Person() as p:print(f"name:{p.name}")print(f"score:{p.score}")asyncio.run(main())##运行结果
Person enter
name:YSZ
score:93.7
Person exit.

7.异步迭代器

        一般情况下,可以用for in迭代遍历的对象都是定义了__iter__,__next__函数的对象,其中:

        __iter__返回一个可迭代对象iterator

        __next__返回操作对象,赋给for后面的变量(如下例的i),for不停的迭代,直到引发一个StopIteration异常:

class Person:def __init__(self):self.index = 0self.names:tuple = ("YSZ","TMS","JYN","MOD","GXT","QES")self.name:str = Nonedef __iter__(self):return selfdef __next__(self):self.name = self.names[self.index]self.index += 1if self.index > 5:raise StopIterationreturn selffor i in Person():print(f"name:{i.name} index:{i.index}")##运行结果:
name:YSZ index:1
name:TMS index:2
name:JYN index:3
name:MOD index:4
name:GXT index:5

        类似的,异步迭代器的对象是定义了__aiter__和__anext__函数的对象,其中:

        __aiter__函数不能是异步的,它返回一个异步可迭代对象asynchronous iterator,赋给in后面的变量

        __anext__返回对象必须是可等待(awaitable)对象,因此它必须是一个协程函数。async for不停的迭代,直到引发一个StopAsyncIteration异常停止: 

class Person:def __init__(self):self.index = 0self.names:tuple = ("YSZ","TMS","JYN","MOD","GXT","QES")self.name:str = Nonedef __aiter__(self):return selfasync def __anext__(self):self.name = self.names[self.index]self.index += 1if self.index > 5:raise StopAsyncIterationawait asyncio.sleep(.2)return self.nameasync def main():person = Person()async for i in person:print(f"name:{i}")asyncio.run(main())##运行结果:
name:YSZ
name:TMS
name:JYN
name:MOD
name:GXT

        注意:async for只能在协程函数中使用

8. asyncio的常用函数

        8.1 asyncio.run

        在第3点已经说明了,它实际是创建了一个新的事件循环调度器,并启动这个调度器,然后等待传给它的对象执行完成,它的参数接收可等待对象(协程对象,Task对象,Future对象)

        8.2 asyncio.get_event_loop

        返回:事件循环调度器

        它会获取当前正在运行的事件循环调度器,相当于asyncio.get_running_loop();

        如果当前没有正在运行的事件循环,则创建一个新的事件循环,相当于loop = asyncio.new_event_loop,同时运行这个事件循环:asyncio.set_event_loop(loop),并将这个事件循环调度器返回。

        8.3 asyncio.wait

        参数:一组可等待对象列表(list)

        返回:Future对象

        作用:等待一组可等待对象完成,最后返回任务结束的结果

        例如:asyncio.run(asyncio.wait(fun1(),fun2(),fun3()),其中fun1()~fun3()是协程对象

        8.4 asynco.gather

        参数:一组可等待对象列表(list)

        返回:函数名直接返回是Future对象,被await后,返回的是所有对象的执行结果的列表(list)

        作用:将所有对象几乎同时全部启动,实现所有对象高并发运行,通常需要asyncio.Semphore控制并发量,以免资源被占用完:

import asyncioasync def fun(sem,n):async with sem: #每个任务执行前必须获取信号量print(f"Fun {n} start")await asyncio.sleep(2.0)print(f"Fun {n} over")return nasync def main():sem = asyncio.Semaphore(3)tasks = [fun(sem,n) for n in range(20)]r = await asyncio.gather(*tasks)print(f"Result:{r}")asyncio.run(main())##运行结果:
Fun 0 start
Fun 1 start
Fun 2 start
Fun 0 over
Fun 1 over
Fun 2 over
Fun 3 start
...
Fun 18 start
Fun 19 start
Fun 18 over
Fun 19 over
Result:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

        示例中gather了20个任务,但是由于限制最高并发量为3,所以,总是三个任务完成后,再进行后面三个,最后的运行结果存放在r中,它是每个任务返回的值组成的列表(list)。

        特别注意,每个任务在开始执行前都必须要获取信号,因为实际事件循环是每个任务都去访问的,但是,规定哪些任务可以执行,就需要这个信号了。

        8.5 asyncio.wait_for

        参数:可等待对象,超时的时间timeout

        返回:可等待对象执行结束后的返回(return)

        作用:通常和关键字await连用,表示对象必须在设定时间内执行完毕,否则抛出一个超时异常asyncio.TimeoutError

import asyncioasync def fun():print("Step1")await asyncio.sleep(.5)print("Step2")await asyncio.sleep(.5)return "Fun back"async def main():try:r = await asyncio.wait_for(fun(),timeout=0.8)print(f"Result:{r}")except asyncio.TimeoutError:print("Object execute timeout.")asyncio.run(main())##运行结果:
Step1
Step2
Object execute timeout.

        示例中规定了0.8秒内要完成fun()对象,但是里面花了1.0秒,固而抛出超时异常

        8.6 asyncio.create_task和loop.create_task

        参数:协程对象

        返回:Task对象

        作用:创建一个Task对象,第4.3已经说明了它们的区别,其中loop.create_task同时将创建的对象注册到任务队列中了,事件循环会执行它,而asyncio.create_task则是创建一个全局对象,并没有加到某个事件循环的队列中,需要await执行

        8.7 eventloop.call_later

        参数:时间,回调函数,函数的参数

        返回:TimerHandle

        作用:布置一个任务,过了“时间”之后被执行

import asynciodef fun(name:str):print(f"name:{name}  Hello world")async def main():eventloop = asyncio.get_event_loop()eventloop.call_later(1,fun,'YSZ')await asyncio.sleep(3)print("Over!")asyncio.run(main())##运行结果:
name:YSZ  Hello world
Over!

        示例中布置了一个任务,注意,并非是协程,而是普通函数,并要求在1秒后执行,需要await asyncio.sleep(3)等待,否则循环事件结束了,这个回调还没执行。 

        8.8 eventloop.stop与eventloop.close

        作用:stop停止事件循环,close关闭事件循环。

        在关闭事件循环之前,需要先检查事件循环是否正在运行eventloop.is_running(),否则关不了,会触发一个RuntimeError异常。在正在运行的事件循环中,需要先stop,再close。

        8.9 event.wait和event.set

        这两个函数都是asyncio.Event成员函数,因此调用时,需要:

        event = asyncio.Event()

        event.wait()

        event.set()

        作用:在有些时候,需要各协程间进行同步,比如有的协程初始化某些外设时间长,有的短,协程初始化外设完成时间可能不一致,而协程接下来的流程又可能用到别的协程外设资源,这时,就需要同步。

        在需要同步的地方,执行event.wait()阻塞等待,当所有协程都到event.wait()后,执行一次event.set(),这时所有协程都立即从event.wait()返回,继续后面的流程,这就实现了同步。

        当需要再次event.wait()阻塞等待时,调用event.clear(),清除set()的标志,然后调用event.wait()即可再次阻塞。

import asyncioasync def fun1(event):print("来这里等着1")await event.wait()print("继续执行1")async def fun2(event):print("来这里等着2")await event.wait()print("继续执行2")async def fun3(event):print("模拟初始化3")await asyncio.sleep(2)print("同步其它协程3")event.set()print("继续执行3")async def main():event = asyncio.Event()event.clear()await asyncio.gather(fun1(event),fun2(event),fun3(event))asyncio.run(main())##运行结果
来这里等着1
来这里等着2
模拟初始化3
同步其它协程3
继续执行3
继续执行1
继续执行2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/903498.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智慧园区IOT项目与AI时代下的机遇 - Java架构师面试实战

在互联网大厂的Java求职者面试中,面试官通常会针对实际业务场景提出一系列问题。以下是关于智慧园区IOT项目及AI时代下的机遇的面试模拟对话。 第一轮提问 面试官:马架构,请简要介绍下智慧园区IOT项目的整体架构设计。 马架构:…

论文导读 - 基于特征融合的电子鼻多任务深度学习模型研究

基于特征融合的电子鼻多任务深度学习模型研究 原论文地址:https://www.sciencedirect.com/science/article/pii/S0925400524009365 引用此论文(GB/T 7714-2015): NI W, WANG T, WU Y, et al. Multi-task deep learning model f…

AI超级智能体项目教程(二)---后端项目初始化(设计knif4j接口文档的使用)

文章目录 1.选择JDK的版本和相关配置2.添加依赖信息2.1指定lombok版本信息2.2引入hutool工具类2.3了解knif4j依赖2.4引入knif4j依赖 3.contrller测试3.1完成yml文件配置3.2修改默认扫描路径3.3controller具体的内容3.4配置接口和访问路径3.5如何访问3.6调试接口3.6调试接口 1.选…

linux blueZ 第四篇:BLE GATT 编程与自动化——Python 与 C/C++ 实战

本篇聚焦 BLE(Bluetooth Low Energy)GATT 协议层的编程与自动化实践,涵盖 GATT 基础、DBus API 原理、Python(dbus-next/bleak)示例、C/C++ (BlueZ GATT API)示例,以及自动发现、读写特征、订阅通知、安全配对与脚本化测试。 目录 BLE GATT 基础概念 BlueZ DBus GATT 模…

kafka与flume的整合、spark-streaming

kafka与flume的整合 前期配置完毕,开启集群 需求1: 利用flume监控某目录中新生成的文件,将监控到的变更数据发送给kafka,kafka将收到的数据打印到控制台(三个node01中运行) 1.在kafka中建立topic kafka…

redis高级进阶

1.redis主从复制 redis主从复制1 2.redis哨兵模式 哔哩哔哩视频 redis哨兵模式1 redis哨兵模式2 redis哨兵模式3 3.redis分片集群 redis分片集群1 redis分片集群2 redis分片集群3

uniapp: 低功耗蓝牙(BLE)的使用

在微信小程序中实现蓝牙对接蓝牙秤的重量功能,主要依赖微信小程序提供的低功耗蓝牙(BLE)API。以下是一个清晰的步骤指南,帮助你完成从连接蓝牙秤到获取重量数据的开发流程。需要注意的是,具体实现可能因蓝牙秤的协议和…

3D架构图软件 iCraft Editor 正式发布 @icraft/player-react 前端组件, 轻松嵌入3D架构图到您的项目

安装 pnpm install icraft/player-react --saveimport { ICraftPlayer } from "icraft/player-react";export default function MyScene() {return <ICraftPlayer srcyour-scene.iplayer />; }icraft/player-react 为开发者提供了一站式的3D数字孪生可视化解决…

云数据中心整体规划方案PPT(113页)

1. 引言 概述&#xff1a;云数据中心整体规划方案旨在构建弹性、高效的云计算基础设施&#xff0c;通过软件定义数据中心&#xff08;SDDC&#xff09;实现资源虚拟化与管理自动化。 2. 技术趋势与背景 技术革新&#xff1a;随着云计算、虚拟化及自动化技术的发展&#xff0c…

(六)机器学习---聚类与K-means

到本篇文章&#xff0c;我们先对前几篇所学习的算法进行一个回顾&#xff1a; 而本篇文章我们将会介绍聚类以及K-means算法。 分类问题回归问题聚类问题各种复杂问题决策树√线性回归√K-means√神经网络√逻辑回归√岭回归密度聚类深度学习√集成学习√Lasso回归谱聚类条件随机…

在html中如何创建vue自定义组件(以自定义文件上传组件为例,vue2+elementUI)

1、先上代码&#xff1a;vueUpload.js var dom <div class"upload-file"><el-upload :action"uploadFileUrl" :before-upload"handleBeforeUpload" :file-list"fileList" :limit"limit":on-error"handleUpl…

计算机基础:二进制基础14,二进制加法

专栏导航 本节文章分别属于《Win32 学习笔记》和《MFC 学习笔记》两个专栏&#xff0c;故划分为两个专栏导航。读者可以自行选择前往哪个专栏。 &#xff08;一&#xff09;WIn32 专栏导航 上一篇&#xff1a;计算机基础&#xff1a;二进制基础13&#xff0c;十六进制与二进…

可视化图解算法: 判断是不是二叉搜索树(验证二叉搜索树)

1. 题目 描述 给定一个二叉树根节点&#xff0c;请你判断这棵树是不是二叉搜索树。 二叉搜索树满足每个节点的左子树上的所有节点的值均严格小于当前节点的值&#xff1b;并且右子树上的所有节点的值均严格大于当前节点的值。 数据范围&#xff1a;节点数量满足 1≤n≤10^4…

Markdown转WPS office工具pandoc实践笔记

随着DeepSeek、文心一言、讯飞星火等AI工具快速发展&#xff0c;其输出网页内容拷贝到WPS Office过程中&#xff0c;文档编排规整的格式很难快速复制。 注&#xff1a;WPS Office不支持Markdown格式&#xff0c;无法识别式样。 在这里推荐个免费开源工具Pandoc&#xff0c;实现…

python的turtle库实现四叶草

实现代码&#xff1a; import turtle turtle.pencolor(‘green’) turtle.fillcolor(‘green’) turtle.begin_fill() turtle.circle(100,90) turtle.left(90) turtle.circle(100,90) turtle.right(180) turtle.circle(100, 90) turtle.left(90) turtle.circle(100,90) tu…

北重数控滑台加工厂家:汽车零部件试验铁地板-安全性能的测试方法

汽车零部件的安全性能测试是非常重要的&#xff0c;其中铁地板测试是其中的一种常见测试方法之一。铁地板测试主要用于评估汽车零部件在发生碰撞或事故时的安全性能&#xff0c;以确保零部件在各种情况下都能提供有效的保护和安全性能。 铁地板测试通常包括以下步骤和方法&…

Linux0.11系统调用:预备知识

系统调用 预备知识 目标&#xff1a;了解系统调用的流程&#xff0c;在Linux 0.11上添加两个系统调用&#xff0c;并编写两个简单的应用程序测试它们。 对应章节&#xff1a;同济大学赵炯博士的《Linux内核0.11完全注释&#xff08;修正版V3.0&#xff09;》的第5.5节 下面就针…

如何防止 ES 被 Linux OOM Killer 杀掉

当 Linux 系统内存不足时&#xff0c;内核会找出一个进程 kill 掉它释放内存&#xff0c;旨在保障整个系统不至于崩溃。如果 ES 按照最佳实践去实施部署&#xff0c;会保留一半的内存&#xff0c;不至于发生此类事情。但事情总有例外&#xff0c;有的朋友可能 ES 和其他的程序部…

swagger2升级至openapi3的利器--swagger2openapi

背景&#xff1a; 因为项目需要升级JDK&#xff0c;涉及到swagger2升级至openapi3的情况。由于swagger 2和openapi 3的语法差距太大&#xff0c;需要对yaml进行升级。无奈单个yaml文件的内容太大&#xff0c;高至4万多行&#xff0c;手动进行语法的转换肯定是不可能了&#xff…

在yolo中Ultralytics是什么意思呢?超越分析的智能

在YOLO&#xff08;You Only Look Once&#xff09;目标检测框架中&#xff0c;Ultralytics 是一家专注于计算机视觉和机器学习技术的公司&#xff0c;同时也是YOLO系列模型&#xff08;如YOLOv5、YOLOv8等&#xff09;的官方开发和维护团队。以下是关键点解析&#xff1a; 1. …