日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

Redis高级客户端Lettuce详解

發(fā)布時(shí)間:2025/10/17 数据库 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis高级客户端Lettuce详解 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

前提

Lettuce是一個(gè)Redis的Java驅(qū)動(dòng)包,初識(shí)她的時(shí)候是使用RedisTemplate的時(shí)候遇到點(diǎn)問(wèn)題Debug到底層的一些源碼,發(fā)現(xiàn)spring-data-redis的驅(qū)動(dòng)包在某個(gè)版本之后替換為L(zhǎng)ettuce。Lettuce翻譯為生菜,沒錯(cuò),就是吃的那種生菜,所以它的Logo長(zhǎng)這樣:

既然能被Spring生態(tài)所認(rèn)可,Lettuce想必有過(guò)人之處,于是筆者花時(shí)間閱讀她的官方文檔,整理測(cè)試示例,寫下這篇文章。編寫本文時(shí)所使用的版本為L(zhǎng)ettuce 5.1.8.RELEASE,SpringBoot 2.1.8.RELEASE,JDK [8,11]。超長(zhǎng)警告:這篇文章斷斷續(xù)續(xù)花了兩周完成,超過(guò)4萬(wàn)字.....

Lettuce簡(jiǎn)介

Lettuce是一個(gè)高性能基于Java編寫的Redis驅(qū)動(dòng)框架,底層集成了Project Reactor提供天然的反應(yīng)式編程,通信框架集成了Netty使用了非阻塞IO,5.x版本之后融合了JDK1.8的異步編程特性,在保證高性能的同時(shí)提供了十分豐富易用的API,5.1版本的新特性如下:

  • 支持Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX。
  • 支持通過(guò)Brave模塊跟蹤Redis命令執(zhí)行。
  • 支持Redis Streams。
  • 支持異步的主從連接。
  • 支持異步連接池。
  • 新增命令最多執(zhí)行一次模式(禁止自動(dòng)重連)。
  • 全局命令超時(shí)設(shè)置(對(duì)異步和反應(yīng)式命令也有效)。
  • ......等等

注意一點(diǎn):Redis的版本至少需要2.6,當(dāng)然越高越好,API的兼容性比較強(qiáng)大。

只需要引入單個(gè)依賴就可以開始愉快地使用Lettuce:

  • Maven
<dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>5.1.8.RELEASE</version> </dependency>
  • Gradle
dependencies {compile 'io.lettuce:lettuce-core:5.1.8.RELEASE' }

連接Redis

單機(jī)、哨兵、集群模式下連接Redis需要一個(gè)統(tǒng)一的標(biāo)準(zhǔn)去表示連接的細(xì)節(jié)信息,在Lettuce中這個(gè)統(tǒng)一的標(biāo)準(zhǔn)是RedisURI。可以通過(guò)三種方式構(gòu)造一個(gè)RedisURI實(shí)例:

  • 定制的字符串URI語(yǔ)法:
RedisURI uri = RedisURI.create("redis://localhost/");
  • 使用建造器(RedisURI.Builder):
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
  • 直接通過(guò)構(gòu)造函數(shù)實(shí)例化:
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);

