Contents

1. Fundamentals of Coroutines
  1.1 A Motivating Example
  1.2 Basic Concepts
  1.3 Using Coroutines
  1.4 Defining a Coroutine
  1.5 Binding a Callback
  1.6 Multi-Task Coroutines
  1.7 Coroutine Implementation
  1.8 Using aiohttp
2. Using aiohttp
  2.1 Overview
  2.2 A Basic Example
  2.3 URL Parameters
  2.4 Other Request Types
  2.5 POST Requests
  2.6 Responses
  2.7 Timeout Settings
  2.8 Limiting Concurrency
3. Asynchronous Crawling in Practice with aiohttp
  3.1 Case Introduction
  3.2 Preparation
  3.3 Page Analysis
  3.4 Implementation Approach
  3.5 Basic Configuration
  3.6 Crawling the List Pages
  3.7 Crawling the Detail Pages
1. Fundamentals of Coroutines

1.1 A Motivating Example

The endpoint https://www.httpbin.org/delay/5 makes the server wait 5 seconds before returning its response, which makes it convenient for comparing synchronous and asynchronous requests.
```python
import logging
import time
import requests

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s: %(message)s')

URL = "https://www.httpbin.org/delay/5"
TOTAL_NUMBER = 10

start_time = time.time()
for _ in range(1, TOTAL_NUMBER + 1):  # issue 10 requests in total
    logging.info(f"scraping {URL}")
    response = requests.get(URL)
end_time = time.time()
logging.info(f"total time {end_time - start_time}")
```

With 10 sequential requests, the total time is a bit over 50 seconds, almost all of it spent waiting for responses.
1.2 Basic Concepts

Blocking
- Blocked state: the state a program is suspended in when it has not yet obtained the computing resources it needs.
- A program is blocking on an operation if, while waiting for that operation to complete, it cannot do anything else itself.

Non-blocking
- A program is non-blocking on an operation if, while waiting for that operation, it is not suspended and can keep doing other things.
- Non-blocking behaviour is only possible when the program is structured at a level that encapsulates independent subprogram units.
- The concept exists only because blocking exists.

Synchronous
- Program units execute synchronously when, in order to accomplish a task together, they must stay coordinated through some form of communication during execution.

Asynchronous
- Sometimes a task can be completed without the different program units having to communicate and coordinate with one another at all.
Multiprocessing
- Exploits the multiple cores of a CPU to run several tasks in parallel at the same time, which can greatly improve throughput. A minimal sketch follows below.
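A minimal multiprocessing sketch (illustrative only, not from the original notes; the `square` worker is a made-up example) showing CPU-bound work spread across cores with a process pool:

```python
from multiprocessing import Pool

def square(x):
    # A stand-in for CPU-bound work; each call runs in a worker process.
    return x * x

if __name__ == "__main__":
    # Four worker processes compute the results in parallel on separate cores.
    with Pool(processes=4) as pool:
        print(pool.map(square, range(10)))
```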
Coroutine
- A lightweight "thread" that runs in user space.
- Has its own register context and stack.
- On a scheduling switch, the register context and stack are saved elsewhere; when control switches back, the previously saved context and stack are restored.
- It therefore keeps the state it had at its last invocation, i.e. a specific combination of all its local state, and every re-entry resumes from the state of the previous call (see the sketch below).
- Essentially single-threaded.
- Used to implement asynchronous operations.
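A minimal sketch (illustrative only, not from the original notes) of the "resumes with its previous state" behaviour: each coroutine below keeps its own `count` variable and loop position across suspension points, while both interleave on a single thread:

```python
import asyncio

async def counter(name, steps):
    count = 0
    for _ in range(steps):
        count += 1
        print(f"{name}: count={count}")
        await asyncio.sleep(0.1)  # suspend here; local state is kept until resumption
    return count

async def main():
    # Two coroutines interleave on one thread; each resumes exactly where it left off.
    await asyncio.gather(counter("a", 3), counter("b", 3))

asyncio.run(main())
```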
1.3 Using Coroutines

The asyncio library provides the building blocks:
- event_loop: the event loop. Functions are registered on the loop, and when the triggering event occurs, the corresponding handler is called.
- coroutine: the coroutine object type. A coroutine object can be registered on the event loop, which will schedule and call it.
- task: a further wrapper around a coroutine object that also records its state (pending, running, finished, cancelled).
- future: the result of a task that will run or has not run yet; for our purposes there is no essential difference from a task.
- async keyword: defines a coroutine function. Calling it does not execute the body immediately; it returns a coroutine object instead.
- await keyword: suspends the current coroutine until the awaited object completes, handing control back to the event loop in the meantime.

A compact sketch tying these terms together follows below.
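This sketch is illustrative only (not from the original notes) and uses asyncio.create_task, available since Python 3.7, whereas the examples that follow stick to ensure_future and an explicit loop:

```python
import asyncio

async def work():
    await asyncio.sleep(1)   # await suspends `work` without blocking the event loop
    return 42

async def main():
    task = asyncio.create_task(work())  # wrap the coroutine object into a Task
    print(task.done())                  # False: the Task is still pending
    result = await task                 # a Task is a Future; await it to get the result
    print(task.done(), result)          # True 42

asyncio.run(main())
```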
1.4 Defining a Coroutine
```python
import asyncio

async def execute(x):
    print(f"Number: {x}")

coroutine = execute(1)
print(f"Coroutine: {coroutine}")
print("After calling execute")

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print("After calling loop")
```
When the coroutine object coroutine is passed to run_until_complete, it is actually wrapped into a task first. The task can also be declared explicitly:
```python
import asyncio

async def execute(x):
    print(f"Number: {x}")

coroutine = execute(1)
print(f"Coroutine: {coroutine}")
print("After calling execute")

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(f"Task: {task}")

loop.run_until_complete(task)
print(f"Task: {task}")
print("After calling loop")
```
A task can also be created with asyncio.ensure_future, which does not require the event loop to be obtained first:

```python
import asyncio

async def execute(x):
    print(f"Number: {x}")

coroutine = execute(1)
print(f"Coroutine: {coroutine}")
print("After calling execute")

task = asyncio.ensure_future(coroutine)
print(f"Task: {task}")

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(f"Task: {task}")
print("After calling loop")
```
1.5 Binding a Callback
```python
import asyncio
import requests

async def request():
    url = "https://www.baidu.com/"
    status = requests.get(url)
    return status

def callback(task):
    print(f"Status: {task.result()}")

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print(f"Task: {task}")

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(f"Task: {task}")
```
Without a callback, the result can also be read directly from the task with result() once it has finished:

```python
import asyncio
import requests

async def request():
    url = "https://www.baidu.com/"
    status = requests.get(url)
    return status

coroutine = request()
task = asyncio.ensure_future(coroutine)
print(f"Task: {task}")

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(f"Task: {task}")
print(f"Status: {task.result()}")
```
1.6 Multi-Task Coroutines
```python
import asyncio
import requests

async def request():
    url = "https://www.baidu.com/"
    status = requests.get(url)
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print(f"Tasks: {tasks}")

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print(f"Task result: {task.result()}")
```
1.7 Coroutine Implementation
```python
import asyncio
import time
import requests

async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = requests.get(url)
    print(f"Response: {response} from {url}")

start = time.time()
tasks = [asyncio.ensure_future(request()) for _ in range(5)]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")
```
The run above still takes about as long as issuing the five requests one after another, because requests.get is a synchronous, blocking call and the event loop cannot switch to another task while a request is in flight. Simply putting await in front of it does not help:

```python
import asyncio
import time
import requests

async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = await requests.get(url)
    print(f"Response: {response} from {url}")

start = time.time()
tasks = [asyncio.ensure_future(request()) for _ in range(5)]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")
```
This raises an error: the Response object returned by requests cannot be used with await. The object after await must be one of the following:
- a native coroutine object;
- a generator decorated with types.coroutine;
- an iterator returned by an object's __await__ method.

A minimal sketch of the last case follows below.
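This sketch is illustrative only (the `Awaitable` class is made up): an object becomes awaitable by defining __await__ so that it returns an iterator, which is exactly the protocol that requests' Response does not implement:

```python
import asyncio

class Awaitable:
    def __await__(self):
        # __await__ must return an iterator; a generator function works.
        yield              # hand control back to the event loop once
        return "done"      # becomes the value of the await expression

async def use_it():
    result = await Awaitable()
    print(result)          # -> done

asyncio.run(use_it())
```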
Moving the page request into its own coroutine at least satisfies await:

```python
import asyncio
import time
import requests

async def get(url):
    return requests.get(url)

async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = await get(url)
    print(f"Response: {response} from {url}")

start = time.time()
tasks = [asyncio.ensure_future(request()) for _ in range(5)]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")
```

The error disappears, but the total time is unchanged: requests still performs blocking I/O inside the coroutine, so the event loop never gets a chance to run the other tasks while a request is waiting. What is needed is an HTTP client that is itself asynchronous, which is where aiohttp comes in.
1.8 Using aiohttp

Installation
pip3 install aiohttp
Usage
```python
import asyncio
import time
import aiohttp

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    await response.text()
    await session.close()
    return response

async def request():
    url = "https://www.httpbin.org/delay/5"
    print(f"Waiting for {url}")
    response = await get(url)
    print(f"Response: {response} from {url}")

start = time.time()
tasks = [asyncio.ensure_future(request()) for _ in range(5)]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print(f"Cost time: {end - start}")
```

All five requests now overlap, and the total time drops to roughly that of a single request (about 6 seconds) instead of five times as long.
2. Using aiohttp

2.1 Overview

- asyncio provides asynchronous operations for TCP, UDP, and SSL transports; aiohttp is an asynchronous HTTP networking library built on top of asyncio.
- aiohttp provides both a server side and a client side.
- Server side: build a server that handles requests asynchronously, similar in role to web servers and frameworks such as Django, Flask, or Tornado. A minimal server sketch follows below.
- Client side: issue HTTP requests and receive responses, similar to using requests, except that requests issues synchronous network requests while aiohttp issues asynchronous ones.
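A minimal server-side sketch (illustrative, not part of the original notes; the route and handler names are made up) showing that aiohttp can also serve HTTP with async handlers:

```python
from aiohttp import web

async def hello(request):
    # An async request handler, analogous to a Flask or Tornado view.
    return web.Response(text="Hello, aiohttp")

app = web.Application()
app.add_routes([web.get("/", hello)])

if __name__ == "__main__":
    web.run_app(app, port=8080)   # starts the event loop and serves on port 8080
```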
2.2 A Basic Example
```python
import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.json(), response.status

async def main():
    async with aiohttp.ClientSession() as session:
        url = "https://www.httpbin.org/delay/5"
        html, status = await fetch(session, url)
        print(f"html: {html}")
        print(f"status: {status}")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # On Python 3.7+, the two lines above can be replaced with:
    # asyncio.run(main())
```
2.3 URL Parameters
```python
import aiohttp
import asyncio

async def main():
    params = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.get("https://www.httpbin.org/get", params=params) as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())
```
2.4 Other Request Types
```python
session.post("https://www.httpbin.org/post", data=b"data")
session.put("https://www.httpbin.org/put", data=b"data")
session.delete("https://www.httpbin.org/delete")
session.head("https://www.httpbin.org/get")
session.options("https://www.httpbin.org/get")
session.patch("https://www.httpbin.org/patch", data=b"data")
```
2.5 POST Requests

Form submission
```python
import aiohttp
import asyncio

async def main():
    data = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.httpbin.org/post", data=data) as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())
```
Submitting JSON data
```python
import aiohttp
import asyncio

async def main():
    data = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.httpbin.org/post", json=data) as response:
            print(await response.text())

if __name__ == "__main__":
    asyncio.run(main())
```
2.6 Responses
```python
import aiohttp
import asyncio

async def main():
    data = {"name": "abc", "age": 10}
    async with aiohttp.ClientSession() as session:
        async with session.post("https://www.httpbin.org/post", data=data) as response:
            print(f"status: {response.status}")
            print(f"headers: {response.headers}")
            print(f"body: {await response.text()}")
            print(f"bytes: {await response.read()}")
            print(f"json: {await response.json()}")

if __name__ == "__main__":
    asyncio.run(main())
```
2.7 Timeout Settings
```python
import aiohttp
import asyncio

async def main():
    timeout = aiohttp.ClientTimeout(total=2)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get("https://www.httpbin.org/get") as response:
            print(f"status: {response.status}")

if __name__ == "__main__":
    asyncio.run(main())
```
2.8 Limiting Concurrency

Use a Semaphore to cap the number of requests in flight, so that the target site is not hit with more requests in a short time than it can answer, and we avoid the risk of crawling it into the ground.
```python
import aiohttp
import asyncio

CONCURRENCY = 5
URL = "https://www.baidu.com/"

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api():
    async with semaphore:
        print(f"Scraping {URL}")
        async with session.get(URL) as response:
            await asyncio.sleep(1)
            return await response.text()

async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(1000)]
    await asyncio.gather(*scrape_index_tasks)

if __name__ == "__main__":
    asyncio.run(main())
```
3. Asynchronous Crawling in Practice with aiohttp

3.1 Case Introduction

- Target site: Scrape | Book (https://spa5.scrape.center/).
- The pages are rendered by JavaScript; the underlying data can be fetched from an Ajax API.
- The API has no anti-crawling measures or encrypted parameters.
- Goals: crawl the whole site's book data with aiohttp and store the data in MongoDB asynchronously (a storage sketch is given at the end of 3.7).
3.2 Preparation
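Presumably nothing is required beyond the earlier sections except aiohttp itself, plus the asynchronous MongoDB driver motor if you also want the optional storage step sketched at the end of 3.7:

pip3 install aiohttp motor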
3.3 Page Analysis

- Format of the list-page Ajax API: https://spa5.scrape.center/api/book/?limit=18&offset={offset}
- limit: how many books each page contains.
- offset: the offset of each page, offset = limit * (page - 1); for example, page 3 corresponds to offset = 18 * 2 = 36.
- The data returned by the API contains each book's id, which identifies that book's detail page https://spa5.scrape.center/detail/{id}; the detail data itself is served by the Ajax API https://spa5.scrape.center/api/book/{id}, which is what the final code requests.
3.4 Implementation Approach

The crawl runs in two stages:
- Crawl all list pages asynchronously: collect the list-page crawl jobs into a list of tasks and run them concurrently.
- Take the contents of all list pages from the previous step and parse them; combine all book ids into the detail-page crawl jobs, again declared as a list of tasks, crawl them concurrently, and store the results in MongoDB asynchronously as they arrive.
3.5 Basic Configuration
```python
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s: %(message)s')

INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}"
DETAIL_URL = "https://spa5.scrape.center/api/book/{id}"  # detail data comes from the Ajax API
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5
```
3.6 Crawling the List Pages

Implementation

A generic scraping method first:
```python
import asyncio
import aiohttp

# Shared by every scraping coroutine; the constants come from the basic configuration above.
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api(url):
    async with semaphore:
        try:
            logging.info(f"Scraping: {url}")
            async with session.get(url, verify_ssl=False) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.info(f"Error: {url}", exc_info=True)
```
Scraping a list page:
```python
async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)
```
Wiring everything together in a main method:
```python
import json

async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [asyncio.ensure_future(scrape_index(page))
                          for page in range(1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info(f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}")

if __name__ == "__main__":
    asyncio.run(main())
```
Merged code
```python
import json
import asyncio
import logging
import aiohttp

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s: %(message)s')

INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}"
DETAIL_URL = "https://spa5.scrape.center/api/book/{id}"
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api(url):
    async with semaphore:
        try:
            logging.info(f"Scraping: {url}")
            async with session.get(url, verify_ssl=False) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.info(f"Error: {url}", exc_info=True)

async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)

async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [asyncio.ensure_future(scrape_index(page))
                          for page in range(1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info(f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}")

if __name__ == "__main__":
    asyncio.run(main())
```
3.7 Crawling the Detail Pages

Implementation

Collect the detail-page ids in the main method:
```python
ids = []
for index_data in results:
    if not index_data:
        continue
    for item in index_data.get("results"):
        ids.append(item.get("id"))
```
Scrape a detail page:
```python
async def scrape_detail(id):
    url = DETAIL_URL.format(id=id)
    data = await scrape_api(url)
    logging.info(f"Saving: {data}")
```
Add the scrape_detail calls to the main method:
```python
scrape_detail_tasks = [asyncio.ensure_future(scrape_detail(id)) for id in ids]
await asyncio.wait(scrape_detail_tasks)
await session.close()
```
Merged code
```python
import json
import asyncio
import logging
import aiohttp

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s: %(message)s')

INDEX_URL = "https://spa5.scrape.center/api/book/?limit=18&offset={offset}"
DETAIL_URL = "https://spa5.scrape.center/api/book/{id}"
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api(url):
    async with semaphore:
        try:
            logging.info(f"Scraping: {url}")
            async with session.get(url, verify_ssl=False) as response:
                return await response.json()
        except aiohttp.ClientError:
            logging.info(f"Error: {url}", exc_info=True)

async def scrape_index(page):
    url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
    return await scrape_api(url)

async def scrape_detail(id):
    url = DETAIL_URL.format(id=id)
    data = await scrape_api(url)
    logging.info(f"Saving {url}: {data}")

async def main():
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [asyncio.ensure_future(scrape_index(page))
                          for page in range(1, PAGE_NUMBER + 1)]
    results = await asyncio.gather(*scrape_index_tasks)
    logging.info(f"Results: {json.dumps(results, ensure_ascii=False, indent=2)}")

    ids = []
    for index_data in results:
        if not index_data:
            continue
        for item in index_data.get("results"):
            ids.append(item.get("id"))

    scrape_detail_tasks = [asyncio.ensure_future(scrape_detail(id)) for id in ids]
    await asyncio.wait(scrape_detail_tasks)
    await session.close()

if __name__ == "__main__":
    asyncio.run(main())
```
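The goal in 3.1 mentions saving the data to MongoDB asynchronously, but scrape_detail above only logs it. Below is a hedged sketch of that storage step using the asynchronous driver motor; the connection string, database name, and collection name are assumptions, not taken from the original notes:

```python
import motor.motor_asyncio

MONGO_CONNECTION_STRING = "mongodb://localhost:27017"   # assumed local MongoDB instance
MONGO_DB_NAME = "books"
MONGO_COLLECTION_NAME = "books"

client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DB_NAME]
collection = db[MONGO_COLLECTION_NAME]

async def save_data(data):
    # Upsert by book id so re-running the crawler does not create duplicates.
    return await collection.update_one(
        {"id": data.get("id")},
        {"$set": data},
        upsert=True,
    )
```

scrape_detail could then call `await save_data(data)` after fetching the detail JSON instead of only logging it.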