python asyncio 并发编程_asyncio并发编程

一. 事件循环

1.注:

实现搭配:事件循环+回调(驱动生成器【协程】)+epoll(IO多路复用),asyncio是Python用于解决异步编程的一整套解决方案;

基于asynico:tornado,gevent,twisted(Scrapy,django channels),tornado(实现了web服务器,可以直接部署,真正部署还是要加nginx),django,flask(uwsgi,gunicorn+nginx部署)

1 importasyncio2 importtime3 async defget_html(url):4 print('start get url')5 #不能直接使用time.sleep,这是阻塞的函数,如果使用time在并发的情况有多少个就有多少个2秒

6 await asyncio.sleep(2)7 print('end get url')8 if __name__=='__main__':9 start_time=time.time()10 loop=asyncio.get_event_loop()11 task=[get_html('www.baidu.com') for i in range(10)]12 loop.run_until_complete(asyncio.wait(task))13 print(time.time()-start_time)

View Code

2.如何获取协程的返回值(和线程池类似):

1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8 return "HAHA"

9 #需要接收task,如果要接收其他的参数就需要用到partial(偏函数),参数需要放到前面

10 defcallback(url,future):11 print(url+'success')12 print('send email')13 if __name__=='__main__':14 loop=asyncio.get_event_loop()15 task=loop.create_task(get_html('www.baidu.com'))16 #原理还是获取event_loop,然后调用create_task方法,一个线程只有一个loop

17 #get_future=asyncio.ensure_future(get_html('www.baidu.com'))也可以

18 #loop.run_until_complete(get_future)

19 #run_until_complete可以接收future类型,task类型(是future类型的一个子类),也可以接收可迭代类型

20 task.add_done_callback(partial(callback,'www.baidu.com'))21 loop.run_until_complete(task)22 print(task.result())

View Code

3.wait和gather的区别:

3.1wait简单使用:

1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8

9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 tasks=[get_html('www.baidu.com') for i in range(10)]12 #wait和线程的wait相似

13 loop.run_until_complete(asyncio.wait(tasks))

View Code

协程的wait和线程的wait相似,也有timeout,return_when(什么时候返回)等参数

3.2gather简单使用:

1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8

9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 tasks=[get_html('www.baidu.com') for i in range(10)]12 #gather注意加*,这样就会变成参数

13 loop.run_until_complete(asyncio.gather(*tasks))

View Code

3.3gather和wait的区别:(定制性不强时可以优先考虑gather)

gather更加高层,可以将tasks分组;还可以成批的取消任务

1 importasyncio2 importtime3 from functools importpartial4 async defget_html(url):5 print('start get url')6 await asyncio.sleep(2)7 print('end get url')8

9 if __name__=='__main__':10 loop=asyncio.get_event_loop()11 groups1=[get_html('www.baidu.com') for i in range(10)]12 groups2=[get_html('www.baidu.com') for i in range(10)]13 #gather注意加*,这样就会变成参数

14 loop.run_until_complete(asyncio.gather(*groups1,*groups2))15 #这种方式也可以

16 #groups1 = [get_html('www.baidu.com') for i in range(10)]

17 #groups2 = [get_html('www.baidu.com') for i in range(10)]

18 #groups1=asyncio.gather(*groups1)

19 #groups2=asyncio.gather(*groups2)

20 #取消任务

21 #groups2.cancel()

22 #loop.run_until_complete(asyncio.gather(groups1,groups2))

View Code

二. 协程嵌套

1.run_util_complete()源码:和run_forever()区别并不大,只是可以在运行完指定的协程后可以把loop停止掉,而run_forever()不会停止

2.loop会被放在future里面,future又会放在loop中

3.取消future(task):

3.1子协程调用原理:

官网例子:

解释: await相当于yield from,loop运行协程print_sum(),print_sum又会去调用另一个协程compute,run_util_complete会把协程print_sum注册到loop中。

1.event_loop会为print_sum创建一个task,通过驱动task执行print_sum(task首先会进入pending【等待】的状态);

2.print_sum直接进入字协程的调度,这个时候转向执行另一个协程(compute,所以print_sum变为suspended【暂停】状态);

3.compute这个协程首先打印,然后去调用asyncio的sleep(此时compute进入suspende的状态【暂停】),直接把返回值返回给Task(没有经过print_sum,相当于yield from,直接在调用方和子生成器通信,是由委托方print_sum建立的通道);

