日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

springboot 多线程_redis官方推荐:SpringBoot用这个,一键多线程

發布時間:2023/12/2 javascript 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springboot 多线程_redis官方推荐:SpringBoot用这个,一键多线程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Lettuce是一個可伸縮的線程安全的Redis客戶端,提供了同步,異步和響應式使用方式。 如果多線程避免阻塞和事務操作(如BLPOP和MULTI / EXEC),則多個線程可共享一個連接。 Lettuce使用通信使用netty。 支持先進的Redis功能,如Sentinel,群集,管道傳輸,自動重新連接和Redis數據模型。

下面分享來自網易后端工程師的Lettuce的使用心得~

自己整理的Java架構學習視頻和大廠項目底層知識點,需要的同學歡迎私信我【資料】發給你~一起學習進步!

Lettuce在Spring boot中的配置

@Bean(name="clusterRedisURI") RedisURI clusterRedisURI(){ return RedisURI.builder().withHost("xxx").withPort(6954).build(); } @Bean ClusterClientOptions clusterClientOptions(){ return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1024).build(); } @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } @Bean(destroyMethod = "close") StatefulRedisClusterConnection statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }

基本的使用方式

@Bean(name="clusterRedisURI") RedisURI clusterRedisURI(){ return RedisURI.builder().withHost("xxx").withPort(6954).build(); } @Bean ClusterClientOptions clusterClientOptions(){ return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1024).build(); } @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } @Bean(destroyMethod = "close") StatefulRedisClusterConnection statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }

集群模式

@Bean(name="clusterRedisURI") RedisURI clusterRedisURI(){ return RedisURI.builder().withHost("xxx").withPort(6954).build(); } @Bean ClusterClientOptions clusterClientOptions(){ return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1024).build(); } @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } @Bean(destroyMethod = "close") StatefulRedisClusterConnection statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }

客戶端訂閱事件

客戶端使用事件總線傳輸運行期間產生的事件;EventBus可以從客戶端資源進行配置和獲取,并用于客戶端和自定義事件。  

如下事件可以被客戶端發送:

  • 連接事件
  • 測量事件 (Lettuce命令延遲測量(CommandLatency))
  • 集群拓撲事件

訂閱所有事件,并將事件輸出到控制臺

client.getResources().eventBus().get().subscribe(e -> { System.out.println("client 訂閱事件: " + e); });

輸出到內容有:

client 訂閱事件: ConnectionActivatedEvent [/xx:49910 -> /xx:6008]client 訂閱事件: ConnectionActivatedEvent [/xx:49911 -> /xx:6018]client 訂閱事件: ConnectedEvent [/xx:49912 -> /xx:6018]

發布事件

用戶除了可以通過事件總線訂閱事件外還可以通過事件總線發布自定義事件

eventBus.publish(new Event() { @Override public String toString() { return "自定義事件"; } });

訂閱到到內容如下:

client 訂閱事件: 自定義事件

讀寫分離

lettuce master/slave模式支持讀寫分離,下面看看具體使用方式,只需要指定ReadFrom就可以了

@Bean(destroyMethod = "close") StatefulRedisMasterSlaveConnection statefulRedisMasterSlaveConnection(RedisClient redisClient, RedisURI redisURI) { StatefulRedisMasterSlaveConnection connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisURI); connection.setReadFrom(ReadFrom.NEAREST); return connection; }}

ReadFrom可選參數以及含義:

參數含義

MASTER

從master節點讀取

SLAVE

從slave節點讀取

MASTER_PREFERRED

從master節點讀取,如果master節點不可以則從slave節點讀取

SLAVE_PREFERRED

從slave節點讀取,如果slave節點不可用則倒退到master節點讀取

NEAREST

從最近到節點讀取

下面看看源碼是如何實現讀寫分離的,

