Python 异步 redis

 

现在的 Python 的异步 redis,有三种( aredis 、aioredis、asynio_redis)

 

 

aredis 、aioredis、asynio_redis 对比

 

From:https://zhuanlan.zhihu.com/p/24720629

  1. aioredis 要求装上 hiredis , 而 aredis 可以不需要相关依赖地运行,速度上两者持平,且都可以使用 hiredis 来作为 parser ,用 uvloop 代替 asyncio 的 eventloop 来加速
  2. asyncio_redis 使用了 Python 提供的 protocol 来进行异步通信,而 aredis 则使用 StreamReader 和 StreamWriter 来进行异步通信,在运行速度上两倍于 asyncio_redis ,附上 benchmark
  3. aioredis 和 asyncio_redis 这两个客户端目前都还没有对于集群的支持,相对来说 aredis 的功能更为全面一些

 

asyncio 从 redis 取任务

asyncio 怎么与 redis 队列协同?:https://www.v2ex.com/amp/t/489042

 

 

1. aredis

 

github 地址:https://github.com/NoneGG/aredis

aredis 官方英文文档:https://aredis.readthedocs.io/en/latest/

aredis 一个高效和用户友好的异步Redis客户端:https://www.ctolib.com/aredis.html

:https://github.com/NoneGG/aredis/tree/master/examples

 

安装:

pip install aredis

 

开始使用:

更多使用示例:https://github.com/NoneGG/aredis/tree/master/examples

 

1. 单节点版

import asyncio
from aredis import StrictRedisasync def example():client = StrictRedis(host='127.0.0.1', port=6379, db=0)await client.flushdb()await client.set('foo', 1)assert await client.exists('foo') is Trueawait client.incr('foo', 100)assert int(await client.get('foo')) == 101await client.expire('foo', 1)await asyncio.sleep(0.1)await client.ttl('foo')await asyncio.sleep(1)assert not await client.exists('foo')loop = asyncio.get_event_loop()
loop.run_until_complete(example())

 

2. 集群版

import asyncio
from aredis import StrictRedisClusterasync def example():client = StrictRedisCluster(host='172.17.0.2', port=7001)await client.flushdb()await client.set('foo', 1)await client.lpush('a', 1)print(await client.cluster_slots())await client.rpoplpush('a', 'b')assert await client.rpop('b') == b'1'loop = asyncio.get_event_loop()
loop.run_until_complete(example())

 

 

2. aioredis

 

github 地址:https://github.com/aio-libs/aioredis

官方文档:https://aioredis.readthedocs.io/en/v1.3.0/

 

开始使用

 

安装:pip install aioredis

连接 redis

import asyncio
import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')await redis.set('my-key', 'value')value = await redis.get('my-key', encoding='utf-8')print(value)redis.close()await redis.wait_closed()asyncio.run(main())

simple low-level interface:

import asyncio
import aioredisloop = asyncio.get_event_loop()async def go():conn = await aioredis.create_connection(('localhost', 6379), loop=loop)await conn.execute('set', 'my-key', 'value')val = await conn.execute('get', 'my-key')print(val)conn.close()await conn.wait_closed()loop.run_until_complete(go())
# will print 'value'

simple high-level interface:

import asyncio
import aioredisloop = asyncio.get_event_loop()async def go():redis = await aioredis.create_redis(('localhost', 6379), loop=loop)await redis.set('my-key', 'value')val = await redis.get('my-key')print(val)redis.close()await redis.wait_closed()loop.run_until_complete(go())
# will print 'value'

Connections pool:

import asyncio
import aioredisloop = asyncio.get_event_loop()async def go():pool = await aioredis.create_pool(('localhost', 6379), minsize=5, maxsize=10, loop=loop)with await pool as redis:  # high-level redis API instanceawait redis.set('my-key', 'value')print(await redis.get('my-key'))# graceful shutdownpool.close()await pool.wait_closed()loop.run_until_complete(go())

 

连接到指定 db 的 两种方法:

  1. 指定 db 参数:redis = await aioredis.create_redis_pool('redis://localhost', db=1)
  2. 在 URL 中指定 db:redis = await aioredis.create_redis_pool('redis://localhost/2')
  3. 使用 select 方法:
    redis = await aioredis.create_redis_pool('redis://localhost/')
    await redis.select(3)
    

 

连接带密码的 redis 实例:

The password can be specified either in keyword argument or in address URI:

redis = await aioredis.create_redis_pool('redis://localhost', password='sEcRet')redis = await aioredis.create_redis_pool('redis://:sEcRet@localhost/')

 

结果编码:

By default aioredis will return bytes for most Redis commands that return string replies. Redis error replies are known to be valid UTF-8 strings so error messages are decoded automatically.

If you know that data in Redis is valid string you can tell aioredis to decode result by passing keyword-only argument encoding in a command call:

示例代码:

import asyncio
import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')await redis.set('key', 'string-value')bin_value = await redis.get('key')assert bin_value == b'string-value'str_value = await redis.get('key', encoding='utf-8')assert str_value == 'string-value'redis.close()await redis.wait_closed()asyncio.run(main())

示例代码:

import asyncio
import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')await redis.hmset_dict('hash', key1='value1', key2='value2', key3=123)result = await redis.hgetall('hash', encoding='utf-8')assert result == {'key1': 'value1','key2': 'value2','key3': '123',  # note that Redis returns int as string}redis.close()await redis.wait_closed()asyncio.run(main())

 

事务( Multi/Exec )

import asyncio
import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')tr = redis.multi_exec()tr.set('key1', 'value1')tr.set('key2', 'value2')ok1, ok2 = await tr.execute()assert ok1assert ok2asyncio.run(main())

multi_exec() method creates and returns new MultiExec object which is used for buffering commands and then executing them inside MULTI/EXEC block.

重要提示:不要在 类似 ( tr.set('foo', '123') ) 上 使用 await buffered 命令, 因为它将被永远阻塞。

下面的代码将会给永远阻塞:

tr = redis.multi_exec()
await tr.incr('foo')   # that's all. we've stuck!

 

发布订阅 模式

aioredis 提供了对 Redis 的 发布/订阅(Publish / Subscribe) 消息的支持。

To start listening for messages you must call either subscribe() or psubscribe() method. Both methods return list of Channel objects representing subscribed channels.

Right after that the channel will receive and store messages (the Channel object is basically a wrapper around asyncio.Queue). To read messages from channel you need to use get() or get_json() coroutines.

订阅 和 阅读 频道 示例:

import asyncio
import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')ch1, ch2 = await redis.subscribe('channel:1', 'channel:2')assert isinstance(ch1, aioredis.Channel)assert isinstance(ch2, aioredis.Channel)async def reader(channel):async for message in channel.iter():print("Got message:", message)asyncio.get_running_loop().create_task(reader(ch1))asyncio.get_running_loop().create_task(reader(ch2))await redis.publish('channel:1', 'Hello')await redis.publish('channel:2', 'World')redis.close()await redis.wait_closed()asyncio.run(main())

订阅 和 阅读 模式:

import asyncio
import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')ch, = await redis.psubscribe('channel:*')assert isinstance(ch, aioredis.Channel)async def reader(channel):async for ch, message in channel.iter():print("Got message in channel:", ch, ":", message)asyncio.get_running_loop().create_task(reader(ch))await redis.publish('channel:1', 'Hello')await redis.publish('channel:2', 'World')redis.close()await redis.wait_closed()asyncio.run(main())

 

Sentinel client

import asyncio
import aioredisasync def main():sentinel = await aioredis.create_sentinel(['redis://localhost:26379', 'redis://sentinel2:26379'])redis = sentinel.master_for('mymaster')ok = await redis.set('key', 'value')assert okval = await redis.get('key', encoding='utf-8')assert val == 'value'asyncio.run(main())

Sentinel 客户端需要一个 Redis Sentinel 地址列表,来连接并开始发现服务。

调用 master_for() 或 slave_for() 方法 将返回连接到 Sentinel 监视的指定服务的 Redis 客户端。

Sentinel 客户端将自动检测故障转移并重新连接 Redis 客户端。

import asyncio
import aioredisloop = asyncio.get_event_loop()async def go():conn = await aioredis.create_connection(('localhost', 6379), loop=loop)await conn.execute('set', 'my-key', 'value')val = await conn.execute('get', 'my-key')print(val)conn.close()await conn.wait_closed()
loop.run_until_complete(go())
# will print 'value'

连接池

from sanic import Sanic, response
import aioredisapp = Sanic(__name__)@app.route("/")
async def handle(request):async with request.app.redis_pool.get() as redis:await redis.execute('set', 'my-key', 'value')val = await redis.execute('get', 'my-key')return response.text(val.decode('utf-8'))@app.listener('before_server_start')
async def before_server_start(app, loop):app.redis_pool = await aioredis.create_pool(('localhost', 6379),minsize=5,maxsize=10,loop=loop)@app.listener('after_server_stop')
async def after_server_stop(app, loop):app.redis_pool.close()await app.redis_pool.wait_closed()if __name__ == '__main__':app.run(host="0.0.0.0", port=80)

 

示例:

import asyncio
import aioredisloop = asyncio.get_event_loop()async def go_1():conn = await aioredis.create_connection(('localhost', 6379), loop=loop)await conn.execute('set', 'my-key', 'value')val = await conn.execute('get', 'my-key')print(val)conn.close()await conn.wait_closed()async def go_2():redis = await aioredis.create_redis(('localhost', 6379), loop=loop)await redis.set('my-key', 'value')val = await redis.get('my-key')print(val)redis.close()await redis.wait_closed()async def go_3():redis_pool = await aioredis.create_pool(('localhost', 6379), minsize=5, maxsize=10, loop=loop)async with redis_pool.get() as conn:  # high-level redis API instanceawait conn.execute('set', 'my-key', 'value')print(await conn.execute('get', 'my-key'))# graceful shutdownredis_pool.close()await redis_pool.wait_closed()loop.run_until_complete(go_1())
# loop.run_until_complete(go_2())
# loop.run_until_complete(go_3())

 

 

3. asynio_redis

 

GitHub 地址:https://github.com/jonathanslenders/asyncio-redis

官方英文文档:https://asyncio-redis.readthedocs.io/en/latest/

 

安装:pip install asyncio_redis

 

The connection class

asyncio_redis.Connection instance will take care of the connection and will automatically reconnect, using a new transport when the connection drops. This connection class also acts as a proxy to a asyncio_redis.RedisProtocol instance; any Redis command of the protocol can be called directly at the connection.

import asyncio
import asyncio_redisasync def example():# Create Redis connectionconnection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)    # Set a keyawait connection.set('my_key', 'my_value')# When finished, close the connection.connection.close()if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(example())

 

Connection pooling

Requests will automatically be distributed among all connections in a pool. If a connection is blocking because of --for instance-- a blocking rpop, another connection will be used for new commands.

import asyncio
import asyncio_redisasync def example():# Create Redis connectionconnection = await asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)# Set a keyawait connection.set('my_key', 'my_value')# When finished, close the connection pool.connection.close()if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(example())

 

Transactions example

import asyncio
import asyncio_redisasync def example():# Create Redis connectionconnection = await asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)# Create transactiontransaction = await connection.multi()# Run commands in transaction (they return future objects)f1 = await transaction.set('key', 'value')f2 = await transaction.set('another_key', 'another_value')# Commit transactionawait transaction.exec()# Retrieve resultsresult1 = await f1result2 = await f2# When finished, close the connection pool.connection.close()

It's recommended to use a large enough poolsize. A connection will be occupied as long as there's a transaction running in there.

 

Pub / sub example

import asyncio
import asyncio_redisasync def example():# Create connectionconnection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)# Create subscriber.subscriber = await connection.start_subscribe()# Subscribe to channel.await subscriber.subscribe([ 'our-channel' ])# Inside a while loop, wait for incoming events.while True:reply = await subscriber.next_published()print('Received: ', repr(reply.value), 'on channel', reply.channel)# When finished, close the connection.connection.close()

 

LUA Scripting example

import asyncio
import asyncio_rediscode = """
local value = redis.call('GET', KEYS[1])
value = tonumber(value)
return value * ARGV[1]
"""async def example():connection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)# Set a keyawait connection.set('my_key', '2')# Register scriptmultiply = await connection.register_script(code)# Run scriptscript_reply = await multiply.run(keys=['my_key'], args=['5'])result = await script_reply.return_value()print(result)  # prints 2 * 5# When finished, close the connection.connection.close()

 

Example using the Protocol class

import asyncio
import asyncio_redisasync def example():loop = asyncio.get_event_loop()# Create Redis connectiontransport, protocol = await loop.create_connection(asyncio_redis.RedisProtocol, '127.0.0.1', 6379)# Set a keyawait protocol.set('my_key', 'my_value')# Get a keyresult = await protocol.get('my_key')print(result)# Close transport when finished.transport.close()if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(example())pass

 

 

asyncio-redis 是 Python asyncio 的 Redis 客户端 (PEP 3156)。这个 Redis 库是完全异步的,Reids 服务器非阻塞客户端,依赖于 asyncio,所以要求 Python 3.3. 以上版本。

安装:pip install asyncio-redis

创建一个redisUtils.py

class Redis:"""A simple wrapper class that allows you to share a connectionpool across your application."""_pool = Noneasync def get_redis_pool(self):REDIS_CONFIG = {'host': 'localhost', 'port': 6379}try:from config import REDIS_CONFIGexcept:passif not self._pool:self._pool = await asyncio_redis.Pool.create(host=REDIS_CONFIG['host'], port=REDIS_CONFIG['port'], password=REDIS_CONFIG.get('password'), poolsize=10)return self._poolasync def close(self):if self._pool:self._pool.close()