定制的連接URI語(yǔ)法

  • 單機(jī)(前綴為redis://)
格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]] 完整:redis://mypassword@127.0.0.1:6379/0?timeout=10s 簡(jiǎn)單:redis://localhost
  • 單機(jī)并且使用SSL(前綴為rediss://) <== 注意后面多了個(gè)s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]] 完整:rediss://mypassword@127.0.0.1:6379/0?timeout=10s 簡(jiǎn)單:rediss://localhost
  • 單機(jī)Unix Domain Sockets模式(前綴為redis-socket://)
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]] 完整:redis-socket:///tmp/redis?timeout=10s&_database=0
  • 哨兵(前綴為redis-sentinel://)
格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId 完整:redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster

超時(shí)時(shí)間單位:

  • d 天
  • h 小時(shí)
  • m 分鐘
  • s 秒鐘
  • ms 毫秒
  • us 微秒
  • ns 納秒

個(gè)人建議使用RedisURI提供的建造器,畢竟定制的URI雖然簡(jiǎn)潔,但是比較容易出現(xiàn)人為錯(cuò)誤。鑒于筆者沒有SSL和Unix Domain Socket的使用場(chǎng)景,下面不對(duì)這兩種連接方式進(jìn)行列舉。

基本使用

Lettuce使用的時(shí)候依賴于四個(gè)主要組件:

  • RedisURI:連接信息。
  • RedisClient:Redis客戶端,特殊地,集群連接有一個(gè)定制的RedisClusterClient。
  • Connection:Redis連接,主要是StatefulConnection或者StatefulRedisConnection的子類,連接的類型主要由連接的具體方式(單機(jī)、哨兵、集群、訂閱發(fā)布等等)選定,比較重要。
  • RedisCommands:Redis命令A(yù)PI接口,基本上覆蓋了Redis發(fā)行版本的所有命令,提供了同步(sync)、異步(async)、反應(yīng)式(reative)的調(diào)用方式,對(duì)于使用者而言,會(huì)經(jīng)常跟RedisCommands系列接口打交道。

一個(gè)基本使用例子如下:

@Test public void testSetGet() throws Exception {RedisURI redisUri = RedisURI.builder() // <1> 創(chuàng)建單機(jī)連接的連接信息.withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri); // <2> 創(chuàng)建客戶端StatefulRedisConnection<String, String> connection = redisClient.connect(); // <3> 創(chuàng)建線程安全的連接RedisCommands<String, String> redisCommands = connection.sync(); // <4> 創(chuàng)建同步命令SetArgs setArgs = SetArgs.Builder.nx().ex(5);String result = redisCommands.set("name", "throwable", setArgs);Assertions.assertThat(result).isEqualToIgnoringCase("OK");result = redisCommands.get("name");Assertions.assertThat(result).isEqualTo("throwable");// ... 其他操作connection.close(); // <5> 關(guān)閉連接redisClient.shutdown(); // <6> 關(guān)閉客戶端 }

注意:

  • <5>:關(guān)閉連接一般在應(yīng)用程序停止之前操作,一個(gè)應(yīng)用程序中的一個(gè)Redis驅(qū)動(dòng)實(shí)例不需要太多的連接(一般情況下只需要一個(gè)連接實(shí)例就可以,如果有多個(gè)連接的需要可以考慮使用連接池,其實(shí)Redis目前處理命令的模塊是單線程,在客戶端多個(gè)連接多線程調(diào)用理論上沒有效果)。
  • <6>:關(guān)閉客戶端一般應(yīng)用程序停止之前操作,如果條件允許的話,基于后開先閉原則,客戶端關(guān)閉應(yīng)該在連接關(guān)閉之后操作。

API

Lettuce主要提供三種API:

  • 同步(sync):RedisCommands。
  • 異步(async):RedisAsyncCommands。
  • 反應(yīng)式(reactive):RedisReactiveCommands。

先準(zhǔn)備好一個(gè)單機(jī)Redis連接備用:

private static StatefulRedisConnection<String, String> CONNECTION; private static RedisClient CLIENT;@BeforeClass public static void beforeClass() {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();CLIENT = RedisClient.create(redisUri);CONNECTION = CLIENT.connect(); }@AfterClass public static void afterClass() throws Exception {CONNECTION.close();CLIENT.shutdown(); }

Redis命令A(yù)PI的具體實(shí)現(xiàn)可以直接從StatefulRedisConnection實(shí)例獲取,見其接口定義:

public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {boolean isMulti();RedisCommands<K, V> sync();RedisAsyncCommands<K, V> async();RedisReactiveCommands<K, V> reactive(); }

值得注意的是,在不指定編碼解碼器RedisCodec的前提下,RedisClient創(chuàng)建的StatefulRedisConnection實(shí)例一般是泛型實(shí)例StatefulRedisConnection<String,String>,也就是所有命令A(yù)PI的KEY和VALUE都是String類型,這種使用方式能滿足大部分的使用場(chǎng)景。當(dāng)然,必要的時(shí)候可以定制編碼解碼器RedisCodec<K,V>。

同步API

先構(gòu)建RedisCommands實(shí)例:

private static RedisCommands<String, String> COMMAND;@BeforeClass public static void beforeClass() {COMMAND = CONNECTION.sync(); }

基本使用:

@Test public void testSyncPing() throws Exception {String pong = COMMAND.ping();Assertions.assertThat(pong).isEqualToIgnoringCase("PONG"); }@Test public void testSyncSetAndGet() throws Exception {SetArgs setArgs = SetArgs.Builder.nx().ex(5);COMMAND.set("name", "throwable", setArgs);String value = COMMAND.get("name");log.info("Get value: {}", value); }// Get value: throwable

同步API在所有命令調(diào)用之后會(huì)立即返回結(jié)果。如果熟悉Jedis的話,RedisCommands的用法其實(shí)和它相差不大。

異步API

先構(gòu)建RedisAsyncCommands實(shí)例:

private static RedisAsyncCommands<String, String> ASYNC_COMMAND;@BeforeClass public static void beforeClass() {ASYNC_COMMAND = CONNECTION.async(); }

基本使用:

@Test public void testAsyncPing() throws Exception {RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();log.info("Ping result:{}", redisFuture.get()); } // Ping result:PONG

RedisAsyncCommands所有方法執(zhí)行返回結(jié)果都是RedisFuture實(shí)例,而RedisFuture接口的定義如下:

public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {String getError();boolean await(long timeout, TimeUnit unit) throws InterruptedException; }

也就是,RedisFuture可以無(wú)縫使用Future或者JDK1.8中引入的CompletableFuture提供的方法。舉個(gè)例子:

@Test public void testAsyncSetAndGet1() throws Exception {SetArgs setArgs = SetArgs.Builder.nx().ex(5);RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);// CompletableFuture#thenAccept()future.thenAccept(value -> log.info("Set命令返回:{}", value));// Future#get()future.get(); } // Set命令返回:OK@Test public void testAsyncSetAndGet2() throws Exception {SetArgs setArgs = SetArgs.Builder.nx().ex(5);CompletableFuture<Void> result =(CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs).thenAcceptBoth(ASYNC_COMMAND.get("name"),(s, g) -> {log.info("Set命令返回:{}", s);log.info("Get命令返回:{}", g);});result.get(); } // Set命令返回:OK // Get命令返回:throwable

如果能熟練使用CompletableFuture和函數(shù)式編程技巧,可以組合多個(gè)RedisFuture完成一些列復(fù)雜的操作。

反應(yīng)式API

Lettuce引入的反應(yīng)式編程框架是Project Reactor,如果沒有反應(yīng)式編程經(jīng)驗(yàn)可以先自行了解一下Project Reactor。

構(gòu)建RedisReactiveCommands實(shí)例:

private static RedisReactiveCommands<String, String> REACTIVE_COMMAND;@BeforeClass public static void beforeClass() {REACTIVE_COMMAND = CONNECTION.reactive(); }

根據(jù)Project Reactor,RedisReactiveCommands的方法如果返回的結(jié)果只包含0或1個(gè)元素,那么返回值類型是Mono,如果返回的結(jié)果包含0到N(N大于0)個(gè)元素,那么返回值是Flux。舉個(gè)例子:

@Test public void testReactivePing() throws Exception {Mono<String> ping = REACTIVE_COMMAND.ping();ping.subscribe(v -> log.info("Ping result:{}", v));Thread.sleep(1000); } // Ping result:PONG@Test public void testReactiveSetAndGet() throws Exception {SetArgs setArgs = SetArgs.Builder.nx().ex(5);REACTIVE_COMMAND.set("name", "throwable", setArgs).block();REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));Thread.sleep(1000); } // Get命令返回:throwable@Test public void testReactiveSet() throws Exception {REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block();Flux<String> flux = REACTIVE_COMMAND.smembers("food");flux.subscribe(log::info);REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block();Thread.sleep(1000); } // meat // bread // fish

舉個(gè)更加復(fù)雜的例子,包含了事務(wù)、函數(shù)轉(zhuǎn)換等:

@Test public void testReactiveFunctional() throws Exception {REACTIVE_COMMAND.multi().doOnSuccess(r -> {REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe();REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();}).flatMap(s -> REACTIVE_COMMAND.exec()).doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded())).subscribe();Thread.sleep(1000); } // OK // 2 // Discarded:false

這個(gè)方法開啟一個(gè)事務(wù),先把counter設(shè)置為1,再將counter自增1。

發(fā)布和訂閱

