Python async Redis

Python currently has three async Redis clients: aredis, aioredis, and asyncio_redis.

Comparison of aredis, aioredis, and asyncio_redis
From: https://zhuanlan.zhihu.com/p/24720629

Fetching tasks from Redis with asyncio
How asyncio can work together with a Redis queue: https://www.v2ex.com/amp/t/489042
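A common way to pull tasks from Redis in asyncio is to loop on a blocking list pop. The sketch below assumes a client object whose blpop(key, timeout=...) coroutine returns a (key, value) pair, or None on timeout, which is how aioredis 1.x behaves. The worker function, the queue name 'tasks', and the FakeRedis stand-in (used so the example runs without a server) are hypothetical names chosen for illustration.

```python
import asyncio


async def worker(redis, queue_key, handle, max_idle_polls=1):
    """Pop tasks from a Redis list until the queue stays empty."""
    idle = 0
    while idle < max_idle_polls:
        item = await redis.blpop(queue_key, timeout=1)
        if item is None:          # timed out, nothing queued
            idle += 1
            continue
        idle = 0
        _key, payload = item
        await handle(payload)


class FakeRedis:
    """In-memory stand-in so the sketch runs without a Redis server."""

    def __init__(self, items):
        self._items = list(items)

    async def blpop(self, key, timeout=0):
        await asyncio.sleep(0)    # yield to the event loop like real I/O
        if self._items:
            return key.encode(), self._items.pop(0)
        return None


async def demo():
    seen = []

    async def handle(payload):
        seen.append(payload.decode())

    await worker(FakeRedis([b'job-1', b'job-2']), 'tasks', handle)
    return seen


print(asyncio.run(demo()))
```

With a real client, the FakeRedis instance would be replaced by the connected client object, and the idle limit would usually be dropped so the worker keeps polling.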
1. aredis
GitHub: https://github.com/NoneGG/aredis
Official aredis documentation (English): https://aredis.readthedocs.io/en/latest/
aredis, an efficient and user-friendly async Redis client: https://www.ctolib.com/aredis.html

Install:
    pip install aredis

Getting started:
More usage examples: https://github.com/NoneGG/aredis/tree/master/examples

1. Single-node version

    import asyncio
    from aredis import StrictRedis


    async def example():
        client = StrictRedis(host='127.0.0.1', port=6379, db=0)
        await client.flushdb()
        await client.set('foo', 1)
        assert await client.exists('foo') is True
        await client.incr('foo', 100)

        assert int(await client.get('foo')) == 101
        await client.expire('foo', 1)
        await asyncio.sleep(0.1)
        await client.ttl('foo')
        await asyncio.sleep(1)
        assert not await client.exists('foo')

    loop = asyncio.get_event_loop()
    loop.run_until_complete(example())

2. Cluster version

    import asyncio
    from aredis import StrictRedisCluster


    async def example():
        client = StrictRedisCluster(host='172.17.0.2', port=7001)
        await client.flushdb()
        await client.set('foo', 1)
        await client.lpush('a', 1)
        print(await client.cluster_slots())

        await client.rpoplpush('a', 'b')
        assert await client.rpop('b') == b'1'

    loop = asyncio.get_event_loop()
    loop.run_until_complete(example())

2. aioredis
GitHub: https://github.com/aio-libs/aioredis
Official documentation: https://aioredis.readthedocs.io/en/v1.3.0/

Getting started

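Install: pip install aioredis
Note: the examples below use the aioredis 1.x API (create_redis_pool, create_redis, create_pool) documented at the v1.3.0 link above. aioredis 2.0 removed these functions, so pin a 1.x release (for example pip install "aioredis<2") to run them as written.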
Connecting to Redis:

    import asyncio
    import aioredis


    async def main():
        redis = await aioredis.create_redis_pool('redis://localhost')
        await redis.set('my-key', 'value')
        value = await redis.get('my-key', encoding='utf-8')
        print(value)

        redis.close()
        await redis.wait_closed()

    asyncio.run(main())

Simple low-level interface:

    import asyncio
    import aioredis

    loop = asyncio.get_event_loop()


    async def go():
        conn = await aioredis.create_connection(
            ('localhost', 6379), loop=loop)
        await conn.execute('set', 'my-key', 'value')
        val = await conn.execute('get', 'my-key')
        print(val)
        conn.close()
        await conn.wait_closed()

    loop.run_until_complete(go())
    # prints b'value' (replies are bytes unless an encoding is given)

Simple high-level interface:

    import asyncio
    import aioredis

    loop = asyncio.get_event_loop()


    async def go():
        redis = await aioredis.create_redis(
            ('localhost', 6379), loop=loop)
        await redis.set('my-key', 'value')
        val = await redis.get('my-key')
        print(val)
        redis.close()
        await redis.wait_closed()

    loop.run_until_complete(go())
    # prints b'value' (replies are bytes unless an encoding is given)

Connection pool:

    import asyncio
    import aioredis

    loop = asyncio.get_event_loop()


    async def go():
        pool = await aioredis.create_pool(
            ('localhost', 6379), minsize=5, maxsize=10, loop=loop)
        # In aioredis 1.x, create_pool() hands out low-level connections,
        # so commands go through execute() rather than redis.set()/redis.get().
        async with pool.get() as conn:
            await conn.execute('set', 'my-key', 'value')
            print(await conn.execute('get', 'my-key'))
        # graceful shutdown
        pool.close()
        await pool.wait_closed()

    loop.run_until_complete(go())

Two ways to connect to a specific db:
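The aioredis 1.x documentation describes two ways to pick the database index: the db keyword argument, or the path component of the address URI. The following is a minimal sketch, assuming aioredis 1.x is installed and a server is listening on localhost. The helper db_from_uri and the coroutine connect_to_db_1 are hypothetical names; the helper uses only the standard library to show which index a URI selects.

```python
from urllib.parse import urlparse


def db_from_uri(uri):
    """Return the database index encoded in a redis:// URI path (default 0)."""
    path = urlparse(uri).path.lstrip('/')
    return int(path) if path else 0


async def connect_to_db_1():
    import aioredis  # imported lazily; needs aioredis 1.x and a running server

    # 1) db index as a keyword argument
    by_keyword = await aioredis.create_redis_pool('redis://localhost', db=1)
    # 2) db index as the URI path
    by_uri = await aioredis.create_redis_pool('redis://localhost/1')
    return by_keyword, by_uri


print(db_from_uri('redis://localhost/1'))   # 1
print(db_from_uri('redis://localhost'))     # 0
```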
Connecting to a password-protected Redis instance:
The password can be specified either as a keyword argument or in the address URI:

    redis = await aioredis.create_redis_pool('redis://localhost', password='sEcRet')
    redis = await aioredis.create_redis_pool('redis://:sEcRet@localhost/')

Result encoding:
By default aioredis will return bytes for most Redis commands that return string replies. Redis error replies are known to be valid UTF-8 strings, so error messages are decoded automatically.
If you know that the data in Redis is a valid string, you can tell aioredis to decode the result by passing the keyword-only argument encoding in a command call:
Example code:

    import asyncio
    import aioredis


    async def main():
        redis = await aioredis.create_redis_pool('redis://localhost')
        await redis.set('key', 'string-value')
        bin_value = await redis.get('key')

        assert bin_value == b'string-value'

        str_value = await redis.get('key', encoding='utf-8')

        assert str_value == 'string-value'

        redis.close()
        await redis.wait_closed()

    asyncio.run(main())

Example code:

    import asyncio
    import aioredis


    async def main():
        redis = await aioredis.create_redis_pool('redis://localhost')

        await redis.hmset_dict('hash',
                               key1='value1',
                               key2='value2',
                               key3=123)

        result = await redis.hgetall('hash', encoding='utf-8')
        assert result == {
            'key1': 'value1',
            'key2': 'value2',
            'key3': '123',  # note that Redis returns int as string
        }

        redis.close()
        await redis.wait_closed()

    asyncio.run(main())

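When a reply was fetched without encoding=, it arrives as bytes and can still be decoded afterwards. The following is a small stdlib-only sketch; decode_reply is a hypothetical helper, not part of aioredis, and it assumes the stored data is valid UTF-8.

```python
def decode_reply(reply, encoding='utf-8'):
    """Recursively decode bytes inside a Redis-style reply."""
    if isinstance(reply, bytes):
        return reply.decode(encoding)
    if isinstance(reply, dict):
        return {decode_reply(k, encoding): decode_reply(v, encoding)
                for k, v in reply.items()}
    if isinstance(reply, (list, tuple)):
        return [decode_reply(item, encoding) for item in reply]
    return reply  # ints, None, already-decoded str


# Shape of an undecoded hash reply (sample data, not read from a server).
raw = {b'key1': b'value1', b'key3': b'123'}
print(decode_reply(raw))
```

Passing encoding= on the command itself is simpler when the encoding is known up front; a helper like this is mainly useful when some callers need the raw bytes.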
Transactions (MULTI/EXEC)

    import asyncio
    import aioredis


    async def main():
        redis = await aioredis.create_redis_pool('redis://localhost')

        tr = redis.multi_exec()
        tr.set('key1', 'value1')
        tr.set('key2', 'value2')
        ok1, ok2 = await tr.execute()
        assert ok1
        assert ok2

    asyncio.run(main())

The multi_exec() method creates and returns a new MultiExec object, which buffers commands and then executes them inside a MULTI/EXEC block.
Important: do not await a buffered command (such as tr.set('foo', '123')), because the await will block forever.
The following code blocks forever:

    tr = redis.multi_exec()
    await tr.incr('foo')   # that's all, we're stuck!

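The reason is that a buffered command returns a future that is only resolved once execute() sends the whole MULTI/EXEC block. The toy model below reproduces that behaviour with plain asyncio futures. ToyMultiExec is a hypothetical stand-in written for illustration, not aioredis code, and a timeout is used so the demonstration terminates.

```python
import asyncio


class ToyMultiExec:
    """Buffers commands; their futures resolve only inside execute()."""

    def __init__(self):
        self._buffered = []

    def incr(self, key):
        fut = asyncio.get_running_loop().create_future()
        self._buffered.append((fut, ('INCR', key)))
        return fut                      # pending until execute() runs

    async def execute(self):
        results = []
        for fut, _command in self._buffered:
            fut.set_result(1)           # pretend the server replied 1
            results.append(fut.result())
        return results


async def demo():
    tr = ToyMultiExec()
    try:
        # Awaiting before execute(): nothing will ever resolve the future.
        await asyncio.wait_for(tr.incr('foo'), timeout=0.1)
        stuck = False
    except asyncio.TimeoutError:
        stuck = True

    tr2 = ToyMultiExec()
    fut = tr2.incr('foo')               # buffer first, do not await yet
    await tr2.execute()                 # now the future is resolved
    return stuck, await fut


print(asyncio.run(demo()))
```

The safe pattern is the second one: call the buffered commands without await, await execute(), and only then read individual results.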
Publish/Subscribe
aioredis supports Redis Publish/Subscribe messaging.
To start listening for messages you must call either the subscribe() or the psubscribe() method. Both methods return a list of Channel objects representing the subscribed channels.
Right after that, the channel will receive and store messages (the Channel object is basically a wrapper around asyncio.Queue). To read messages from a channel, use the get() or get_json() coroutines.
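To make the "wrapper around asyncio.Queue" idea concrete, here is a stdlib-only toy. ToyChannel and its put_nowait method are hypothetical names for illustration; the class only mimics the get()/get_json() reading side described above and is not the aioredis implementation.

```python
import asyncio
import json


class ToyChannel:
    """Stores incoming messages in an asyncio.Queue until a reader asks."""

    def __init__(self, name):
        self.name = name
        self._queue = asyncio.Queue()

    def put_nowait(self, data):
        # Called by the (imaginary) connection when a message arrives.
        self._queue.put_nowait(data)

    async def get(self, encoding=None):
        data = await self._queue.get()
        return data.decode(encoding) if encoding else data

    async def get_json(self, encoding='utf-8'):
        return json.loads(await self.get(encoding=encoding))


async def demo():
    ch = ToyChannel('channel:1')
    ch.put_nowait(b'Hello')
    ch.put_nowait(b'{"n": 1}')
    return await ch.get(encoding='utf-8'), await ch.get_json()


print(asyncio.run(demo()))
```

Because messages are queued as they arrive, a reader that starts late still sees everything received since the subscription was made.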
Example of subscribing to and reading channels:

    import asyncio
    import aioredis


    async def main():
        redis = await aioredis.create_redis_pool('redis://localhost')

        ch1, ch2 = await redis.subscribe('channel:1', 'channel:2')
        assert isinstance(ch1, aioredis.Channel)
        assert isinstance(ch2, aioredis.Channel)

        async def reader(channel):
            async for message in channel.iter():
                print("Got message:", message)
        asyncio.get_running_loop().create_task(reader(ch1))
        asyncio.get_running_loop().create_task(reader(ch2))

        await redis.publish('channel:1', 'Hello')
        await redis.publish('channel:2', 'World')

        redis.close()
        await redis.wait_closed()

    asyncio.run(main())

Example of subscribing to and reading patterns:

    import asyncio
    import aioredis


    async def main():
        redis = await aioredis.create_redis_pool('redis://localhost')

        ch, = await redis.psubscribe('channel:*')
        assert isinstance(ch, aioredis.Channel)

        async def reader(channel):
            async for ch, message in channel.iter():
                print("Got message in channel:", ch, ":", message)
        asyncio.get_running_loop().create_task(reader(ch))

        await redis.publish('channel:1', 'Hello')
        await redis.publish('channel:2', 'World')

        redis.close()
        await redis.wait_closed()

    asyncio.run(main())

Sentinel client

    import asyncio
    import aioredis


    async def main():
        sentinel = await aioredis.create_sentinel(
            ['redis://localhost:26379', 'redis://sentinel2:26379'])
        redis = sentinel.master_for('mymaster')

        ok = await redis.set('key', 'value')
        assert ok
        val = await redis.get('key', encoding='utf-8')
        assert val == 'value'

    asyncio.run(main())

The Sentinel client requires a list of Redis Sentinel addresses to connect to and start discovering services.
Calling the master_for() or slave_for() method returns a Redis client connected to the specified service monitored by Sentinel.
The Sentinel client automatically detects failover and reconnects the Redis clients.

Connection pool (with Sanic)

    from sanic import Sanic, response
    import aioredis

    app = Sanic(__name__)


    @app.route("/")
    async def handle(request):
        async with request.app.redis_pool.get() as redis:
            await redis.execute('set', 'my-key', 'value')
            val = await redis.execute('get', 'my-key')
        return response.text(val.decode('utf-8'))


    @app.listener('before_server_start')
    async def before_server_start(app, loop):
        # Create the pool once, before the server starts accepting requests.
        app.redis_pool = await aioredis.create_pool(
            ('localhost', 6379),
            minsize=5,
            maxsize=10,
            loop=loop
        )


    @app.listener('after_server_stop')
    async def after_server_stop(app, loop):
        app.redis_pool.close()
        await app.redis_pool.wait_closed()


    if __name__ == '__main__':
        app.run(host="0.0.0.0", port=80)

Example:

    import asyncio
    import aioredis

    loop = asyncio.get_event_loop()


    async def go_1():
        # Low-level connection
        conn = await aioredis.create_connection(('localhost', 6379), loop=loop)
        await conn.execute('set', 'my-key', 'value')
        val = await conn.execute('get', 'my-key')
        print(val)
        conn.close()
        await conn.wait_closed()


    async def go_2():
        # High-level client
        redis = await aioredis.create_redis(('localhost', 6379), loop=loop)
        await redis.set('my-key', 'value')
        val = await redis.get('my-key')
        print(val)
        redis.close()
        await redis.wait_closed()


    async def go_3():
        # Connection pool
        redis_pool = await aioredis.create_pool(('localhost', 6379), minsize=5, maxsize=10, loop=loop)
        async with redis_pool.get() as conn:
            await conn.execute('set', 'my-key', 'value')
            print(await conn.execute('get', 'my-key'))
        # graceful shutdown
        redis_pool.close()
        await redis_pool.wait_closed()


    loop.run_until_complete(go_1())
    # loop.run_until_complete(go_2())
    # loop.run_until_complete(go_3())

3. asyncio_redis

GitHub: https://github.com/jonathanslenders/asyncio-redis
Official documentation (English): https://asyncio-redis.readthedocs.io/en/latest/

Install: pip install asyncio_redis

The connection class
An asyncio_redis.Connection instance takes care of the connection and reconnects automatically, using a new transport, when the connection drops. This connection class also acts as a proxy to an asyncio_redis.RedisProtocol instance; any Redis command of the protocol can be called directly on the connection.

    import asyncio
    import asyncio_redis


    async def example():
        # Create Redis connection
        connection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)

        # Set a key
        await connection.set('my_key', 'my_value')

        # When finished, close the connection.
        connection.close()


    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(example())

Connection pooling
Requests will automatically be distributed among all connections in a pool. If a connection is blocking because of, for instance, a blocking rpop, another connection will be used for new commands.

    import asyncio
    import asyncio_redis


    async def example():
        # Create Redis connection pool
        connection = await asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)

        # Set a key
        await connection.set('my_key', 'my_value')

        # When finished, close the connection pool.
        connection.close()


    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(example())

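The distribution rule described above can be pictured with a toy model: each command goes to a connection that is not currently busy. ToyConnection, ToyPool, and the busy flag are hypothetical names. This sketch simply picks the first idle connection and returns None when all are busy, which is an assumption made for illustration and not a description of how asyncio_redis chooses or what it does when the pool is exhausted.

```python
class ToyConnection:
    def __init__(self, name):
        self.name = name
        self.busy = False     # True while a blocking command is in flight


class ToyPool:
    def __init__(self, poolsize):
        self.connections = [ToyConnection(f'conn-{i}') for i in range(poolsize)]

    def pick(self):
        """Return the first idle connection, or None if all are busy."""
        for conn in self.connections:
            if not conn.busy:
                return conn
        return None


pool = ToyPool(poolsize=2)
first = pool.pick()
first.busy = True             # e.g. waiting inside a blocking pop
second = pool.pick()          # the pool skips the busy connection
print(first.name, second.name)
```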
Transactions example

    import asyncio
    import asyncio_redis


    async def example():
        # Create Redis connection pool
        connection = await asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)

        # Create transaction
        transaction = await connection.multi()

        # Run commands in transaction (they return future objects)
        f1 = await transaction.set('key', 'value')
        f2 = await transaction.set('another_key', 'another_value')

        # Commit transaction
        await transaction.exec()

        # Retrieve results
        result1 = await f1
        result2 = await f2

        # When finished, close the connection pool.
        connection.close()


    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(example())

It's recommended to use a large enough poolsize. A connection stays occupied for as long as a transaction is running on it.

Pub / sub example

    import asyncio
    import asyncio_redis


    async def example():
        # Create connection
        connection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)

        # Create subscriber.
        subscriber = await connection.start_subscribe()

        # Subscribe to channel.
        await subscriber.subscribe(['our-channel'])

        # Inside a while loop, wait for incoming events.
        # The loop never exits on its own, so the close() below only
        # runs if you add a break condition.
        while True:
            reply = await subscriber.next_published()
            print('Received: ', repr(reply.value), 'on channel', reply.channel)

        # When finished, close the connection.
        connection.close()


    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(example())

Lua scripting example

    import asyncio
    import asyncio_redis

    code = """
    local value = redis.call('GET', KEYS[1])
    value = tonumber(value)
    return value * ARGV[1]
    """


    async def example():
        connection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)

        # Set a key
        await connection.set('my_key', '2')

        # Register script
        multiply = await connection.register_script(code)

        # Run script
        script_reply = await multiply.run(keys=['my_key'], args=['5'])
        result = await script_reply.return_value()
        print(result)  # prints 2 * 5

        # When finished, close the connection.
        connection.close()


    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(example())

Example using the Protocol class

    import asyncio
    import asyncio_redis


    async def example():
        loop = asyncio.get_event_loop()

        # Create Redis connection
        transport, protocol = await loop.create_connection(
            asyncio_redis.RedisProtocol, '127.0.0.1', 6379)

        # Set a key
        await protocol.set('my_key', 'my_value')

        # Get a key
        result = await protocol.get('my_key')
        print(result)

        # Close transport when finished.
        transport.close()


    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(example())

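asyncio-redis is a Redis client for Python asyncio (PEP 3156). The library is a completely asynchronous, non-blocking client for a Redis server. It depends on asyncio, so it requires Python 3.3 or later (the async/await syntax used in the examples here needs Python 3.5 or later).
Install: pip install asyncio-redis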
Create a redisUtils.py:

    import asyncio_redis


    class Redis:
        """
        A simple wrapper class that allows you to share a connection
        pool across your application.
        """
        _pool = None

        async def get_redis_pool(self):
            # Defaults, overridden by config.REDIS_CONFIG when that module exists.
            REDIS_CONFIG = {'host': 'localhost', 'port': 6379}
            try:
                from config import REDIS_CONFIG
            except ImportError:
                pass
            if not self._pool:
                self._pool = await asyncio_redis.Pool.create(
                    host=REDIS_CONFIG['host'], port=REDIS_CONFIG['port'],
                    password=REDIS_CONFIG.get('password'), poolsize=10
                )
            return self._pool

        async def close(self):
            if self._pool:
                self._pool.close()

Then create a run.py:
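
    from utils.redisUtils import Redis  # the wrapper class defined above
    import json


    # load_products is a name added here so that the awaits live inside a
    # coroutine. get_items() is the application's own query helper; the
    # original post does not show it, nor where the parameters come from.
    async def load_products(app, re_params, start_page, end_page, with_total, pager):
        redis = Redis()
        r = await redis.get_redis_pool()

        # String type
        key = '/hushuai/product'
        await r.set(key, 'value')
        val = await r.get(key)
        print(val)

        # List type
        key_list = '/hushuai/product_list'
        product_list_size = await r.llen(key_list)
        print(product_list_size)
        if product_list_size != 0:
            if product_list_size > start_page:
                # Read from the list key (the original read from the string key).
                product_list = await r.lrange(key_list, start_page, end_page)
                product_list = await product_list.aslist()
                product_list_other = []
                for i in product_list:
                    product_list_other.append(json.loads(i.replace('\'', '\"').replace('None', '""')))
                data = [product_list_size, product_list_other]
            else:
                data = await get_items(app.pool, "product_view", re_params, with_total=with_total, pager=pager)
        else:
            data = await get_items(app.pool, "product_view", re_params, with_total=with_total, pager=pager)
            data_redis = await get_items(app.pool, "product_view", re_params)
            rows = []
            for product_data in data_redis:
                rows.append(str(product_data))
            if rows:
                # Push to the list key (the original pushed to the string key).
                await r.rpush(key_list, rows)
        return data

This mainly shows how to use async Redis from Python; only the Redis string and list data types are handled here.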
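Storing rows with str() and patching the quotes before json.loads, as run.py does, is fragile: any value containing an apostrophe breaks the round trip. Below is a stdlib-only sketch of an alternative, serializing with json.dumps before the push and json.loads after the read. The sample row is made up for illustration.

```python
import json

row = {'name': "O'Brien", 'price': None}     # made-up sample row

# Approach used in run.py: str() plus quote replacement.
patched = str(row).replace('\'', '\"').replace('None', '""')
try:
    json.loads(patched)
    fragile_ok = True
except json.JSONDecodeError:
    fragile_ok = False

# Alternative: JSON in, JSON out. `stored` is what would be pushed to the list.
stored = json.dumps(row)
restored = json.loads(stored)

print(fragile_ok)          # False
print(restored == row)     # True
```

This also keeps None as a JSON null instead of rewriting it to an empty string.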