再创建一个run.py

from utils.redisUtils import Redis  #引用上面的配置
import json as jsonnredis = Redis()
r = await redis.get_redis_pool()
key = '/hushuai/product'
await r.set(key, 'value')
val = await r.get(key)
print(val)key_list = '/hushuai/product_list'
product_list_size = await r.llen(key_list)
print(product_list_size)
if product_list_size != 0:if product_list_size > start_page:product_list = await r.lrange(key, start_page, end_page)product_list = await product_list.aslist()product_list_other = []for i in product_list:product_list_other.append(jsonn.loads(i.replace('\'', '\"').replace('None', '""')))data = [product_list_size, product_list_other]else:data = await get_items(app.pool,"product_view",re_params,with_total=with_total,pager=pager)
else:data = await get_items(app.pool, "product_view", re_params, with_total=with_total, pager=pager)data_redis = await get_items(app.pool, "product_view", re_params)list = []for product_data in data_redis:list.append(str(product_data))if list:await r.rpush(key, list)

主要讲的是python 使用异步redis的方式,这里只是做了redis的str和list两种类型的数据处理。

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/495393.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AspNetPager 修改 然后返回当前页

昨天碰到一个问题需要修改了 然后然后当前选中页面 这个问题应该很简单 也就是传值问题 根据自己的需求而决定 在返回的时候数据是当前的数据但是不在选中的页号 纠结了很长时间 请求别人帮助终于发现问题 原来是 Pager.CurrentPageIndex没有设定 大家应该认为我是不是很马虎 …

DeepMind新论文:给侧面照片,AI给你脑补出正面

来源:澎湃新闻摘要:大家在学生时代可能都面对过这些涉及空间想象的几何题。从根本上,它们考验的是2D图像和3D场景间的转换能力。如今,人工智能也成功打破了这种“次元壁”。用小立方体搭一个几何体,使它的主视图和俯视…

安卓逆向 和 手游辅助 学习 路线

From:https://zhuanlan.zhihu.com/p/95915254 知乎:Android 逆向分析学习路线?:https://www.zhihu.com/question/25626303 入门篇 --- 学习Android安全和逆向开发的路线总结:https://www.52pojie.cn/thread-1065039-…

对 带头结点的单链表 的操作

//带头结点的单链表#include<iostream>using namespace std;typedef struct student{int data;struct student *next;}node;node * creat() //创建单链表{node *head,*p,*s;int x,cycle1;head(node *)malloc(sizeof(node));phead;while(cycle){…

可交互的对抗网络如何增强人类创造力?

编译&#xff1a;集智翻译组来源&#xff1a;distill.pub作者&#xff1a;Shan Carter&#xff0c;Michael Nielsen原题&#xff1a;Using Artificial Intelligence to Augment Human Intelligence摘要&#xff1a;计算机不仅可以是解决数学问题的工具&#xff0c;还可以是拥有…

[转载] 本能

新浪影音娱乐&#xff1a;http://data.ent.sina.com.cn/movie/3819.html 转载于:https://www.cnblogs.com/6DAN_HUST/archive/2011/08/04/2126907.html

ARM 寄存器 详解

From&#xff08; ARM 寄存器详解 &#xff09;&#xff1a;https://blog.csdn.net/sandeldeng/article/details/52954781 ARM 汇编基础教程&#xff1a;2.数据类型和寄存器&#xff1a;https://www.52pojie.cn/thread-797306-1-1.html ARM 的 寄存器 ARM 工作状态 和 工作模式…

让AI个性化而且功耗更低 IBM研发新型神经网络芯片

选自&#xff1a;Singularity Hub编译&#xff1a;网易智能参与&#xff1a;李擎摘要&#xff1a;在GPU上运行的神经网络已经在人工智能领域取得了一些惊人的进步&#xff0c;但这两者的合作还并不完美。IBM的研究人员希望能设计出一种专门用于运行神经网络的新的芯片&#xff…

Net C# 扩展方法

Net C# 扩展方法扩展方法使您能够向现有类型“添加”方法&#xff0c;而无需创建新的派生类型、重新编译或以其他方式修改原始类型。扩展方法是一种特殊的静态方法&#xff0c;但可以像扩展类型上的实例方法一样进行调用。好处&#xff1a;不修改原有类型的实现调用&#xff1a…

豆瓣 音乐和读书的搜索页的 window.__DATA__ 的解密