4.Task会告诉Event_loop暂停,Event_loop等待一秒后,通过Task唤醒(越过print_sum和compute建立一个通道);

5.compute继续执行,变为状态done【执行完成】,然后抛一个StopIteration的异常,会被await语句捕捉到,然后提取出1+2=3的值,进入print_sum,print_sum也被激活(因为抛出了StopIteration的异常被print_sum捕捉),print_sum执行完也会被标记为done的状态,同时抛出StopIteration会被Task接收

三. call_soon、call_later、call_at、call_soon_threadsafe

1.call_soon:可以直接接收函数,而不用协程

1 importasyncio2 #函数

3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通过该函数暂停

6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 #可以直接传递函数,而不用协程,call_soon其实就是调用的call_later,时间为0秒

11 loop.call_soon(callback,2)12 loop.call_soon(stoploop,loop)13 #不能用run_util_complete(因为不是协程),run_forever找到call_soon一直运行

14 loop.run_forever()

View Code

2.call_later:可以指定多长时间后启动(实际调用call_at,时间不是传统的时间,而是loop内部的时间)

1 importasyncio2 #函数

3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通过该函数暂停

6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 loop.call_later(3,callback,1)11 loop.call_later(1, callback, 2)12 loop.call_later(1, callback, 2)13 loop.call_later(1, callback, 2)14 loop.call_soon(callback,4)15 #loop.call_soon(stoploop,loop)

16 #不能用run_util_complete(因为不是协程),run_forever找到call_soon一直运行

17 loop.run_forever()

View Code

3.call_at:指定某个时间执行

1 importasyncio2 #函数

3 defcallback(sleep_time):4 print('sleep {} success'.format(sleep_time))5 #通过该函数暂停

6 defstoploop(loop):7 loop.stop()8 if __name__=='__main__':9 loop=asyncio.get_event_loop()10 now=loop.time()11 print(now)12 loop.call_at(now+3,callback,1)13 loop.call_at(now+1, callback, 0.5)14 loop.call_at(now+1, callback, 2)15 loop.call_at(now+1, callback, 2)16 #loop.call_soon(stoploop,loop)

17 #不能用run_util_complete(因为不是协程),run_forever找到call_soon一直运行

18 loop.run_forever()

View Code

4.call_soon_threadsafe:

线程安全的方法,不仅能解决协程,也能解决线程,进程,和call_soon几乎一致,多了self._write_to_self(),和call_soon用法一致

四. ThreadPoolExecutor+asyncio(线程池和协程结合)

1.使用run_in_executor:就是把阻塞的代码放进线程池运行,性能并不是特别高,和多线程差不多

1 #使用多线程,在协程中集成阻塞io

2 importasyncio3 importsocket4 from urllib.parse importurlparse5 from concurrent.futures importThreadPoolExecutor6 importtime7 defget_url(url):8 #通过socket请求html

9 url=urlparse(url)10 host=url.netloc11 path=url.path12 if path=="":13 path="/"

14 #建立socket连接

15 client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)16 client.connect((host,80))17 #向服务器发送数据

18 client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))19 #将数据读取完

20 data=b""

21 whileTrue:22 d=client.recv(1024)23 ifd:24 data+=d25 else:26 break

27 #会将header信息作为返回字符串

28 data=data.decode('utf8')29 print(data.split('\r\n\r\n')[1])30 client.close()31

32 if __name__=='__main__':33 start_time=time.time()34 loop=asyncio.get_event_loop()35 excutor=ThreadPoolExecutor()36 tasks=[]37 for i in range(20):38 task=loop.run_in_executor(excutor,get_url,'http://www.baidu.com')39 tasks.append(task)40 loop.run_until_complete(asyncio.wait(tasks))41 print(time.time()-start_time)

View Code

五. asyncio模拟http请求

注:asyncio目前没有提供http协议的接口

1 #asyncio目前没有提供http协议的接口

2 importasyncio3 from urllib.parse importurlparse4 importtime5

6

7 async defget_url(url):8 #通过socket请求html

9 url =urlparse(url)10 host =url.netloc11 path =url.path12 if path == "":13 path = "/"

14 #建立socket连接(比较耗时),非阻塞需要注册,都在open_connection中实现了

15 reader, writer = await asyncio.open_connection(host, 80)16 #向服务器发送数据,unregister和register都实现了

17 writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))18 #读取数据

19 all_lines =[]20 #源码实现较复杂,有__anext__的魔法函数(协程)