非集群模式下的發(fā)布訂閱依賴于定制的連接StatefulRedisPubSubConnection,集群模式下的發(fā)布訂閱依賴于定制的連接StatefulRedisClusterPubSubConnection,兩者分別來(lái)源于RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub():

  • 非集群模式:
// 可能是單機(jī)、普通主從、哨兵等非集群模式的客戶端 RedisClient client = ... StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub(); connection.addListener(new RedisPubSubListener<String, String>() { ... });// 同步命令 RedisPubSubCommands<String, String> sync = connection.sync(); sync.subscribe("channel");// 異步命令 RedisPubSubAsyncCommands<String, String> async = connection.async(); RedisFuture<Void> future = async.subscribe("channel");// 反應(yīng)式命令 RedisPubSubReactiveCommands<String, String> reactive = connection.reactive(); reactive.subscribe("channel").subscribe();reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
  • 集群模式:
// 使用方式其實(shí)和非集群模式基本一致 RedisClusterClient clusterClient = ... StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub(); connection.addListener(new RedisPubSubListener<String, String>() { ... }); RedisPubSubCommands<String, String> sync = connection.sync(); sync.subscribe("channel"); // ...

這里用單機(jī)同步命令的模式舉一個(gè)Redis鍵空間通知(Redis Keyspace Notifications)的例子:

@Test public void testSyncKeyspaceNotification() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379)// 注意這里只能是0號(hào)庫(kù).withDatabase(0).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);StatefulRedisConnection<String, String> redisConnection = redisClient.connect();RedisCommands<String, String> redisCommands = redisConnection.sync();// 只接收鍵過(guò)期的事件redisCommands.configSet("notify-keyspace-events", "Ex");StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();connection.addListener(new RedisPubSubAdapter<>() {@Overridepublic void psubscribed(String pattern, long count) {log.info("pattern:{},count:{}", pattern, count);}@Overridepublic void message(String pattern, String channel, String message) {log.info("pattern:{},channel:{},message:{}", pattern, channel, message);}});RedisPubSubCommands<String, String> commands = connection.sync();commands.psubscribe("__keyevent@0__:expired");redisCommands.setex("name", 2, "throwable");Thread.sleep(10000);redisConnection.close();connection.close();redisClient.shutdown(); } // pattern:__keyevent@0__:expired,count:1 // pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name

實(shí)際上,在實(shí)現(xiàn)RedisPubSubListener的時(shí)候可以單獨(dú)抽離,盡量不要設(shè)計(jì)成匿名內(nèi)部類的形式。

事務(wù)和批量命令執(zhí)行

事務(wù)相關(guān)的命令就是WATCH、UNWATCH、EXEC、MULTI和DISCARD,在RedisCommands系列接口中有對(duì)應(yīng)的方法。舉個(gè)例子:

// 同步模式 @Test public void testSyncMulti() throws Exception {COMMAND.multi();COMMAND.setex("name-1", 2, "throwable");COMMAND.setex("name-2", 2, "doge");TransactionResult result = COMMAND.exec();int index = 0;for (Object r : result) {log.info("Result-{}:{}", index, r);index++;} } // Result-0:OK // Result-1:OK

Redis的Pipeline也就是管道機(jī)制可以理解為把多個(gè)命令打包在一次請(qǐng)求發(fā)送到Redis服務(wù)端,然后Redis服務(wù)端把所有的響應(yīng)結(jié)果打包好一次性返回,從而節(jié)省不必要的網(wǎng)絡(luò)資源(最主要是減少網(wǎng)絡(luò)請(qǐng)求次數(shù))。Redis對(duì)于Pipeline機(jī)制如何實(shí)現(xiàn)并沒有明確的規(guī)定,也沒有提供特殊的命令支持Pipeline機(jī)制。Jedis中底層采用BIO(阻塞IO)通訊,所以它的做法是客戶端緩存將要發(fā)送的命令,最后需要觸發(fā)然后同步發(fā)送一個(gè)巨大的命令列表包,再接收和解析一個(gè)巨大的響應(yīng)列表包。Pipeline在Lettuce中對(duì)使用者是透明的,由于底層的通訊框架是Netty,所以網(wǎng)絡(luò)通訊層面的優(yōu)化Lettuce不需要過(guò)多干預(yù),換言之可以這樣理解:Netty幫Lettuce從底層實(shí)現(xiàn)了Redis的Pipeline機(jī)制。但是,Lettuce的異步API也提供了手動(dòng)Flush的方法:

@Test public void testAsyncManualFlush() {// 取消自動(dòng)flushASYNC_COMMAND.setAutoFlushCommands(false);List<RedisFuture<?>> redisFutures = Lists.newArrayList();int count = 5000;for (int i = 0; i < count; i++) {String key = "key-" + (i + 1);String value = "value-" + (i + 1);redisFutures.add(ASYNC_COMMAND.set(key, value));redisFutures.add(ASYNC_COMMAND.expire(key, 2));}long start = System.currentTimeMillis();ASYNC_COMMAND.flushCommands();boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));Assertions.assertThat(result).isTrue();log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start); } // Lettuce cost:1302 ms

上面只是從文檔看到的一些理論術(shù)語(yǔ),但是現(xiàn)實(shí)是骨感的,對(duì)比了下Jedis的Pipeline提供的方法,發(fā)現(xiàn)了Jedis的Pipeline執(zhí)行耗時(shí)比較低:

@Test public void testJedisPipeline() throws Exception {Jedis jedis = new Jedis();Pipeline pipeline = jedis.pipelined();int count = 5000;for (int i = 0; i < count; i++) {String key = "key-" + (i + 1);String value = "value-" + (i + 1);pipeline.set(key, value);pipeline.expire(key, 2);}long start = System.currentTimeMillis();pipeline.syncAndReturnAll();log.info("Jedis cost:{} ms", System.currentTimeMillis() - start); } // Jedis cost:9 ms

個(gè)人猜測(cè)Lettuce可能底層并非合并所有命令一次發(fā)送(甚至可能是單條發(fā)送),具體可能需要抓包才能定位。依此來(lái)看,如果真的有大量執(zhí)行Redis命令的場(chǎng)景,不妨可以使用Jedis的Pipeline。

注意:由上面的測(cè)試推斷RedisTemplate的executePipelined()方法是假的Pipeline執(zhí)行方法,使用RedisTemplate的時(shí)候請(qǐng)務(wù)必注意這一點(diǎn)。

Lua腳本執(zhí)行

Lettuce中執(zhí)行Redis的Lua命令的同步接口如下:

public interface RedisScriptingCommands<K, V> {<T> T eval(String var1, ScriptOutputType var2, K... var3);<T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);<T> T evalsha(String var1, ScriptOutputType var2, K... var3);<T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);List<Boolean> scriptExists(String... var1);String scriptFlush();String scriptKill();String scriptLoad(V var1);String digest(V var1); }