豆瓣读书搜索页的 window.__DATA__ 的解密&#xff1a;https://www.jianshu.com/p/ac8b81950a73 豆瓣的 音乐搜索 和 读书搜索&#xff0c;返回的网页源码都有 window.__DATA__ &#xff0c;这是 js 加密 之后的数据 具体分析 参考 &#xff1a;https://www.jianshu.com/p/ac8…

每日一题——LeetCode888

方法一 个人方法&#xff1a; 交换后要达到相同的数量&#xff0c;那么意味着这个相同的数量就是两个人总数的平均值&#xff0c;假设A总共有4个&#xff0c;B总共有8个&#xff0c;那么最后两个人都要达到6个&#xff0c;如果A的第一盒糖果只有1个&#xff0c;那么B就要给出6…

“真”5G标准出炉!不止是速度,这些难以置信的改变将彻底颠覆你的生活

来源&#xff1a;传球创新论坛摘要&#xff1a;5G时代又离我们近了一大步。6月14上午11时许&#xff0c;3GPP批准了第五代移动通信标准5G NR独立组网&#xff08;SA&#xff09;的冻结&#xff0c;这意味着首个完整意义的国际5G标准正式确立。未来&#xff0c;5G技术将成为和电…

写在囧男囧女们的七夕节

写在囧男囧女们的七夕节 当我们发现某些节日已经远离我们时&#xff0c;我们往往会以幽默的方式&#xff0c;让自己无厘头地般地分享节日的快乐 。比如&#xff0c;每年的六月一日&#xff0c;不管童年已经离我们多遥远&#xff0c;见朋友或同事就来上一句“节日快乐”&#xf…

安卓逆向_16 --- ARM 静态分析( 使用 IDA Pro 分析 ARM 汇编【java_ 和 JNI_OnLoad】 )

菜鸟总结 so 分析&#xff0c;arm 汇编&#xff0c;IDA 静态分析&#xff1a;https://www.52pojie.cn/thread-695063-1-1.html JNI 静态注册 so 和 IDA 导入的 JNI.h 文件.zip&#xff1a;https://download.csdn.net/download/freeking101/12571373 ARM 静态分析&#xff1a; …

人脑的前世今生

来源&#xff1a;科学网摘要&#xff1a;人类的神奇常常归结于一个智慧的大脑以及贯穿于其中的无比复杂的神经网络&#xff0c;并认为这源自上帝之手&#xff0c;但其实它并不是无中生有的&#xff0c;而是自然演化的产物&#xff0c;虽然是一个无与伦比的杰作&#xff0c;但人…

Web在线操作Office之Word

最近公司有个项目&#xff0c;需要直接在IE浏览器中操作Word文档&#xff0c;随后在网上搜了一些资料&#xff0c;感觉都不是很理想。不过&#xff0c;最后还是找到了一个 功能比较强的控件&#xff0c;是老外开发的控件&#xff0c;需要注册。还好&#xff0c;没有注册时&…

安卓逆向_17 --- IDA 动态调试【 环境搭建、so库调试【动态普通、动态debug模式】、JNI_OnLoad调试分析、java_ 开头函数分析】

哔哩哔哩视频&#xff1a;https://www.bilibili.com/video/BV1UE411A7rW?p54 IDA Pro调试so&#xff0c;附加完毕&#xff0c;跳到目标so基址&#xff0c;但是内容都是DCB伪指令&#xff1f;&#xff1a;https://bbs.pediy.com/thread-222646.htm Android 中 adb shell ps 查…

2018全球科技创新报告

来源&#xff1a;199IT互联网数据中心摘要&#xff1a;毕马威报告显示&#xff0c;我们如今正处在一个科技创新爆发的时代&#xff0c;对于科技企业来说&#xff0c;现在不是害怕失败或是裹足不前的时候。毕马威报告显示&#xff0c;我们如今正处在一个科技创新爆发的时代&…

安卓逆向_18 --- APK保护策略【Java代码混淆、资源混淆、签名校验】

Java 代码混淆介绍&#xff1a;https://www.bilibili.com/video/BV1UE411A7rW?p60 Android 反编译利器 jadx&#xff1a;GitHub上直接下载&#xff1a;https://github.com/skylot/jadx Jeb 软件是一款专业实用且为安全专业人士设计的 Android 应用程序的反编绎工具&#xff0…

VC++ CFileDialog文件选择对话框的构造和文件操作

CFileDialog文件选择对话框的使用&#xff1a;首先构造一个对象并提供相应的参数&#xff0c;然后判断选择的是确定按钮IDOK &#xff0c;还是取消按钮IDCANCEL 。构造函数原型如下&#xff1a;CFileDialog::CFileDialog( BOOL bOpenFileDialog,LPCTSTR lpszDefExt NULL,LPCTS…