21 async for line inreader:22 data = line.decode('utf8')23 all_lines.append(data)24 html = '\n'.join(all_lines)25 returnhtml26

27

28 async defmain():29 tasks =[]30 for i in range(20):31 url = "http://www.baidu.com/"

32 tasks.append(asyncio.ensure_future(get_url(url)))33 for task inasyncio.as_completed(tasks):34 result =await task35 print(result)36

37

38 if __name__ == '__main__':39 start_time =time.time()40 loop =asyncio.get_event_loop()41 #tasks=[get_url('http://www.baidu.com') for i in range(10)]

42 #在外部获取结果,保存为future对象

43 #tasks = [asyncio.ensure_future(get_url('http://www.baidu.com')) for i in range(10)]

44 #loop.run_until_complete(asyncio.wait(tasks))

45 #for task in tasks:

46 #print(task.result())

47 #执行完一个打印一个

48 loop.run_until_complete(main())49 print(time.time() - start_time)

View Code

六. future和task

1.future:协程中的future和线程池中的future相似

future中的方法,都和线程池中的相似

set_result方法

不像线程池中运行完直接运行代码(这是单线程的,会调用call_soon方法)

2.task:是future的子类,是future和协程之间的桥梁

会首先启动_step方法

该方法会首先启动协程,把返回值(StopIteration的值)做处理,用于解决协程和线程不一致的地方

七. asyncio同步和通信

1.单线程协程不需要锁:

1 importasyncio2 total=03 async defadd():4 globaltotal5 for i in range(1000000):6 total+=1

7

8

9 async defdecs():10 globaltotal11 for i in range(1000000):12 total-=1

13 if __name__=='__main__':14 loop=asyncio.get_event_loop()15 tasks=[add(),decs()]16 loop.run_until_complete(asyncio.wait(tasks))17 print(total)

View Code

2.某种情况需要锁:

asyncio中的锁(同步机制)

1 importasyncio,aiohttp2 #这是并没有调用系统的锁,只是简单的自己实现(注意是非阻塞的),Queue也是非阻塞的,都用了yield from,不用用到condition【单线程】】

3 #Queue还可以限流,如果只需要通信还可以直接使用全局变量否则可以

4 from asyncio importLock,Queue5 catche={}6 lock=Lock()7 async defget_stuff():8 #实现了__enter__和__exit__两个魔法函数,可以用with

9 #with await lock:

10 #更明确的语法__aenter__和__await__

11 async with lock:12 #注意加await,是一个协程

13 #await lock.acquire()

14 for url incatche:15 returncatche[url]16 #异步的接收

17 stauff=aiohttp.request('Get',url)18 catche[url]=stauff19 returnstauff20 #release是一个简单的函数

21 #lock.release()

22

23 async defparse_stuff():24 stuff=await get_stuff()25

26 async defuse_stuff():27 stuff=await get_stuff()28 #如果没有同步机制,就会发起两次请求(这里就可以加一个同步机制)

29 tasks=[parse_stuff(),use_stuff()]30 loop=asyncio.get_event_loop()31 loop.run_until_complete(asyncio.wait(tasks))

View Code

八. aiohttp实现高并发爬虫

1 #asyncio去重url,入库(异步的驱动aiomysql)

2 importaiohttp3 importasyncio4 importre5 importaiomysql6 from pyquery importpyquery7

8 start_url = 'http://www.jobbole.com/'

9 waiting_urls =[]10 seen_urls =[]11 stopping =False12 #限制并发数

13 sem=asyncio.Semaphore(3)14

15

16 async deffetch(url, session):17 async with sem:18 await asyncio.sleep(1)19 try:20 async with session.get(url) as resp:21 print('url_status:{}'.format(resp.status))22 if resp.status in [200, 201]:23 data =await resp.text()24 returndata25 exceptException as e:26 print(e)27

28

29 defextract_urls(html):30 '''

31 解析无io操作32 '''

33 urls =[]34 pq =pyquery(html)35 for link in pq.items('a'):36 url = link.attr('href')37 if url and url.startwith('http') and url not inurls:38 urls.append(url)39 waiting_urls.append(url)40 returnurls41

42

43 async definit_urls(url, session):44 html =await fetch(url, session)45 seen_urls.add(url)46 extract_urls(html)47

48

49 async defhandle_article(url, session, pool):50 '''

51 处理文章52 '''