異步和反應(yīng)式的接口方法定義差不多,不同的地方就是返回值類型,一般我們常用的是eval()、evalsha()和scriptLoad()方法。舉個(gè)簡(jiǎn)單的例子:

private static RedisCommands<String, String> COMMANDS; private static String RAW_LUA = "local key = KEYS[1]\n" +"local value = ARGV[1]\n" +"local timeout = ARGV[2]\n" +"redis.call('SETEX', key, tonumber(timeout), value)\n" +"local result = redis.call('GET', key)\n" +"return result;"; private static AtomicReference<String> LUA_SHA = new AtomicReference<>();@Test public void testLua() throws Exception {LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));String[] keys = new String[]{"name"};String[] args = new String[]{"throwable", "5000"};String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);log.info("Get value:{}", result); } // Get value:throwable

高可用和分片

為了Redis的高可用,一般會(huì)采用普通主從(Master/Replica,這里筆者稱為普通主從模式,也就是僅僅做了主從復(fù)制,故障需要手動(dòng)切換)、哨兵和集群。普通主從模式可以獨(dú)立運(yùn)行,也可以配合哨兵運(yùn)行,只是哨兵提供自動(dòng)故障轉(zhuǎn)移和主節(jié)點(diǎn)提升功能。普通主從和哨兵都可以使用MasterSlave,通過(guò)入?yún)≧edisClient、編碼解碼器以及一個(gè)或者多個(gè)RedisURI獲取對(duì)應(yīng)的Connection實(shí)例。

這里注意一點(diǎn),MasterSlave中提供的方法如果只要求傳入一個(gè)RedisURI實(shí)例,那么Lettuce會(huì)進(jìn)行拓?fù)浒l(fā)現(xiàn)機(jī)制,自動(dòng)獲取Redis主從節(jié)點(diǎn)信息;如果要求傳入一個(gè)RedisURI集合,那么對(duì)于普通主從模式來(lái)說(shuō)所有節(jié)點(diǎn)信息是靜態(tài)的,不會(huì)進(jìn)行發(fā)現(xiàn)和更新。

拓?fù)浒l(fā)現(xiàn)的規(guī)則如下:

  • 對(duì)于普通主從(Master/Replica)模式,不需要感知RedisURI指向從節(jié)點(diǎn)還是主節(jié)點(diǎn),只會(huì)進(jìn)行一次性的拓?fù)洳檎宜泄?jié)點(diǎn)信息,此后節(jié)點(diǎn)信息會(huì)保存在靜態(tài)緩存中,不會(huì)更新。
  • 對(duì)于哨兵模式,會(huì)訂閱所有哨兵實(shí)例并偵聽訂閱/發(fā)布消息以觸發(fā)拓?fù)渌⑿聶C(jī)制,更新緩存的節(jié)點(diǎn)信息,也就是哨兵天然就是動(dòng)態(tài)發(fā)現(xiàn)節(jié)點(diǎn)信息,不支持靜態(tài)配置。

拓?fù)浒l(fā)現(xiàn)機(jī)制的提供API為TopologyProvider,需要了解其原理的可以參考具體的實(shí)現(xiàn)。

對(duì)于集群(Cluster)模式,Lettuce提供了一套獨(dú)立的API。

另外,如果Lettuce連接面向的是非單個(gè)Redis節(jié)點(diǎn),連接實(shí)例提供了數(shù)據(jù)讀取節(jié)點(diǎn)偏好(ReadFrom)設(shè)置,可選值有:

  • MASTER:只從Master節(jié)點(diǎn)中讀取。
  • MASTER_PREFERRED:優(yōu)先從Master節(jié)點(diǎn)中讀取。
  • SLAVE_PREFERRED:優(yōu)先從Slavor節(jié)點(diǎn)中讀取。
  • SLAVE:只從Slavor節(jié)點(diǎn)中讀取。
  • NEAREST:使用最近一次連接的Redis實(shí)例讀取。

普通主從模式

假設(shè)現(xiàn)在有三個(gè)Redis服務(wù)形成樹狀主從關(guān)系如下:

  • 節(jié)點(diǎn)一:localhost:6379,角色為Master。
  • 節(jié)點(diǎn)二:localhost:6380,角色為Slavor,節(jié)點(diǎn)一的從節(jié)點(diǎn)。
  • 節(jié)點(diǎn)三:localhost:6381,角色為Slavor,節(jié)點(diǎn)二的從節(jié)點(diǎn)。

首次動(dòng)態(tài)節(jié)點(diǎn)發(fā)現(xiàn)主從模式的節(jié)點(diǎn)信息需要如下構(gòu)建連接:

@Test public void testDynamicReplica() throws Exception {// 這里只需要配置一個(gè)節(jié)點(diǎn)的連接信息,不一定需要是主節(jié)點(diǎn)的信息,從節(jié)點(diǎn)也可以RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();RedisClient redisClient = RedisClient.create(uri);StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);// 只從從節(jié)點(diǎn)讀取數(shù)據(jù)connection.setReadFrom(ReadFrom.SLAVE);// 執(zhí)行其他Redis命令connection.close();redisClient.shutdown(); }

如果需要指定靜態(tài)的Redis主從節(jié)點(diǎn)連接屬性,那么可以這樣構(gòu)建連接:

@Test public void testStaticReplica() throws Exception {List<RedisURI> uris = new ArrayList<>();RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();uris.add(uri1);uris.add(uri2);uris.add(uri3);RedisClient redisClient = RedisClient.create();StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient,new Utf8StringCodec(), uris);// 只從主節(jié)點(diǎn)讀取數(shù)據(jù)connection.setReadFrom(ReadFrom.MASTER);// 執(zhí)行其他Redis命令connection.close();redisClient.shutdown(); }

哨兵模式

由于Lettuce自身提供了哨兵的拓?fù)浒l(fā)現(xiàn)機(jī)制,所以只需要隨便配置一個(gè)哨兵節(jié)點(diǎn)的RedisURI實(shí)例即可:

@Test public void testDynamicSentinel() throws Exception {RedisURI redisUri = RedisURI.builder().withPassword("你的密碼").withSentinel("localhost", 26379).withSentinelMasterId("哨兵Master的ID").build();RedisClient redisClient = RedisClient.create();StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);// 只允許從從節(jié)點(diǎn)讀取數(shù)據(jù)connection.setReadFrom(ReadFrom.SLAVE);RedisCommands<String, String> command = connection.sync();SetArgs setArgs = SetArgs.Builder.nx().ex(5);command.set("name", "throwable", setArgs);String value = command.get("name");log.info("Get value:{}", value); } // Get value:throwable

集群模式

鑒于筆者對(duì)Redis集群模式并不熟悉,Cluster模式下的API使用本身就有比較多的限制,所以這里只簡(jiǎn)單介紹一下怎么用。先說(shuō)幾個(gè)特性:

下面的API提供跨槽位(Slot)調(diào)用的功能

  • RedisAdvancedClusterCommands。
  • RedisAdvancedClusterAsyncCommands。
  • RedisAdvancedClusterReactiveCommands。

靜態(tài)節(jié)點(diǎn)選擇功能:

  • masters:選擇所有主節(jié)點(diǎn)執(zhí)行命令。
  • slaves:選擇所有從節(jié)點(diǎn)執(zhí)行命令,其實(shí)就是只讀模式。
  • all nodes:命令可以在所有節(jié)點(diǎn)執(zhí)行。

集群拓?fù)湟晥D動(dòng)態(tài)更新功能:

  • 手動(dòng)更新,主動(dòng)調(diào)用RedisClusterClient#reloadPartitions()。
  • 后臺(tái)定時(shí)更新。
  • 自適應(yīng)更新,基于連接斷開和MOVED/ASK命令重定向自動(dòng)更新。

Redis集群搭建詳細(xì)過(guò)程可以參考官方文檔,假設(shè)已經(jīng)搭建好集群如下(192.168.56.200是筆者的虛擬機(jī)Host):

  • 192.168.56.200:7001 => 主節(jié)點(diǎn),槽位0-5460。
  • 192.168.56.200:7002 => 主節(jié)點(diǎn),槽位5461-10922。
  • 192.168.56.200:7003 => 主節(jié)點(diǎn),槽位10923-16383。
  • 192.168.56.200:7004 => 7001的從節(jié)點(diǎn)。
  • 192.168.56.200:7005 => 7002的從節(jié)點(diǎn)。
  • 192.168.56.200:7006 => 7003的從節(jié)點(diǎn)。

簡(jiǎn)單的集群連接和使用方式如下:

@Test public void testSyncCluster(){RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();RedisAdvancedClusterCommands<String, String> commands = connection.sync();commands.setex("name",10, "throwable");String value = commands.get("name");log.info("Get value:{}", value); } // Get value:throwable

節(jié)點(diǎn)選擇:

@Test public void testSyncNodeSelection() {RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();RedisAdvancedClusterCommands<String, String> commands = connection.sync(); // commands.all(); // 所有節(jié)點(diǎn) // commands.masters(); // 主節(jié)點(diǎn)// 從節(jié)點(diǎn)只讀NodeSelection<String, String> replicas = commands.slaves();NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();// 這里只是演示,一般應(yīng)該禁用keys *命令Executions<List<String>> keys = nodeSelectionCommands.keys("*");keys.forEach(key -> log.info("key: {}", key));connection.close();redisClusterClient.shutdown(); }

定時(shí)更新集群拓?fù)湟晥D(每隔十分鐘更新一次,這個(gè)時(shí)間自行考量,不能太頻繁):

@Test public void testPeriodicClusterTopology() throws Exception {RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder().enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES)).build();redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();RedisAdvancedClusterCommands<String, String> commands = connection.sync();commands.setex("name", 10, "throwable");String value = commands.get("name");log.info("Get value:{}", value);Thread.sleep(Integer.MAX_VALUE);connection.close();redisClusterClient.shutdown(); }

自適應(yīng)更新集群拓?fù)湟晥D:

@Test public void testAdaptiveClusterTopology() throws Exception {RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder().enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS).adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS)).build();redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();RedisAdvancedClusterCommands<String, String> commands = connection.sync();commands.setex("name", 10, "throwable");String value = commands.get("name");log.info("Get value:{}", value);Thread.sleep(Integer.MAX_VALUE);connection.close();redisClusterClient.shutdown(); }

動(dòng)態(tài)命令和自定義命令

自定義命令是Redis命令有限集,不過(guò)可以更細(xì)粒度指定KEY、ARGV、命令類型、編碼解碼器和返回值類型,依賴于dispatch()方法:

// 自定義實(shí)現(xiàn)PING方法 @Test public void testCustomPing() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);StatefulRedisConnection<String, String> connect = redisClient.connect();RedisCommands<String, String> sync = connect.sync();RedisCodec<String, String> codec = StringCodec.UTF8;String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec));log.info("PING:{}", result);connect.close();redisClient.shutdown(); } // PING:PONG// 自定義實(shí)現(xiàn)Set方法 @Test public void testCustomSet() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);StatefulRedisConnection<String, String> connect = redisClient.connect();RedisCommands<String, String> sync = connect.sync();RedisCodec<String, String> codec = StringCodec.UTF8;sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec),new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));String result = sync.get("name");log.info("Get value:{}", result);connect.close();redisClient.shutdown(); } // Get value:throwable

動(dòng)態(tài)命令是基于Redis命令有限集,并且通過(guò)注解和動(dòng)態(tài)代理完成一些復(fù)雜命令組合的實(shí)現(xiàn)。主要注解在io.lettuce.core.dynamic.annotation包路徑下。簡(jiǎn)單舉個(gè)例子:

public interface CustomCommand extends Commands {// SET [key] [value]@Command("SET ?0 ?1")String setKey(String key, String value);// SET [key] [value]@Command("SET :key :value")String setKeyNamed(@Param("key") String key, @Param("value") String value);// MGET [key1] [key2]@Command("MGET ?0 ?1")List<String> mGet(String key1, String key2);/*** 方法名作為命令*/@CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)String mSet(String key1, String value1, String key2, String value2); }@Test public void testCustomDynamicSet() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);StatefulRedisConnection<String, String> connect = redisClient.connect();RedisCommandFactory commandFactory = new RedisCommandFactory(connect);CustomCommand commands = commandFactory.getCommands(CustomCommand.class);commands.setKey("name", "throwable");commands.setKeyNamed("throwable", "doge");log.info("MGET ===> " + commands.mGet("name", "throwable"));commands.mSet("key1", "value1","key2", "value2");log.info("MGET ===> " + commands.mGet("key1", "key2"));connect.close();redisClient.shutdown(); } // MGET ===> [throwable, doge] // MGET ===> [value1, value2]

高階特性

Lettuce有很多高階使用特性,這里只列舉個(gè)人認(rèn)為常用的兩點(diǎn):

  • 配置客戶端資源。
  • 使用連接池。

更多其他特性可以自行參看官方文檔。

配置客戶端資源

客戶端資源的設(shè)置與Lettuce的性能、并發(fā)和事件處理相關(guān)。線程池或者線程組相關(guān)配置占據(jù)客戶端資源配置的大部分(EventLoopGroups和EventExecutorGroup),這些線程池或者線程組是連接程序的基礎(chǔ)組件。一般情況下,客戶端資源應(yīng)該在多個(gè)Redis客戶端之間共享,并且在不再使用的時(shí)候需要自行關(guān)閉。筆者認(rèn)為,客戶端資源是面向Netty的。注意除非特別熟悉或者花長(zhǎng)時(shí)間去測(cè)試調(diào)整下面提到的參數(shù),否則在沒有經(jīng)驗(yàn)的前提下憑直覺修改默認(rèn)值,有可能會(huì)踩坑

客戶端資源接口是ClientResources,實(shí)現(xiàn)類是DefaultClientResources。

構(gòu)建DefaultClientResources實(shí)例:

// 默認(rèn) ClientResources resources = DefaultClientResources.create();// 建造器 ClientResources resources = DefaultClientResources.builder().ioThreadPoolSize(4).computationThreadPoolSize(4).build()

使用:

ClientResources resources = DefaultClientResources.create(); // 非集群 RedisClient client = RedisClient.create(resources, uri); // 集群 RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris); // ...... client.shutdown(); clusterClient.shutdown(); // 關(guān)閉資源 resources.shutdown();

客戶端資源基本配置:

屬性描述默認(rèn)值
ioThreadPoolSizeI/O線程數(shù)Runtime.getRuntime().availableProcessors()
computationThreadPoolSize任務(wù)線程數(shù)Runtime.getRuntime().availableProcessors()

客戶端資源高級(jí)配置:

屬性描述默認(rèn)值
eventLoopGroupProviderEventLoopGroup提供商-
eventExecutorGroupProviderEventExecutorGroup提供商-
eventBus事件總線DefaultEventBus
commandLatencyCollectorOptions命令延時(shí)收集器配置DefaultCommandLatencyCollectorOptions
commandLatencyCollector命令延時(shí)收集器DefaultCommandLatencyCollector
commandLatencyPublisherOptions命令延時(shí)發(fā)布器配置DefaultEventPublisherOptions
dnsResolverDNS處理器JDK或者Netty提供
reconnectDelay重連延時(shí)配置Delay.exponential()
nettyCustomizerNetty自定義配置器-
tracing軌跡記錄器-

非集群客戶端RedisClient的屬性配置:

Redis非集群客戶端RedisClient本身提供了配置屬性方法:

RedisClient client = RedisClient.create(uri); client.setOptions(ClientOptions.builder().autoReconnect(false).pingBeforeActivateConnection(true).build());

非集群客戶端的配置屬性列表:

屬性描述默認(rèn)值
pingBeforeActivateConnection連接激活之前是否執(zhí)行PING命令false
autoReconnect是否自動(dòng)重連true
cancelCommandsOnReconnectFailure重連失敗是否拒絕命令執(zhí)行false
suspendReconnectOnProtocolFailure底層協(xié)議失敗是否掛起重連操作false
requestQueueSize請(qǐng)求隊(duì)列容量2147483647(Integer#MAX_VALUE)
disconnectedBehavior失去連接時(shí)候的行為DEFAULT
sslOptionsSSL配置-
socketOptionsSocket配置10 seconds Connection-Timeout, no keep-alive, no TCP noDelay
timeoutOptions超時(shí)配置-
publishOnScheduler發(fā)布反應(yīng)式信號(hào)數(shù)據(jù)的調(diào)度器使用I/O線程

集群客戶端屬性配置:

Redis集群客戶端RedisClusterClient本身提供了配置屬性方法:

RedisClusterClient client = RedisClusterClient.create(uri); ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder().enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES)).enableAllAdaptiveRefreshTriggers().build();client.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(topologyRefreshOptions).build());

集群客戶端的配置屬性列表:

屬性描述默認(rèn)值
enablePeriodicRefresh是否允許周期性更新集群拓?fù)湟晥Dfalse
refreshPeriod更新集群拓?fù)湟晥D周期60秒
enableAdaptiveRefreshTrigger設(shè)置自適應(yīng)更新集群拓?fù)湟晥D觸發(fā)器RefreshTrigger-
adaptiveRefreshTriggersTimeout自適應(yīng)更新集群拓?fù)湟晥D觸發(fā)器超時(shí)設(shè)置30秒
refreshTriggersReconnectAttempts自適應(yīng)更新集群拓?fù)湟晥D觸發(fā)重連次數(shù)5
dynamicRefreshSources是否允許動(dòng)態(tài)刷新拓?fù)滟Y源true
closeStaleConnections是否允許關(guān)閉陳舊的連接true
maxRedirects集群重定向次數(shù)上限5
validateClusterNodeMembership是否校驗(yàn)集群節(jié)點(diǎn)的成員關(guān)系true

使用連接池

引入連接池依賴commons-pool2:

<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.7.0</version> </dependency

基本使用如下:

@Test public void testUseConnectionPool() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();GenericObjectPool<StatefulRedisConnection<String, String>> pool= ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {RedisCommands<String, String> command = connection.sync();SetArgs setArgs = SetArgs.Builder.nx().ex(5);command.set("name", "throwable", setArgs);String n = command.get("name");log.info("Get value:{}", n);}pool.close();redisClient.shutdown(); }

