Python 异步库 asyncio、aiohttp

asyncio

版本支持

  • asyncio 模块在 Python3.4 时发布。
  • async 和 await 关键字最早在 Python3.5 中引入。
  • Python3.3 之前不支持。

关键概念

  • event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数(协程)注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。

  • coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

  • future 对象: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别

  • task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。Task 对象是 Future 的子类,它将 coroutine 和 Future 联系在一起,将 coroutine 封装成一个 Future 对象。

  • async/await 关键字:python3.5 用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。其作用在一定程度上类似于yield。

工作流程

  1. 定义/创建协程对象
  2. 将协程转为task任务
  3. 定义事件循环对象容器
  4. 将task任务放到事件循环对象中触发
import asyncioasync def hello(name):print('Hello,', name)# 定义协程对象
coroutine = hello("World")# 定义事件循环对象容器
loop = asyncio.get_event_loop()# 将协程转为task任务
# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)# 将task任务扔进事件循环对象中并触发
loop.run_until_complete(task)

并发

1. 创建多个协程的列表 tasks:
import asyncioasync def do_some_work(x):print('Waiting: ', x)await asyncio.sleep(x)return 'Done after {}s'.format(x)tasks = [do_some_work(1), do_some_work(2), do_some_work(4)]
2. 将协程注册到事件循环中:

方法一:使用 asyncio.wait()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

方法二:使用 asyncio.gather()

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
3. 查看 return 结果:
for task in tasks:print('Task ret: ', task.result())
4. asyncio.wait()asyncio.gather() 的区别:

接收参数不同:

asyncio.wait():必须是一个 list 对象,list 对象里存放多个 task 任务。

# 使用 asyncio.ensure_future 转换为 task 对象
tasks=[asyncio.ensure_future(factorial("A", 2)),asyncio.ensure_future(factorial("B", 3)),asyncio.ensure_future(factorial("C", 4))
]# 也可以不转为 task 对象
# tasks=[
#        factorial("A", 2),
#        factorial("B", 3),
#        factorial("C", 4)
# ]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

asyncio.gather():比较广泛,注意接收 list 对象时 * 不能省略。

tasks=[asyncio.ensure_future(factorial("A", 2)),asyncio.ensure_future(factorial("B", 3)),asyncio.ensure_future(factorial("C", 4))
]# tasks=[
#        factorial("A", 2),
#        factorial("B", 3),
#        factorial("C", 4)
# ]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop = asyncio.get_event_loop()group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)])
group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])loop.run_until_complete(asyncio.gather(group1, group2, group3))

返回结果不同:

asyncio.wait():返回 dones(已完成任务) 和 pendings(未完成任务)

dones, pendings = await asyncio.wait(tasks)for task in dones:print('Task ret: ', task.result())

asyncio.gather():直接返回结果

results = await asyncio.gather(*tasks)for result in results:print('Task ret: ', result)

aiohttp

ClientSession 会话管理

import aiohttp
import asyncioasync def main():async with aiohttp.ClientSession() as session:async with session.get('http://httpbin.org/get') as resp:print(resp.status)print(await resp.text())asyncio.run(main())

其他请求:

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

URL 参数传递

async def main():async with aiohttp.ClientSession() as session:params = {'key1': 'value1', 'key2': 'value2'}async with session.get('http://httpbin.org/get', params=params) as r:expect = 'http://httpbin.org/get?key1=value1&key2=value2'assert str(r.url) == expect
async def main():async with aiohttp.ClientSession() as session:params = [('key', 'value1'), ('key', 'value2')]async with session.get('http://httpbin.org/get', params=params) as r:expect = 'http://httpbin.org/get?key=value2&key=value1'assert str(r.url) == expect

获取响应内容

async def main():async with aiohttp.ClientSession() as session:async with session.get('http://httpbin.org/get') as r:# 状态码print(r.status)# 响应内容,可以自定义编码print(await r.text(encoding='utf-8'))# 非文本内容print(await r.read())# JSON 内容print(await r.json())

自定义请求头

headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"}async def main():async with aiohttp.ClientSession() as session:async with session.get('http://httpbin.org/get', headers=headers) as r:print(r.status)

为所有会话设置请求头:

headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"}async def main():async with aiohttp.ClientSession(headers=headers) as session:async with session.get('http://httpbin.org/get') as r:print(r.status)

自定义 cookies

async def main():cookies = {'cookies_are': 'working'}async with aiohttp.ClientSession() as session:async with session.get('http://httpbin.org/cookies', cookies=cookies) as resp:assert await resp.json() == {"cookies": {"cookies_are": "working"}}

为所有会话设置 cookies:

async def main():cookies = {'cookies_are': 'working'}async with aiohttp.ClientSession(cookies=cookies) as session:async with session.get('http://httpbin.org/cookies') as resp:assert await resp.json() == {"cookies": {"cookies_are": "working"}}

设置代理

注意:只支持 http 代理。

async def main():async with aiohttp.ClientSession() as session:proxy = "http://127.0.0.1:1080"async with session.get("http://python.org", proxy=proxy) as r:print(r.status)

需要用户名密码授权的代理:

async def main():async with aiohttp.ClientSession() as session:proxy = "http://127.0.0.1:1080"proxy_auth = aiohttp.BasicAuth('username', 'password')async with session.get("http://python.org", proxy=proxy, proxy_auth=proxy_auth) as r:print(r.status)

也可以直接传递:

async def main():async with aiohttp.ClientSession() as session:proxy = "http://username:password@127.0.0.1:1080"async with session.get("http://python.org", proxy=proxy) as r:print(r.status)

异步爬虫示例

import asyncio
import aiohttpfrom lxml import etree
from datetime import datetimeheaders = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"}async def get_movie_url():req_url = "https://movie.douban.com/chart"async with aiohttp.ClientSession() as session:async with session.get(url=req_url, headers=headers) as response:result = await response.text()result = etree.HTML(result)return result.xpath("//*[@id='content']/div/div[1]/div/div/table/tr/td/a/@href")async def get_movie_content(movie_url):async with aiohttp.ClientSession() as session:async with session.get(url=movie_url, headers=headers) as response:result = await response.text()result = etree.HTML(result)movie = dict()name = result.xpath('//*[@id="content"]/h1/span[1]//text()')author = result.xpath('//*[@id="info"]/span[1]/span[2]//text()')movie["name"] = namemovie["author"] = authorreturn moviedef run():start = datetime.now()loop = asyncio.get_event_loop()movie_url_list = loop.run_until_complete(get_movie_url())tasks = [get_movie_content(url) for url in movie_url_list]movies = loop.run_until_complete(asyncio.gather(*tasks))print(movies)print("异步用时为:{}".format(datetime.now() - start))if __name__ == '__main__':run()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/437646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spark-自定义累加器-进行字符串拼接(代码及详细实现步骤)

看longAccumulator()方法源码里是val acc new LongAccumulator然后用register(acc)在Spark中注册了累加器,进入ctrl鼠标左键进入LongAccumulator,可以看到继承了AccumulatorV2[jl.Long, jl.Long],根据LongAccumulator来实现自定义累加器 实现类 //1.继…

Python3 学习系列 丨 博客目录索引

整个博客有关 Python 学习目录索引,方便快捷定位查询基础学习篇 Python3 基础学习笔记 C01【变量和简单数据类型】Python3 基础学习笔记 C02【列表】Python3 基础学习笔记 C03【操作列表】Python3 基础学习笔记 C04【if 语句】Python3 基础学习笔记 C05【字典】Pyt…

【转】日邮物流:实现智慧物流,这个云上对了!

和阳光、空气、水、网络一样,「物流」早已成为当代企业、个人赖以生存的必要条件。2020第一季度全球物流受疫情影响面临挑战,业内普遍预计全球物流及供应链将重新优化布局。借此时机,物流业纷纷将目光投向“数字化智慧物流”方向,…

Spark-三大数据结构之-广播变量