53 html =await fetch(url, session)54 seen_urls.append(url)55 extract_urls(html)56 pq =pyquery(html)57 title = pq('title').text()58 async with pool.acquire() as conn:59 async with conn.cursor() as cur:60 insert_sql = "insert into Test(title) values('{}')".format(title)61 await cur.execute(insert_sql)62

63

64 async defconsumer(pool):65 with aiohttp.CLientSession() as session:66 while notstopping:67 if len(waiting_urls) ==0:68 await asyncio.sleep(0.5)69 continue

70 url =waiting_urls.pop()71 print('start url:' + 'url')72 if re.match('http://.*?jobble.com/\d+/', url):73 if url not inseen_urls:74 asyncio.ensure_future(handle_article(url, session, pool))75 await asyncio.sleep(30)76 else:77 if url not inseen_urls:78 asyncio.ensure_future(init_urls(url, session))79

80

81 async defmain():82 #等待mysql连接好

83 pool = aiomysql.connect(host='localhost', port=3306, user='root',84 password='112358', db='my_aio', loop=loop, charset='utf8', autocommit=True)85 async with aiohttp.CLientSession() as session:86 html =await fetch(start_url, session)87 seen_urls.add(start_url)88 extract_urls(html)89 asyncio.ensure_future(consumer(pool))90

91 if __name__ == '__main__':92 loop =asyncio.get_event_loop()93 asyncio.ensure_future(loop)94 loop.run_forever(main(loop))

View Code

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/550203.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ORM多表操作之多对多查询

创建多对多的关系 authormodels.ManyToManyFleld(" ")  (推荐) 书籍对象它的所有关联作者 book_obj.authors.all() 掌握:通过filter values(双下划线)进行多对多的关联查询(形式一对多) django是将python语句翻译成sql语句执行 聚…

计算机的iscsi配置,我们将了解如何设置自己的支持iscsi配置的存储节点

iSCSI代表Internet小型计算机系统接口。它用于使用块级数据传输通过TCP / IP访问网络上的存储。NFS与iSCSI之间通常存在比较。关键区别在于NFS是文件级实现,而iSCSI是块级实现。这适用于TCP / IP层,并允许通过局域网(LAN)发送SCSI命令。在诸如iSCSI和光纤…

介词for和with 和of的用法_英语中的for,to,at,of,in,on,with的用法

英语中的for,to,at,of,in,on,with的用法英语中的for,to,at,of,in,on,with的用法人气:594 ℃时间:2019-11-07 00:56:02优质解答一、介词按其构成可分为:1.简单介词 at,in,on,to,since,until 等.如:Hes worked there since 1998.2.复…

转帖:解决System.Data.OracleClient requires Oracle client software version 8.1.7 or greater

原帖:http://blog.csdn.net/killer000777/archive/2008/12/03/3438519.aspx 原来当Oracle 9.2运行在NTFS的分区上时,对于某些非administrator组的用户,ORACLE_HOME 目录是不可见的,而在windows server 2003下Asp.net应用使用的帐户…

超级计算机发展及现状论文,浅谈超级计算机发展的过程及研究现状

摘要:当前,多核技术的不断发展和日渐成熟,使得处理器的性能得到巨大提升.但是对于存储设备来说,无论是速度还是容量都无法跟上这种步伐.随着处理器和其它子系统发展差距的日益加大,超级计算机的效率问题逐渐成为人们讨论和研究的热点,大部分的实际应用在超级计算机上…

P1047 校门外的树 Noip2005普及组第二题

洛谷红题(咳咳)>>>>点击跳转 题目描述 某校大门外长度为L的马路上有一排树,每两棵相邻的树之间的间隔都是11米。我们可以把马路看成一个数轴,马路的一端在数轴00的位置,另一端在LL的位置;数轴上…

裂变红包码的制作_红包墙裂变源码活码玩法 实现多次裂变传播

这是一个移动互联网迅速发展的时代,每个人都是一个自媒体,都有自己的朋友圈和社群。门店除了依靠自己雄厚的实力外,还应该充分用好每个店员的资源,让每个店员这个自媒体为门店所用,将能力发挥到极致。红包推出的超级红…

计算机基础知识复习资料,计算机基础知识复习资料