其中,同步連接的池化支持需要用ConnectionPoolSupport,異步連接的池化支持需要用AsyncConnectionPoolSupport(Lettuce5.1之后才支持)。

幾個(gè)常見的漸進(jìn)式刪除例子

漸進(jìn)式刪除Hash中的域-屬性:

@Test public void testDelBigHashKey() throws Exception {// SCAN參數(shù)ScanArgs scanArgs = ScanArgs.Builder.limit(2);// TEMP游標(biāo)ScanCursor cursor = ScanCursor.INITIAL;// 目標(biāo)KEYString key = "BIG_HASH_KEY";prepareHashTestData(key);log.info("開始漸進(jìn)式刪除Hash的元素...");int counter = 0;do {MapScanCursor<String, String> result = COMMAND.hscan(key, cursor, scanArgs);// 重置TEMP游標(biāo)cursor = ScanCursor.of(result.getCursor());cursor.setFinished(result.isFinished());Collection<String> fields = result.getMap().values();if (!fields.isEmpty()) {COMMAND.hdel(key, fields.toArray(new String[0]));}counter++;} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));log.info("漸進(jìn)式刪除Hash的元素完畢,迭代次數(shù):{} ...", counter); }private void prepareHashTestData(String key) throws Exception {COMMAND.hset(key, "1", "1");COMMAND.hset(key, "2", "2");COMMAND.hset(key, "3", "3");COMMAND.hset(key, "4", "4");COMMAND.hset(key, "5", "5"); }

漸進(jìn)式刪除集合中的元素:

@Test public void testDelBigSetKey() throws Exception {String key = "BIG_SET_KEY";prepareSetTestData(key);// SCAN參數(shù)ScanArgs scanArgs = ScanArgs.Builder.limit(2);// TEMP游標(biāo)ScanCursor cursor = ScanCursor.INITIAL;log.info("開始漸進(jìn)式刪除Set的元素...");int counter = 0;do {ValueScanCursor<String> result = COMMAND.sscan(key, cursor, scanArgs);// 重置TEMP游標(biāo)cursor = ScanCursor.of(result.getCursor());cursor.setFinished(result.isFinished());List<String> values = result.getValues();if (!values.isEmpty()) {COMMAND.srem(key, values.toArray(new String[0]));}counter++;} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));log.info("漸進(jìn)式刪除Set的元素完畢,迭代次數(shù):{} ...", counter); }private void prepareSetTestData(String key) throws Exception {COMMAND.sadd(key, "1", "2", "3", "4", "5"); }

漸進(jìn)式刪除有序集合中的元素:

@Test public void testDelBigZSetKey() throws Exception {// SCAN參數(shù)ScanArgs scanArgs = ScanArgs.Builder.limit(2);// TEMP游標(biāo)ScanCursor cursor = ScanCursor.INITIAL;// 目標(biāo)KEYString key = "BIG_ZSET_KEY";prepareZSetTestData(key);log.info("開始漸進(jìn)式刪除ZSet的元素...");int counter = 0;do {ScoredValueScanCursor<String> result = COMMAND.zscan(key, cursor, scanArgs);// 重置TEMP游標(biāo)cursor = ScanCursor.of(result.getCursor());cursor.setFinished(result.isFinished());List<ScoredValue<String>> scoredValues = result.getValues();if (!scoredValues.isEmpty()) {COMMAND.zrem(key, scoredValues.stream().map(ScoredValue<String>::getValue).toArray(String[]::new));}counter++;} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));log.info("漸進(jìn)式刪除ZSet的元素完畢,迭代次數(shù):{} ...", counter); }private void prepareZSetTestData(String key) throws Exception {COMMAND.zadd(key, 0, "1");COMMAND.zadd(key, 0, "2");COMMAND.zadd(key, 0, "3");COMMAND.zadd(key, 0, "4");COMMAND.zadd(key, 0, "5"); }

在SpringBoot中使用Lettuce

個(gè)人認(rèn)為,spring-data-redis中的API封裝并不是很優(yōu)秀,用起來(lái)比較重,不夠靈活,這里結(jié)合前面的例子和代碼,在SpringBoot腳手架項(xiàng)目中配置和整合Lettuce。先引入依賴:

<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.1.8.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies> </dependencyManagement> <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>5.1.8.RELEASE</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version><scope>provided</scope></dependency> </dependencies>

一般情況下,每個(gè)應(yīng)用應(yīng)該使用單個(gè)Redis客戶端實(shí)例和單個(gè)連接實(shí)例,這里設(shè)計(jì)一個(gè)腳手架,適配單機(jī)、普通主從、哨兵和集群四種使用場(chǎng)景。對(duì)于客戶端資源,采用默認(rèn)的實(shí)現(xiàn)即可。對(duì)于Redis的連接屬性,比較主要的有Host、Port和Password,其他可以暫時(shí)忽略。基于約定大于配置的原則,先定制一系列屬性配置類(其實(shí)有些配置是可以完全共用,但是考慮到要清晰描述類之間的關(guān)系,這里拆分多個(gè)配置屬性類和多個(gè)配置方法):

@Data @ConfigurationProperties(prefix = "lettuce") public class LettuceProperties {private LettuceSingleProperties single;private LettuceReplicaProperties replica;private LettuceSentinelProperties sentinel;private LettuceClusterProperties cluster;}@Data public class LettuceSingleProperties {private String host;private Integer port;private String password; }@EqualsAndHashCode(callSuper = true) @Data public class LettuceReplicaProperties extends LettuceSingleProperties {}@EqualsAndHashCode(callSuper = true) @Data public class LettuceSentinelProperties extends LettuceSingleProperties {private String masterId; }@EqualsAndHashCode(callSuper = true) @Data public class LettuceClusterProperties extends LettuceSingleProperties {}

配置類如下,主要使用@ConditionalOnProperty做隔離,一般情況下,很少有人會(huì)在一個(gè)應(yīng)用使用一種以上的Redis連接場(chǎng)景:

@RequiredArgsConstructor @Configuration @ConditionalOnClass(name = "io.lettuce.core.RedisURI") @EnableConfigurationProperties(value = LettuceProperties.class) public class LettuceAutoConfiguration {private final LettuceProperties lettuceProperties;@Bean(destroyMethod = "shutdown")public ClientResources clientResources() {return DefaultClientResources.create();}@Bean@ConditionalOnProperty(name = "lettuce.single.host")public RedisURI singleRedisUri() {LettuceSingleProperties singleProperties = lettuceProperties.getSingle();return RedisURI.builder().withHost(singleProperties.getHost()).withPort(singleProperties.getPort()).withPassword(singleProperties.getPassword()).build();}@Bean(destroyMethod = "shutdown")@ConditionalOnProperty(name = "lettuce.single.host")public RedisClient singleRedisClient(ClientResources clientResources, @Qualifier("singleRedisUri") RedisURI redisUri) {return RedisClient.create(clientResources, redisUri);}@Bean(destroyMethod = "close")@ConditionalOnProperty(name = "lettuce.single.host")public StatefulRedisConnection<String, String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) {return singleRedisClient.connect();}@Bean@ConditionalOnProperty(name = "lettuce.replica.host")public RedisURI replicaRedisUri() {LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica();return RedisURI.builder().withHost(replicaProperties.getHost()).withPort(replicaProperties.getPort()).withPassword(replicaProperties.getPassword()).build();}@Bean(destroyMethod = "shutdown")@ConditionalOnProperty(name = "lettuce.replica.host")public RedisClient replicaRedisClient(ClientResources clientResources, @Qualifier("replicaRedisUri") RedisURI redisUri) {return RedisClient.create(clientResources, redisUri);}@Bean(destroyMethod = "close")@ConditionalOnProperty(name = "lettuce.replica.host")public StatefulRedisMasterSlaveConnection<String, String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient,@Qualifier("replicaRedisUri") RedisURI redisUri) {return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri);}@Bean@ConditionalOnProperty(name = "lettuce.sentinel.host")public RedisURI sentinelRedisUri() {LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel();return RedisURI.builder().withPassword(sentinelProperties.getPassword()).withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort()).withSentinelMasterId(sentinelProperties.getMasterId()).build();}@Bean(destroyMethod = "shutdown")@ConditionalOnProperty(name = "lettuce.sentinel.host")public RedisClient sentinelRedisClient(ClientResources clientResources, @Qualifier("sentinelRedisUri") RedisURI redisUri) {return RedisClient.create(clientResources, redisUri);}@Bean(destroyMethod = "close")@ConditionalOnProperty(name = "lettuce.sentinel.host")public StatefulRedisMasterSlaveConnection<String, String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient,@Qualifier("sentinelRedisUri") RedisURI redisUri) {return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri);}@Bean@ConditionalOnProperty(name = "lettuce.cluster.host")public RedisURI clusterRedisUri() {LettuceClusterProperties clusterProperties = lettuceProperties.getCluster();return RedisURI.builder().withHost(clusterProperties.getHost()).withPort(clusterProperties.getPort()).withPassword(clusterProperties.getPassword()).build();}@Bean(destroyMethod = "shutdown")@ConditionalOnProperty(name = "lettuce.cluster.host")public RedisClusterClient redisClusterClient(ClientResources clientResources, @Qualifier("clusterRedisUri") RedisURI redisUri) {return RedisClusterClient.create(clientResources, redisUri);}@Bean(destroyMethod = "close")@ConditionalOnProperty(name = "lettuce.cluster")public StatefulRedisClusterConnection<String, String> clusterConnection(RedisClusterClient clusterClient) {return clusterClient.connect();} }

