日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

Python 异步 redis

發布時間:2024/7/23 python 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Python 异步 redis 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

現在的 Python 的異步 redis,有三種( aredis 、aioredis、asynio_redis)

?

?

aredis 、aioredis、asynio_redis 對比

?

From:https://zhuanlan.zhihu.com/p/24720629

  • aioredis 要求裝上 hiredis , 而 aredis 可以不需要相關依賴地運行,速度上兩者持平,且都可以使用 hiredis 來作為 parser ,用 uvloop 代替 asyncio 的 eventloop 來加速
  • asyncio_redis 使用了 Python 提供的 protocol 來進行異步通信,而 aredis 則使用 StreamReader 和 StreamWriter 來進行異步通信,在運行速度上兩倍于 asyncio_redis ,附上?benchmark
  • aioredis?和?asyncio_redis?這兩個客戶端目前都還沒有對于集群的支持,相對來說 aredis 的功能更為全面一些
  • ?

    asyncio 從 redis 取任務

    asyncio 怎么與 redis 隊列協同?:https://www.v2ex.com/amp/t/489042

    ?

    ?

    1. aredis

    ?

    github 地址:https://github.com/NoneGG/aredis

    aredis 官方英文文檔:https://aredis.readthedocs.io/en/latest/

    aredis 一個高效和用戶友好的異步Redis客戶端:https://www.ctolib.com/aredis.html

    :https://github.com/NoneGG/aredis/tree/master/examples

    ?

    安裝:

    pip install aredis

    ?

    開始使用:

    更多使用示例:https://github.com/NoneGG/aredis/tree/master/examples

    ?

    1. 單節點版

    import asyncio from aredis import StrictRedisasync def example():client = StrictRedis(host='127.0.0.1', port=6379, db=0)await client.flushdb()await client.set('foo', 1)assert await client.exists('foo') is Trueawait client.incr('foo', 100)assert int(await client.get('foo')) == 101await client.expire('foo', 1)await asyncio.sleep(0.1)await client.ttl('foo')await asyncio.sleep(1)assert not await client.exists('foo')loop = asyncio.get_event_loop() loop.run_until_complete(example())

    ?

    2. 集群版

    import asyncio from aredis import StrictRedisClusterasync def example():client = StrictRedisCluster(host='172.17.0.2', port=7001)await client.flushdb()await client.set('foo', 1)await client.lpush('a', 1)print(await client.cluster_slots())await client.rpoplpush('a', 'b')assert await client.rpop('b') == b'1'loop = asyncio.get_event_loop() loop.run_until_complete(example())

    ?

    ?

    2. aioredis

    ?

    github 地址:https://github.com/aio-libs/aioredis

    官方文檔:https://aioredis.readthedocs.io/en/v1.3.0/

    ?

    開始使用

    ?

    安裝:pip install aioredis

    連接 redis

    import asyncio import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')await redis.set('my-key', 'value')value = await redis.get('my-key', encoding='utf-8')print(value)redis.close()await redis.wait_closed()asyncio.run(main())

    simple low-level interface:

    import asyncio import aioredisloop = asyncio.get_event_loop()async def go():conn = await aioredis.create_connection(('localhost', 6379), loop=loop)await conn.execute('set', 'my-key', 'value')val = await conn.execute('get', 'my-key')print(val)conn.close()await conn.wait_closed()loop.run_until_complete(go()) # will print 'value'

    simple high-level interface:

    import asyncio import aioredisloop = asyncio.get_event_loop()async def go():redis = await aioredis.create_redis(('localhost', 6379), loop=loop)await redis.set('my-key', 'value')val = await redis.get('my-key')print(val)redis.close()await redis.wait_closed()loop.run_until_complete(go()) # will print 'value'

    Connections pool:

    import asyncio import aioredisloop = asyncio.get_event_loop()async def go():pool = await aioredis.create_pool(('localhost', 6379), minsize=5, maxsize=10, loop=loop)with await pool as redis: # high-level redis API instanceawait redis.set('my-key', 'value')print(await redis.get('my-key'))# graceful shutdownpool.close()await pool.wait_closed()loop.run_until_complete(go())

    ?

    連接到指定 db 的 兩種方法:

  • 指定 db 參數:redis?=?await?aioredis.create_redis_pool('redis://localhost',?db=1)
  • 在 URL 中指定 db:redis?=?await?aioredis.create_redis_pool('redis://localhost/2')
  • 使用 select 方法: redis = await aioredis.create_redis_pool('redis://localhost/') await redis.select(3)
  • ?

    連接帶密碼的 redis 實例:

    The password can be specified either in keyword argument or in address URI:

    redis = await aioredis.create_redis_pool('redis://localhost', password='sEcRet')redis = await aioredis.create_redis_pool('redis://:sEcRet@localhost/')

    ?

    結果編碼:

    By default?aioredis?will return?bytes?for most Redis commands that return string replies. Redis error replies are known to be valid UTF-8 strings so error messages are decoded automatically.

    If you know that data in Redis is valid string you can tell?aioredis?to decode result by passing keyword-only argument?encoding?in a command call:

    示例代碼:

    import asyncio import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')await redis.set('key', 'string-value')bin_value = await redis.get('key')assert bin_value == b'string-value'str_value = await redis.get('key', encoding='utf-8')assert str_value == 'string-value'redis.close()await redis.wait_closed()asyncio.run(main())

    示例代碼:

    import asyncio import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')await redis.hmset_dict('hash', key1='value1', key2='value2', key3=123)result = await redis.hgetall('hash', encoding='utf-8')assert result == {'key1': 'value1','key2': 'value2','key3': '123', # note that Redis returns int as string}redis.close()await redis.wait_closed()asyncio.run(main())

    ?

    事務(?Multi/Exec )

    import asyncio import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')tr = redis.multi_exec()tr.set('key1', 'value1')tr.set('key2', 'value2')ok1, ok2 = await tr.execute()assert ok1assert ok2asyncio.run(main())

    multi_exec()?method creates and returns new?MultiExec?object which is used for buffering commands and then executing them inside MULTI/EXEC block.

    重要提示:不要在 類似 (?tr.set('foo',?'123')?) 上 使用?await?buffered 命令,?因為它將被永遠阻塞。

    下面的代碼將會給永遠阻塞:

    tr = redis.multi_exec() await tr.incr('foo') # that's all. we've stuck!

    ?

    發布訂閱 模式

    aioredis 提供了對 Redis 的 發布/訂閱(Publish / Subscribe)?消息的支持。

    To start listening for messages you must call either?subscribe()?or?psubscribe()?method. Both methods return list of?Channel?objects representing subscribed channels.

    Right after that the channel will receive and store messages (the?Channel?object is basically a wrapper around?asyncio.Queue). To read messages from channel you need to use?get()?or?get_json()?coroutines.

    訂閱 和 閱讀 頻道 示例:

    import asyncio import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')ch1, ch2 = await redis.subscribe('channel:1', 'channel:2')assert isinstance(ch1, aioredis.Channel)assert isinstance(ch2, aioredis.Channel)async def reader(channel):async for message in channel.iter():print("Got message:", message)asyncio.get_running_loop().create_task(reader(ch1))asyncio.get_running_loop().create_task(reader(ch2))await redis.publish('channel:1', 'Hello')await redis.publish('channel:2', 'World')redis.close()await redis.wait_closed()asyncio.run(main())

    訂閱 和 閱讀 模式:

    import asyncio import aioredisasync def main():redis = await aioredis.create_redis_pool('redis://localhost')ch, = await redis.psubscribe('channel:*')assert isinstance(ch, aioredis.Channel)async def reader(channel):async for ch, message in channel.iter():print("Got message in channel:", ch, ":", message)asyncio.get_running_loop().create_task(reader(ch))await redis.publish('channel:1', 'Hello')await redis.publish('channel:2', 'World')redis.close()await redis.wait_closed()asyncio.run(main())

    ?

    Sentinel client

    import asyncio import aioredisasync def main():sentinel = await aioredis.create_sentinel(['redis://localhost:26379', 'redis://sentinel2:26379'])redis = sentinel.master_for('mymaster')ok = await redis.set('key', 'value')assert okval = await redis.get('key', encoding='utf-8')assert val == 'value'asyncio.run(main())

    Sentinel 客戶端需要一個 Redis Sentinel 地址列表,來連接并開始發現服務。

    調用 master_for() 或 slave_for() 方法 將返回連接到 Sentinel 監視的指定服務的 Redis 客戶端。

    Sentinel 客戶端將自動檢測故障轉移并重新連接 Redis 客戶端。

    import asyncio import aioredisloop = asyncio.get_event_loop()async def go():conn = await aioredis.create_connection(('localhost', 6379), loop=loop)await conn.execute('set', 'my-key', 'value')val = await conn.execute('get', 'my-key')print(val)conn.close()await conn.wait_closed() loop.run_until_complete(go()) # will print 'value'

    連接池

    from sanic import Sanic, response import aioredisapp = Sanic(__name__)@app.route("/") async def handle(request):async with request.app.redis_pool.get() as redis:await redis.execute('set', 'my-key', 'value')val = await redis.execute('get', 'my-key')return response.text(val.decode('utf-8'))@app.listener('before_server_start') async def before_server_start(app, loop):app.redis_pool = await aioredis.create_pool(('localhost', 6379),minsize=5,maxsize=10,loop=loop)@app.listener('after_server_stop') async def after_server_stop(app, loop):app.redis_pool.close()await app.redis_pool.wait_closed()if __name__ == '__main__':app.run(host="0.0.0.0", port=80)

    ?

    示例:

    import asyncio import aioredisloop = asyncio.get_event_loop()async def go_1():conn = await aioredis.create_connection(('localhost', 6379), loop=loop)await conn.execute('set', 'my-key', 'value')val = await conn.execute('get', 'my-key')print(val)conn.close()await conn.wait_closed()async def go_2():redis = await aioredis.create_redis(('localhost', 6379), loop=loop)await redis.set('my-key', 'value')val = await redis.get('my-key')print(val)redis.close()await redis.wait_closed()async def go_3():redis_pool = await aioredis.create_pool(('localhost', 6379), minsize=5, maxsize=10, loop=loop)async with redis_pool.get() as conn: # high-level redis API instanceawait conn.execute('set', 'my-key', 'value')print(await conn.execute('get', 'my-key'))# graceful shutdownredis_pool.close()await redis_pool.wait_closed()loop.run_until_complete(go_1()) # loop.run_until_complete(go_2()) # loop.run_until_complete(go_3())

    ?

    ?

    3. asynio_redis

    ?

    GitHub 地址:https://github.com/jonathanslenders/asyncio-redis

    官方英文文檔:https://asyncio-redis.readthedocs.io/en/latest/

    ?

    安裝:pip?install?asyncio_redis

    ?

    The connection class

    A?asyncio_redis.Connection?instance will take care of the connection and will automatically reconnect, using a new transport when the connection drops. This connection class also acts as a proxy to a?asyncio_redis.RedisProtocol?instance; any Redis command of the protocol can be called directly at the connection.

    import asyncio import asyncio_redisasync def example():# Create Redis connectionconnection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379) # Set a keyawait connection.set('my_key', 'my_value')# When finished, close the connection.connection.close()if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(example())

    ?

    Connection pooling

    Requests will automatically be distributed among all connections in a pool. If a connection is blocking because of --for instance-- a blocking rpop, another connection will be used for new commands.

    import asyncio import asyncio_redisasync def example():# Create Redis connectionconnection = await asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)# Set a keyawait connection.set('my_key', 'my_value')# When finished, close the connection pool.connection.close()if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(example())

    ?

    Transactions example

    import asyncio import asyncio_redisasync def example():# Create Redis connectionconnection = await asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)# Create transactiontransaction = await connection.multi()# Run commands in transaction (they return future objects)f1 = await transaction.set('key', 'value')f2 = await transaction.set('another_key', 'another_value')# Commit transactionawait transaction.exec()# Retrieve resultsresult1 = await f1result2 = await f2# When finished, close the connection pool.connection.close()

    It's recommended to use a large enough poolsize. A connection will be occupied as long as there's a transaction running in there.

    ?

    Pub / sub example

    import asyncio import asyncio_redisasync def example():# Create connectionconnection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)# Create subscriber.subscriber = await connection.start_subscribe()# Subscribe to channel.await subscriber.subscribe([ 'our-channel' ])# Inside a while loop, wait for incoming events.while True:reply = await subscriber.next_published()print('Received: ', repr(reply.value), 'on channel', reply.channel)# When finished, close the connection.connection.close()

    ?

    LUA Scripting example

    import asyncio import asyncio_rediscode = """ local value = redis.call('GET', KEYS[1]) value = tonumber(value) return value * ARGV[1] """async def example():connection = await asyncio_redis.Connection.create(host='127.0.0.1', port=6379)# Set a keyawait connection.set('my_key', '2')# Register scriptmultiply = await connection.register_script(code)# Run scriptscript_reply = await multiply.run(keys=['my_key'], args=['5'])result = await script_reply.return_value()print(result) # prints 2 * 5# When finished, close the connection.connection.close()

    ?

    Example using the Protocol class

    import asyncio import asyncio_redisasync def example():loop = asyncio.get_event_loop()# Create Redis connectiontransport, protocol = await loop.create_connection(asyncio_redis.RedisProtocol, '127.0.0.1', 6379)# Set a keyawait protocol.set('my_key', 'my_value')# Get a keyresult = await protocol.get('my_key')print(result)# Close transport when finished.transport.close()if __name__ == '__main__':asyncio.get_event_loop().run_until_complete(example())pass

    ?

    ?

    asyncio-redis 是 Python?asyncio?的?Redis?客戶端 (PEP 3156)。這個 Redis 庫是完全異步的,Reids 服務器非阻塞客戶端,依賴于 asyncio,所以要求 Python 3.3. 以上版本。

    安裝:pip install asyncio-redis

    創建一個redisUtils.py

    class Redis:"""A simple wrapper class that allows you to share a connectionpool across your application."""_pool = Noneasync def get_redis_pool(self):REDIS_CONFIG = {'host': 'localhost', 'port': 6379}try:from config import REDIS_CONFIGexcept:passif not self._pool:self._pool = await asyncio_redis.Pool.create(host=REDIS_CONFIG['host'], port=REDIS_CONFIG['port'], password=REDIS_CONFIG.get('password'), poolsize=10)return self._poolasync def close(self):if self._pool:self._pool.close()

    再創建一個run.py

    from utils.redisUtils import Redis #引用上面的配置 import json as jsonnredis = Redis() r = await redis.get_redis_pool() key = '/hushuai/product' await r.set(key, 'value') val = await r.get(key) print(val)key_list = '/hushuai/product_list' product_list_size = await r.llen(key_list) print(product_list_size) if product_list_size != 0:if product_list_size > start_page:product_list = await r.lrange(key, start_page, end_page)product_list = await product_list.aslist()product_list_other = []for i in product_list:product_list_other.append(jsonn.loads(i.replace('\'', '\"').replace('None', '""')))data = [product_list_size, product_list_other]else:data = await get_items(app.pool,"product_view",re_params,with_total=with_total,pager=pager) else:data = await get_items(app.pool, "product_view", re_params, with_total=with_total, pager=pager)data_redis = await get_items(app.pool, "product_view", re_params)list = []for product_data in data_redis:list.append(str(product_data))if list:await r.rpush(key, list)

    主要講的是python 使用異步redis的方式,這里只是做了redis的str和list兩種類型的數據處理。

    ?

    ?

    ?

    ?

    ?

    ?

    總結

    以上是生活随笔為你收集整理的Python 异步 redis的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 宅男噜噜噜66一区二区 | 艳妇臀荡乳欲伦交换gif | 最新啪啪网站 | 四虎影视成人永久免费观看亚洲欧美 | 免费黄色在线网站 | 加勒比毛片 | 6080黄色 | 男人狂揉女人下部视频 | 欧美高清a | 丁香激情综合 | av中文在线资源 | 人妻精品久久久久中文字幕 | 蜜桃av噜噜一区二区三区 | 躁躁躁日日躁 | 岛国片在线播放 | 免费在线小视频 | 日本伦理中文字幕 | 亚洲午夜精品久久久 | 久久精品久久久久久久 | 精产国品一二三产区m553麻豆 | 五月综合激情网 | 国产麻豆交换夫妇 | 污视频网址在线观看 | 天天插天天操 | 国产粉嫩在线观看 | 欧美成人精品一区二区 | 超碰超碰| 永久影院| 黄视频网站免费看 | 极品美女高潮出白浆 | 99热在线免费观看 | 秋霞成人午夜鲁丝一区二区三区 | 99热免费在线 | 男女无遮挡做爰猛烈视频 | 久精品视频 | 日韩 欧美 自拍 | 色哟哟在线视频 | 国产av天堂无码一区二区三区 | 自拍偷拍一区二区三区 | 99re99热| 中文国语毛片高清视频 | 成年网站免费在线观看 | 国产成人精品一区二区三区网站观看 | 中文字幕乱码在线观看 | 欧美自拍视频在线观看 | 瑟瑟视频在线免费观看 | 色八区 | 无码人妻丰满熟妇啪啪 | 免费一级黄色 | av久草| 久久久久成人网站 | 国产美女精品人人做人人爽 | 毛片123| 日本理论片午伦夜理片在线观看 | 性欢交69国产精品 | 久久精品欧美日韩精品 | 国产精品亚洲二区在线观看 | 青草草在线视频 | 理论片av | 成人看片黄a免费看视频 | 玖玖视频| 国产美女又黄又爽又色视频免费 | 成熟人妻av无码专区 | 美女网站免费观看 | 一本高清视频 | 日韩一级免费毛片 | 69午夜 | 无码国产69精品久久久久同性 | 国产成人精品午夜福利Av免费 | 国产视频久久久久久 | 亚洲精品福利在线观看 | 亚洲aaaaaa | 在线国产精品一区 | 毛片1000部免费看 | 国产一区二区在线免费 | av一区二| 欧美人一级淫片a免费播放 西方av在线 | 一级黄色免费观看 | 久久综合激情网 | 中文字幕一区久久 | 青青青青在线 | 日本一区二区视频在线播放 | 天天爱天天草 | 日日天天 | 欧美干干干| 99国产精品一区二区 | 欧美嫩草影院 | 青青青视频免费 | 四虎精品一区二区三区 | 最新在线视频 | 国产毛片av | 国产剧情精品 | 日韩久久精品电影 | 无码精品人妻一区二区三区漫画 | 国产精品6 | 暴力调教一区二区三区 | 激情小说综合 | 精品综合久久 | 久久国产一二三 |