.Word 资料Ch1 计算机基础知识1.1 现代信息技术1.1.1、特征:●以数字技术和电子技术为基础;●以计算机及其软件为核心;●采用电子技术(包括激光技术)进行信息的收集、传递、加工、存储、显示与控制。1.1.2、数字技术一、二进位数字——比特(b…

linux上安装Docker

Docker的三大核心概念:镜像、容器、仓库 镜像:类似虚拟机的镜像、用俗话说就是安装文件 容器:类似一个轻量级的沙箱,容器是从镜像创建应用运行实例,可以将其启动、开始、停止、删除、而这些容器都是相互隔离、互不可见…

python节日贺卡图片_节日贺卡图片制作手工

手工贺卡,相信你也对贺卡感到很大的兴趣的吧,手工贺卡。手工贺卡范文,欢迎阅读。所需材料:黑纸胶水压钱粉红色的纸具体步骤:1黑色和红色纸剪成宽度为5mm的长条的,画出你喜欢的花树轮廓,使用笔温柔的拉伸黑色…

幼儿园计算机教师论文,幼儿园中班教师论文

幼儿园中班教师论文导语:幼儿教育是什么?幼儿应如何正确接受教育?幼儿接受与不接受教育区别又如 何?这些看似简单的问题,其实不然。在当今科技发达的今天,幼儿的教育起着承前启后的重要阶段,不仅…

ASP.NET 5 入门 (2) – 自定义配置

原文:ASP.NET 5 入门 (2) – 自定义配置ASP.NET 5 入门 (2) – 自定义配置 ASP.NET 5 理解和入门 建立和开发ASP.NET 5 项目 初步理解ASP.NET5的配置 正如我的第一篇文章ASP.NET 5 (vNext) 理解和概述 所说,ASP.NET 5的具有全新的配置机制,我们可以通过以下几点来进行理解: 支持…

中有冒号 文件路径_用Matlab脚本文件实现Excel文件的合并

日常吐槽前段时间跟同事聊天,同事洗脑了一个新的(扎心的)世界观,“人生分三个阶段,20岁时承认父母很平庸,30岁时承认自己很平庸,40岁时承认孩子很平庸”。这是这位同事在孩子学而思考试后的心得…

2019计算机国二操作题,2019年3月计算机二级C++操作练习题及答案(十二)

一、程序改错题使用VC6.0打开考生文件夹下的源程序文件1.cpp,该程序运行时有错,请改正其中的错误,使程序正常运行,输出的结果为Constructor,i0,Destructor注意:错误的语句在//******error******…

最简单的一个 STL格式的网格文件

简介 最简单格式的一个STL格式的文件 文件内容 solid filenamestlfacet normal 1 1 1outer loopvertex 0 0 1vertex 0 1 0vertex 1 0 0endloopendfacet endsolid filenamestl 简单描述 solid filenamestlfacet normal 1 1 1//面的法向量outer loopvertex 0 0 1 // 顶点1vertex …

ntrip获取源列表_Ntrip协议简介(转)

1 什么是Ntrip?CORS(Continuously Operating Reference Stations)就是网络基准站,通过网络收发GPS差分数据。用户访问CORS后,不用单独架设GPS基准站,即可实现GPS流动站的差分定位。访问CORS系统,就需要网络通讯协议。N…

计算机数据与安全课件,计算机数据及软件的安全.ppt

计算机数据及软件的安全计算机数据及软件的安全一、计算机软件的安全问题 1、软件在计算机安全中的二重性 软件是计算机系统的重要组成部分。和硬件相比,软件是计算机系统的灵魂,用户通过软件才能使用计算机。 (1)计算机软件是系统安全保护的对象和安全控…

spring配置多视图解析器

最近做一个小项目(移动端),自己搭了个简单的SSM框架(spring spring MVC Mybitis),展示层本来选用的是jsp,各方便都已经搭建好,结果发现有些页面需要用到H5的一些功能,所以展示层需…

ios系统gps测试软件,GPS工具箱苹果版

GPS工具箱为用户准备的以手机GPS为基础的多功能位置服务的软件,它包括了很多非常实用的工具,包含线路追踪、测速、位置记录、面积测量等等,是GPS模块的功能发挥的非常充分到位,并且软件支持离线map和KML、KMZ导入导出、GPX文件导出…

建模实训报告总结_计算机三维建模实训报告

计算机三维建模实训报告实验时间:2014-6-23实验地点:明虹楼实验目的:理解三维CAD技术的相关概念和三维CAD的基础知识熟练CAD软件的基本操作,掌握软件的使用方法。能够更直观、更全面地反映设计意图,为将来从事计算机辅…