//根據意圖獲取連接 public StatefulRedisConnection getConnection(Intent intent) { if (debugEnabled) { logger.debug("getConnection(" + intent + ")"); } //如果readFrom不為null且是READ if (readFrom != null && intent == Intent.READ) { //根據readFrom配置從已知節點中選擇可用節點描述 List selection = readFrom.select(new ReadFrom.Nodes() { @Override public List getNodes() { return knownNodes; } @Override public Iterator iterator() { return knownNodes.iterator(); } }); //如果可選擇節點集合為空則拋出異常 if (selection.isEmpty()) { throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s", knownNodes, readFrom)); } try { //遍歷所有可用節點 for (RedisNodeDescription redisNodeDescription : selection) { //獲取節點連接 StatefulRedisConnection readerCandidate = getConnection(redisNodeDescription); //如果節點連接不是打開到連接則繼續查找下一個連接 if (!readerCandidate.isOpen()) { continue; } //返回可用連接 return readerCandidate; } //如果沒有找到可用連接,默認返回第一個 return getConnection(selection.get(0)); } catch (RuntimeException e) { throw new RedisException(e); } } //如果沒有配置readFrom或者不是READ 則返回master連接 return getConnection(getMaster()); }

自定義負載均衡

通過上文的讀寫分離實現代碼可以發現,只需要readFrom select方法每次返回的list都是隨機無序的就可以實現隨機的負載均衡

public class Sharded< C extends StatefulRedisConnection,V> { private TreeMap nodes; private final Hashing algo = Hashing.MURMUR_HASH; private final Map resources = new LinkedHashMap<>(); private RedisClient redisClient; private String password; private Set sentinels; private RedisCodec codec; public Sharded(List masters, RedisClient redisClient, String password, Set sentinels, RedisCodec codec) { this.redisClient = redisClient; this.password = password; this.sentinels = sentinels; this.codec = codec; initialize(masters); } private void initialize(List masters) { nodes = new TreeMap<>(); for (int i = 0; i != masters.size(); ++i) { final String master = masters.get(i); for (int n = 0; n < 160; n++) { nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), master); } RedisURI.Builder builder = RedisURI.builder(); for (HostAndPort hostAndPort : sentinels) { builder.withSentinel(hostAndPort.getHostText(), hostAndPort.getPort()); } RedisURI redisURI = builder.withPassword(password).withSentinelMasterId(master).build(); resources.put(master, MasterSlave.connect(redisClient, codec, redisURI)); } } public StatefulRedisConnection getConnectionBy(String key) { return resources.get(getShardInfo(SafeEncoder.encode(key))); } public Collection getAllConnection(){ return Collections.unmodifiableCollection(resources.values()); } public String getShardInfo(byte[] key) { SortedMap tail = nodes.tailMap(algo.hash(key)); if (tail.isEmpty()) { return nodes.get(nodes.firstKey()); } return tail.get(tail.firstKey()); } public void close(){ for(StatefulRedisConnection connection: getAllConnection()){ connection.close(); } } private static class SafeEncoder { static byte[] encode(final String str) { try { if (str == null) { throw new IllegalArgumentException("value sent to redis cannot be null"); } return str.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } } } private interface Hashing { Hashing MURMUR_HASH = new MurmurHash(); long hash(String key); long hash(byte[] key); } private static class MurmurHash implements Hashing { static long hash64A(byte[] data, int seed) { return hash64A(ByteBuffer.wrap(data), seed); } static long hash64A(ByteBuffer buf, int seed) { ByteOrder byteOrder = buf.order(); buf.order(ByteOrder.LITTLE_ENDIAN); long m = 0xc6a4a7935bd1e995L; int r = 47; long h = seed ^ (buf.remaining() * m); long k; while (buf.remaining() >= 8) { k = buf.getLong(); k *= m; k ^= k >>> r; k *= m; h ^= k; h *= m; } if (buf.remaining() > 0) { ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN); // for big-endian version, do this first: // finish.position(8-buf.remaining()); finish.put(buf).rewind(); h ^= finish.getLong(); h *= m; } h ^= h >>> r; h *= m; h ^= h >>> r; buf.order(byteOrder); return h; } public long hash(byte[] key) { return hash64A(key, 0x1234ABCD); } public long hash(String key) { return hash(SafeEncoder.encode(key)); } } }

來源:網易工程師--張偉

有任何問題歡迎留言交流~


整理總結不易,如果覺得這篇文章有意思的話,歡迎轉發、收藏,給我一些鼓勵~

有想看的內容或者建議,敬請留言!

最近利用空余時間整理了一些精選Java架構學習視頻和大廠項目底層知識點,需要的同學歡迎私信我發給你~一起學習進步!有任何問題也歡迎交流~

Java日記本,每日存檔超實用的技術干貨學習筆記,每天陪你前進一點點~

總結

以上是生活随笔為你收集整理的springboot 多线程_redis官方推荐:SpringBoot用这个,一键多线程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。