最后為了讓IDE識(shí)別我們的配置,可以添加IDE親緣性,/META-INF文件夾下新增一個(gè)文件spring-configuration-metadata.json,內(nèi)容如下:

{"properties": [{"name": "lettuce.single","type": "club.throwable.spring.lettuce.LettuceSingleProperties","description": "單機(jī)配置","sourceType": "club.throwable.spring.lettuce.LettuceProperties"},{"name": "lettuce.replica","type": "club.throwable.spring.lettuce.LettuceReplicaProperties","description": "主從配置","sourceType": "club.throwable.spring.lettuce.LettuceProperties"},{"name": "lettuce.sentinel","type": "club.throwable.spring.lettuce.LettuceSentinelProperties","description": "哨兵配置","sourceType": "club.throwable.spring.lettuce.LettuceProperties"},{"name": "lettuce.single","type": "club.throwable.spring.lettuce.LettuceClusterProperties","description": "集群配置","sourceType": "club.throwable.spring.lettuce.LettuceProperties"}] }

如果想IDE親緣性做得更好,可以添加/META-INF/additional-spring-configuration-metadata.json進(jìn)行更多細(xì)節(jié)定義。簡(jiǎn)單使用如下:

@Slf4j @Component public class RedisCommandLineRunner implements CommandLineRunner {@Autowired@Qualifier("singleRedisConnection")private StatefulRedisConnection<String, String> connection;@Overridepublic void run(String... args) throws Exception {RedisCommands<String, String> redisCommands = connection.sync();redisCommands.setex("name", 5, "throwable");log.info("Get value:{}", redisCommands.get("name"));} } // Get value:throwable

小結(jié)

本文算是基于Lettuce的官方文檔,對(duì)它的使用進(jìn)行全方位的分析,包括主要功能、配置都做了一些示例,限于篇幅部分特性和配置細(xì)節(jié)沒有分析。Lettuce已經(jīng)被spring-data-redis接納作為官方的Redis客戶端驅(qū)動(dòng),所以值得信賴,它的一些API設(shè)計(jì)確實(shí)比較合理,擴(kuò)展性高的同時(shí)靈活性也高。個(gè)人建議,基于Lettuce包自行添加配置到SpringBoot應(yīng)用用起來(lái)會(huì)得心應(yīng)手,畢竟RedisTemplate實(shí)在太笨重,而且還屏蔽了Lettuce一些高級(jí)特性和靈活的API。

參考資料:

  • Lettuce Reference Guide

鏈接

  • Github Page:http://www.throwable.club/2019/09/28/redis-client-driver-lettuce-usage
  • Coding Page:http://throwable.coding.me/2019/09/28/redis-client-driver-lettuce-usage

(本文完 c-14-d e-a-20190928 最近事太多...)

轉(zhuǎn)載于:https://www.cnblogs.com/throwable/p/11601538.html

總結(jié)

以上是生活随笔為你收集整理的Redis高级客户端Lettuce详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。