什么是广播变量 分布式只读共享变量 首先广播变量是一个调优策略(可以减少数据的传输,也就是数据从driver传输到executor) (每一个executor都要传list数据,如果数据太多就很慢,采用广播变量他是一个共享只读变量,可以减少数据传…

Python 实现十大经典排序算法

目录排序算法分类一、冒泡排序(Bubble Sort)1、原理2、步骤3、动画演示4、代码实现5、具体示例二、选择排序(Selection Sort)1、原理2、步骤3、动画演示4、代码实现5、具体示例三、插入排序(Insertion Sort&#xff09…

【转】Microsoft Graph 桌面应用程序

桌面应用程序,在我这篇文章的语境中,我是特指在Windows桌面上面直接运行的.NET应用程序,包括Console Application,WPF Application,Windows Forms Application, UWP Application,并且限于篇幅,我…

【转】Microsoft Graph Web应用程序极致开发体验

前言 这篇文章最早写于2017年5月2日,当时的想法是从最简单的方式来写如何在一个ASP.NET MVC应用程序中集成Microsoft Graph,但实际上还真不是那么简单,至少我是不满意的,加上这一两周都比较忙,所以这一篇就一直搁置。…

Spark(idea)操作mysql进行查询和插入 (代码+理解)

首先在maven中加入配置 <!--mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency>然后在idea配置数据库 1&#xff09; 查询 //1.查询数…

【转】在无人值守程序(服务)中调用Microsoft Graph

什么是无人值守程序&#xff08;服务&#xff09; 我在此前用了几篇文章分别介绍了在桌面应用程序&#xff08;控制台&#xff09;&#xff0c;Web应用程序&#xff08;ASP.NET MVC&#xff09;&#xff0c;以及PowerSehll脚本中如何访问Microsoft Graph&#xff0c;今天这一篇…

MapReduce的shuffle阶段

Shuffle 为何需要shuffle • Reduce阶段的数据来源于不同的Map Shuffle由Map端和Reduce端组成 Shuffle的核心机制 • 数据分区排序 Map端 • 对Map输出结果进行spill Reduce端 • 拷贝Map端输出结果到本地 • 对拷贝的数据进行归并排序Shuffle Map端 Map端会源源不断的把数据输…

【转】使用PowerApps快速构建基于主题的轻业务应用 —— 入门篇

前言 在上一篇文章 基于Office 365的随需应变业务应用平台 中我提到&#xff0c;随着随需应变的业务需要&#xff0c;以及技术的发展&#xff0c;业务应用的开发的模式也有了深刻的变化。基于微软的平台&#xff0c;有服务于主干业务应用的Dynamic 365 业务应用平台&#xff0…

Spark内核源码学习(暂未学完)

1&#xff09; 回顾 1.1 Spark通用运行流程概述 在submit任务条件是需要指定executo个数&#xff0c;executor-CUP个数&#xff0c;可以提高并行度。 什么是并行&#xff0c;什么是并发&#xff1f; 并发&#xff1a;假如有多个任务task&#xff0c;并行是在一个cup中&#x…

【转】使用PowerApps快速构建基于主题的轻业务应用 —— 进阶篇

在上一篇 使用PowerApps快速构建基于主题的轻业务应用 —— 入门篇 中&#xff0c;我用了三个实际的例子演示了如何快速开始使用PowerApps构建轻业务应用&#xff0c;你可能已经发现&#xff0c;我都是使用默认生成的设置&#xff0c;没有做任何修改。当然&#xff0c;那样做出…

Spark一些组件的定义

Driver program: 运行应用程序的main函数并创建SparkContext的进程 除了RDD的最终执行所写的业务逻辑&#xff0c;剩下的都在Driver里生成&#xff0c;Driver端执行action算子才会到开始执行所创建的DAG-RDD图。 Cluster manager&#xff1a; 用于获取集群资源外部服务 Mas…

【转】D365 FO第三方集成(二)---访问认证(获取访问令牌)

D365 FO 在github上发布了第三方访问D365 FO的示例代码&#xff0c;里面包含了各种调用示例&#xff0c;代码很清晰。https://github.com/microsoft/Dynamics-AX-Integration 这篇blog简单分析一下代码中获取访问令牌的部分代码。 与获取访问令牌相关的代码有两个类ClientConfi…

SparkSQL练习+理解+详解

def main(args: Array[String]): Unit {//创建配置对象val conf new SparkConf().setAppName("SparkSQL01_demo").setMaster("local[*]")val session SparkSession.builder().config(conf).getOrCreate()//创建RDD (session里包含sparkContext&#xf…

【转】D365 FO第三方集成(三)---服务实现

D365 FO的Custom Service的实现比AX2012简单了很多。 AX2012服务方法要用属性SysEntryPointAttribute标记&#xff0c;添加到Services以后&#xff0c;还要发布服务并在系统管理入站端口添加操作&#xff0c;服务运行在CIL下&#xff0c;所以每次改动服务方法的代码都要增量生成…

SparkSQL自定义AVG强类型聚合函数与弱类型聚合函数汇总

AVG是求平均值&#xff0c;所以输出类型是Double类型 1&#xff09;创建弱类型聚合函数类extends UserDefinedAggregateFunction class MyAgeFunction extends UserDefinedAggregateFunction {//函数输入的数据结构&#xff0c;需要new一个具体的结构对象&#xff0c;然后添加…

PHP连接sql seaver数据库

我的PHP版本7.0 通过sqlsrv系列函数&#xff0c;需要下载安装Microsoft Drivers for PHP for SQL Server驱动&#xff1a; 地址&#xff1a;https://msdn.microsoft.com/library/dn865013.aspx。 根据自己需求下载安装&#xff0c;安装地址php下ext目录下&#xff0c;我的是4.0…

【转】D365 FO第三方集成(四)---客户端调用

客户端调用json-based服务非常简单&#xff0c;就是标准的http调用。 http调用首先要解决URL的组成&#xff0c;D365 FO json-based调用的url组成如下&#xff1a; https://usnconeboxax1aos.cloud.onebox.dynamics.cn/api/services/{服务组名}/{服务名}/{方法名} 调用的代码很…