Analysis of the Redisson Command Send/Receive Flow

Published: 2025/3/19

I. Example: we start with the simplest command, GET.

RBucket<Object> t = redissonClient.getBucket("syncTradeUid_idOff");
int idOff = (int) t.get();

II. Spring Boot auto-configuration for Redisson

@Order(value = 4001)
@ConditionalOnProperty("redisson.password")
@Configuration
@EnableConfigurationProperties({RedissonProperties.class})
public class RedissonAutoConfiguration {

    public RedissonAutoConfiguration() {
        System.out.println("==========================redis initialized successfully=======================");
    }

    @Autowired
    private RedissonProperties redissonProperties;

    @Bean(name = "redissonClient")
    @ConditionalOnProperty(name = "redisson.address")
    RedissonClient redissonSingle() {
        Config config = new Config();
        config.setCodec(new FastJsonCodec());
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress(redissonProperties.getAddress())
                .setTimeout(redissonProperties.getTimeout())
                .setConnectionPoolSize(redissonProperties.getConnectionPoolSize())
                .setConnectionMinimumIdleSize(redissonProperties.getConnectionMinimumIdleSize());
        if (!StrUtil.isEmpty(redissonProperties.getPassword())) {
            serverConfig.setPassword(redissonProperties.getPassword());
        }
        return Redisson.create(config);
    }

    /**
     * Auto-configuration for sentinel mode.
     * @return the sentinel-mode RedissonClient
     */
    @Bean(name = "redissonClient")
    @ConditionalOnProperty(name = "redisson.masterName")
    RedissonClient redissonSentinel() {
        Config config = new Config();
        config.setCodec(new FastJsonCodec());
        SentinelServersConfig serverConfig = config.useSentinelServers()
                .addSentinelAddress(redissonProperties.getSentinelAddresses())
                .setMasterName(redissonProperties.getMasterName())
                .setTimeout(redissonProperties.getTimeout())
                .setMasterConnectionPoolSize(redissonProperties.getMasterConnectionPoolSize())
                .setSlaveConnectionPoolSize(redissonProperties.getSlaveConnectionPoolSize())
                .setReadMode(ReadMode.SLAVE);
        if (!StrUtil.isEmpty(redissonProperties.getPassword())) {
            serverConfig.setPassword(redissonProperties.getPassword());
        }
        return Redisson.create(config);
    }
}

application.properties

# Standalone
redisson.address = redis://127.0.0.1:6379
redisson.password =

# Sentinel
#redisson.masterName=BF-20190319DBXF
#redisson.schema=redis://
#redisson.sentinelAddresses=redis://127.0.0.1:26379,redis://127.0.0.1:26479,redis://127.0.0.1:26579
#redisson.password=

III. Redisson auto-configuration initialization flow

1. Start from Redisson.create(config), which creates the Redisson object. The Redisson class implements the RedissonClient interface.

2. Create the connection manager object.

// org.redisson.config.ConfigSupport
public static ConnectionManager createConnectionManager(Config configCopy) {
    UUID id = UUID.randomUUID();

    if (configCopy.getMasterSlaveServersConfig() != null) {
        validate(configCopy.getMasterSlaveServersConfig());
        return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
    } else if (configCopy.getSingleServerConfig() != null) {
        validate(configCopy.getSingleServerConfig());
        return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
    } else if (configCopy.getSentinelServersConfig() != null) {
        validate(configCopy.getSentinelServersConfig());
        return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
    } else if (configCopy.getClusterServersConfig() != null) {
        validate(configCopy.getClusterServersConfig());
        return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
    } else if (configCopy.getReplicatedServersConfig() != null) {
        validate(configCopy.getReplicatedServersConfig());
        return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
    } else if (configCopy.getConnectionManager() != null) {
        return configCopy.getConnectionManager();
    } else {
        throw new IllegalArgumentException("server(s) address(es) not defined!");
    }
}
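The dispatch above boils down to "the first configured mode wins". The following is a minimal, stdlib-only sketch of that rule, not Redisson code: the class name ModeDispatchSketch, the method pickManager, and the map keys are hypothetical, and the custom-connection-manager branch is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the dispatch; it does not use any Redisson classes.
class ModeDispatchSketch {

    // Priority order copied from the if/else chain: the first configured mode wins.
    private static final String[][] ORDER = {
        {"masterSlave", "MasterSlaveConnectionManager"},
        {"single", "SingleConnectionManager"},
        {"sentinel", "SentinelConnectionManager"},
        {"cluster", "ClusterConnectionManager"},
        {"replicated", "ReplicatedConnectionManager"},
    };

    // 'configured' maps a mode key (hypothetical names) to its non-null config object.
    static String pickManager(Map<String, Object> configured) {
        for (String[] entry : ORDER) {
            if (configured.get(entry[0]) != null) {
                return entry[1];
            }
        }
        throw new IllegalArgumentException("server(s) address(es) not defined!");
    }

    public static void main(String[] args) {
        Map<String, Object> configured = new HashMap<>();
        configured.put("single", "redis://127.0.0.1:6379");
        System.out.println(pickManager(configured));
    }
}
```

With only the single-server entry present, as in the standalone properties shown earlier, the sketch selects SingleConnectionManager; with nothing configured it throws, mirroring the final else branch.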

3. Look first at MasterSlaveEntry.setupMasterEntry, which creates the RedisClient and connects to the Redis server.

// org.redisson.connection.MasterSlaveEntry
public RFuture<RedisClient> setupMasterEntry(RedisURI address) {
    RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
    return setupMasterEntry(client);
}

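4. Create the RedisClient. A standalone server also goes through the master/slave manager, simply as a master with no slaves, so both cases share one code path.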

// org.redisson.connection.MasterSlaveConnectionManager
@Override
public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {
    RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
    return RedisClient.create(redisConfig);
}

5. RedisClient creates the Netty bootstrap, channel, and handlers.

// org.redisson.client.RedisClient
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
    Bootstrap bootstrap = new Bootstrap()
            .resolver(config.getResolverGroup())
            .channel(config.getSocketChannelClass())
            .group(config.getGroup());

    bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
    bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
    bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
    config.getNettyHook().afterBoostrapInitialization(bootstrap);
    return bootstrap;
}

6. Next, look at RedisChannelInitializer to see which inbound and outbound handlers it adds to the pipeline.

// org.redisson.client.handler.RedisChannelInitializer
@Override
protected void initChannel(Channel ch) throws Exception {
    initSsl(config, ch);

    if (type == Type.PLAIN) {
        ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
    } else {
        ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
    }

    ch.pipeline().addLast(
            connectionWatchdog,
            CommandEncoder.INSTANCE,
            CommandBatchEncoder.INSTANCE,
            new CommandsQueue());

    if (pingConnectionHandler != null) {
        ch.pipeline().addLast(pingConnectionHandler);
    }

    if (type == Type.PLAIN) {
        ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme()));
    } else {
        ch.pipeline().addLast(new CommandPubSubDecoder(config));
    }

    ch.pipeline().addLast(new ErrorsLoggingHandler());
    config.getNettyHook().afterChannelInitialization(ch);
}
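To see what the addLast order means in practice, recall Netty's traversal rule: inbound events travel from the head of the pipeline to the tail, while outbound events such as writeAndFlush travel from the tail back to the head. The stdlib-only sketch below models that rule for a subset of the Type.PLAIN pipeline. The direction assigned to each handler (IN, OUT, BOTH) is an assumption inferred from the handler names and should be checked against your Redisson version; the optional ping handler and ErrorsLoggingHandler are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of pipeline traversal; it does not use Netty or Redisson classes.
class PipelineOrderSketch {
    enum Dir { IN, OUT, BOTH }

    static final class Handler {
        final String name;
        final Dir dir;

        Handler(String name, Dir dir) {
            this.name = name;
            this.dir = dir;
        }
    }

    // Handlers in addLast order for Type.PLAIN; the directions are assumptions.
    static final List<Handler> PIPELINE = Arrays.asList(
            new Handler("RedisConnectionHandler", Dir.IN),
            new Handler("ConnectionWatchdog", Dir.IN),
            new Handler("CommandEncoder", Dir.OUT),
            new Handler("CommandBatchEncoder", Dir.OUT),
            new Handler("CommandsQueue", Dir.BOTH),
            new Handler("CommandDecoder", Dir.IN));

    // Inbound events (bytes read from the socket) travel head to tail.
    static List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Handler h : PIPELINE) {
            if (h.dir != Dir.OUT) {
                path.add(h.name);
            }
        }
        return path;
    }

    // Outbound events (channel.writeAndFlush) travel tail to head.
    static List<String> outboundPath() {
        List<String> path = new ArrayList<>();
        for (int i = PIPELINE.size() - 1; i >= 0; i--) {
            Handler h = PIPELINE.get(i);
            if (h.dir != Dir.IN) {
                path.add(h.name);
            }
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println("write: " + outboundPath());
        System.out.println("read:  " + inboundPath());
    }
}
```

Under these assumptions a write reaches CommandsQueue before either encoder, which is why the queue can hold a command back until the previous one has finished.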

?7.創建好RedisClient后,開始連接REDIS服務器。這里首先異步解析地址,解析成功后,在添加到寫連接池時會創建和添加連接,在創建連接時會去連接REDIS服務器。

// org.redisson.connection.MasterSlaveEntry
private RFuture<RedisClient> setupMasterEntry(RedisClient client) {
    RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
    result.onComplete((res, e) -> {
        if (e != null) {
            client.shutdownAsync();
        }
    });

    RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
    addrFuture.onComplete((res, e) -> {
        if (e != null) {
            result.tryFailure(e);
            return;
        }

        masterEntry = new ClientConnectionsEntry(
                client,
                config.getMasterConnectionMinimumIdleSize(),
                config.getMasterConnectionPoolSize(),
                config.getSubscriptionConnectionMinimumIdleSize(),
                config.getSubscriptionConnectionPoolSize(),
                connectionManager,
                NodeType.MASTER);

        int counter = 1;
        if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
            counter++;
        }

        CountableListener<RedisClient> listener = new CountableListener<>(result, client, counter);
        RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
        writeFuture.onComplete(listener);
    });
    return result;
}

8. Follow the process of connecting to the Redis server.

// org.redisson.connection.pool.ConnectionPool
private void initConnections(ClientConnectionsEntry entry, RPromise<Void> initPromise, boolean checkFreezed) {
    int minimumIdleSize = getMinimumIdleSize(entry);

    if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {
        initPromise.trySuccess(null);
        return;
    }

    AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
    int startAmount = Math.min(10, minimumIdleSize);
    AtomicInteger requests = new AtomicInteger(startAmount);
    for (int i = 0; i < startAmount; i++) {
        createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
    }
}

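As shown here, the pool does not open all of its connections at once: the first batch is capped at Math.min(10, minimumIdleSize), so at most 10 connection attempts are started initially, and the pool is filled up to minimumIdleSize connections rather than a fixed 10.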
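The warm-up can be sketched as a small simulation. This is a stdlib-only model, not Redisson code: the class PoolWarmupSketch and its method warmUp are hypothetical, and the rule that each finished connection starts the next one until minimumIdleSize is reached is an assumption about the part of createConnection that the article elides.

```java
import java.util.Arrays;

// Hypothetical simulation of pool warm-up; it does not open any sockets.
class PoolWarmupSketch {

    // Returns {firstBatch, totalCreated, maxInFlight}.
    static int[] warmUp(int minimumIdleSize) {
        // Same cap as in initConnections: never start more than 10 at once.
        int startAmount = Math.min(10, minimumIdleSize);
        int launched = startAmount;    // connection attempts started so far
        int inFlight = startAmount;    // attempts that have not completed yet
        int maxInFlight = startAmount;
        int created = 0;

        while (inFlight > 0) {
            // One pending connect completes.
            inFlight--;
            created++;
            // Assumed behavior: a completion starts the next attempt if any remain.
            if (launched < minimumIdleSize) {
                launched++;
                inFlight++;
            }
            maxInFlight = Math.max(maxInFlight, inFlight);
        }
        return new int[] {startAmount, created, maxInFlight};
    }

    public static void main(String[] args) {
        // prints [10, 24, 10]
        System.out.println(Arrays.toString(warmUp(24)));
    }
}
```

For a minimum idle size of 24, the simulation starts 10 attempts, ends with 24 connections, and never has more than 10 attempts in flight.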

9. Request a new connection from the connection pool.

// ConnectionPool
private void createConnection(boolean checkFreezed, AtomicInteger requests, ClientConnectionsEntry entry,
        RPromise<Void> initPromise, int minimumIdleSize, AtomicInteger initializedConnections) {
    acquireConnection(entry, new Runnable() {
        @Override
        public void run() {
            RPromise<T> promise = new RedissonPromise<T>();
            createConnection(entry, promise);
            promise.onComplete((conn, e) -> {
            });
        }
    });
}

10. The connection is ultimately created in RedisClient.connectAsync, an asynchronous connect method.

public RFuture<RedisConnection> connectAsync() {
    final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();

    RFuture<InetSocketAddress> addrFuture = resolveAddr();
    addrFuture.onComplete((res, e) -> {
        if (e != null) {
            f.tryFailure(e);
            return;
        }

        ChannelFuture channelFuture = bootstrap.connect(res);
    });
    return f;
}

11.在連接成功后,RedisConnectionHandler.channelRegistered方法中創建連接對象。

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    if (connection == null) {
        connection = createConnection(ctx);
    }
    super.channelRegistered(ctx);
}

12. Here the channel is assigned and saved. Each channel carries its RedisConnection as a channel attribute.

// RedisConnection
public void updateChannel(Channel channel) {
    this.channel = channel;
    channel.attr(CONNECTION).set(this);
}

13.在連接成功后發送PING心跳命令

// BaseConnectionHandler
@Override
public void channelActive(final ChannelHandlerContext ctx) {
    List<RFuture<Object>> futures = new ArrayList<RFuture<Object>>();

    RedisClientConfig config = redisClient.getConfig();
    if (config.getPassword() != null) {
        RFuture<Object> future;
        if (config.getUsername() != null) {
            future = connection.async(RedisCommands.AUTH, config.getUsername(), config.getPassword());
        } else {
            future = connection.async(RedisCommands.AUTH, config.getPassword());
        }
        futures.add(future);
    }

    if (config.getPingConnectionInterval() > 0) {
        RFuture<Object> future = connection.async(RedisCommands.PING);
        futures.add(future);
    }

    final AtomicBoolean retry = new AtomicBoolean();
    final AtomicInteger commandsCounter = new AtomicInteger(futures.size());
    for (RFuture<Object> future : futures) {
        future.onComplete((res, e) -> {
            if (e != null) {
                // The server is still loading its dataset: retry once after one second.
                if (e instanceof RedisLoadingException) {
                    if (retry.compareAndSet(false, true)) {
                        ctx.executor().schedule(() -> {
                            channelActive(ctx);
                        }, 1, TimeUnit.SECONDS);
                    }
                    return;
                }
                connection.closeAsync();
                connectionPromise.tryFailure(e);
                return;
            }
            // The connection is ready only after every handshake command has succeeded.
            if (commandsCounter.decrementAndGet() == 0) {
                ctx.fireChannelActive();
                connectionPromise.trySuccess(connection);
            }
        });
    }
}
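The countdown at the end of channelActive is a common way to wait for several asynchronous results with one shared counter. Below is a minimal sketch of that pattern using java.util.concurrent.CompletableFuture in place of Redisson's RFuture and RPromise. The class HandshakeCountdownSketch and the method awaitHandshake are hypothetical names, the retry branch is left out, and completing immediately when the list is empty is an assumption, since the excerpt does not show that case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the AUTH/PING countdown; it does not use Redisson classes.
class HandshakeCountdownSketch {

    // Completes the returned promise with 'connection' once every future has succeeded,
    // or fails it as soon as any one of them fails.
    static <C> CompletableFuture<C> awaitHandshake(List<CompletableFuture<Object>> futures, C connection) {
        CompletableFuture<C> connectionPromise = new CompletableFuture<>();
        if (futures.isEmpty()) {
            // Assumption: with nothing to wait for, the connection is ready at once.
            connectionPromise.complete(connection);
            return connectionPromise;
        }

        AtomicInteger commandsCounter = new AtomicInteger(futures.size());
        for (CompletableFuture<Object> future : futures) {
            future.whenComplete((res, e) -> {
                if (e != null) {
                    connectionPromise.completeExceptionally(e);
                    return;
                }
                // Only the last successful reply releases the connection.
                if (commandsCounter.decrementAndGet() == 0) {
                    connectionPromise.complete(connection);
                }
            });
        }
        return connectionPromise;
    }

    public static void main(String[] args) {
        CompletableFuture<Object> auth = new CompletableFuture<>();
        CompletableFuture<Object> ping = new CompletableFuture<>();
        List<CompletableFuture<Object>> futures = new ArrayList<>();
        futures.add(auth);
        futures.add(ping);

        CompletableFuture<String> ready = awaitHandshake(futures, "connection-1");
        auth.complete("OK");
        System.out.println("after AUTH: done=" + ready.isDone());
        ping.complete("PONG");
        System.out.println("after PING: done=" + ready.isDone());
    }
}
```

The atomic counter makes the pattern safe even if the replies complete on different threads, because exactly one callback observes the value zero.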

IV. Sending a command

1. Start from the set method of RedissonBucket. The commandExecutor used there comes from the command executor held by the connectionManager.

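2. Execution then enters RedisExecutor.execute, which runs the command asynchronously. It first obtains a connection from the connection pool and, once the connection future completes, sends the command.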

public void execute() {
    codec = getCodec(codec);

    RFuture<RedisConnection> connectionFuture = getConnection();
    connectionFuture.onComplete((connection, e) -> {
        sendCommand(attemptPromise, connection);

        writeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                checkWriteFuture(writeFuture, attemptPromise, connection);
            }
        });

        releaseConnection(attemptPromise, connectionFuture);
    });
}

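3. The connection is obtained from the connection pool: depending on whether the command is a read or a write, the connection manager selects an available connection and returns it.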

// RedisExecutor
protected RFuture<RedisConnection> getConnection() {
    if (readOnlyMode) {
        connectionFuture = connectionManager.connectionReadOp(source, command);
    } else {
        connectionFuture = connectionManager.connectionWriteOp(source, command);
    }
    return connectionFuture;
}
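As a rough illustration of the read/write split, the stdlib-only sketch below routes writes to the master and reads to the slaves. It assumes ReadMode.SLAVE, as set in the sentinel configuration earlier, a round-robin choice among slaves, and a fallback to the master when there are no slaves (the standalone case). The class ReadWriteRoutingSketch and its method nodeFor are hypothetical names, and the real selection logic in Redisson is more involved.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of read/write routing; node names are plain strings.
class ReadWriteRoutingSketch {
    private final String master;
    private final List<String> slaves;
    private final AtomicInteger next = new AtomicInteger();

    ReadWriteRoutingSketch(String master, List<String> slaves) {
        this.master = master;
        this.slaves = slaves;
    }

    // Reads rotate over the slaves; writes, and reads without slaves, go to the master.
    String nodeFor(boolean readOnlyMode) {
        if (readOnlyMode && !slaves.isEmpty()) {
            int index = Math.floorMod(next.getAndIncrement(), slaves.size());
            return slaves.get(index);
        }
        return master;
    }

    public static void main(String[] args) {
        ReadWriteRoutingSketch sentinel =
                new ReadWriteRoutingSketch("master", Arrays.asList("slave-1", "slave-2"));
        System.out.println("GET -> " + sentinel.nodeFor(true));
        System.out.println("SET -> " + sentinel.nodeFor(false));

        ReadWriteRoutingSketch standalone =
                new ReadWriteRoutingSketch("master", Collections.<String>emptyList());
        System.out.println("GET -> " + standalone.nodeFor(true));
    }
}
```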

4. Next, RedisConnection.send writes the data to the channel.

// RedisConnection
public <T, R> ChannelFuture send(CommandData<T, R> data) {
    return channel.writeAndFlush(data);
}

5. The Netty pipeline contains a CommandsQueue handler, a per-connection queue that serializes commands: at any moment a connection has only one command in flight, and the next command is sent only after the current one has finished.

// org.redisson.client.handler.CommandsQueue
private void sendData(Channel ch) {
    QueueCommandHolder command = queue.peek();
    if (command != null && command.trySend()) {
        QueueCommand data = command.getCommand();
        List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
        if (!pubSubOps.isEmpty()) {
            for (CommandData<Object, Object> cd : pubSubOps) {
                for (Object channel : cd.getParams()) {
                    ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);
                }
            }
        } else {
            // Remember the in-flight command so the decoder can match the reply to it.
            ch.attr(CURRENT_COMMAND).set(data);
        }

        command.getChannelPromise().addListener(listener);
        ch.writeAndFlush(data, command.getChannelPromise());
    }
}
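The one-command-in-flight rule can be sketched with a plain queue. This is a simplified, single-threaded model rather than the CommandsQueue implementation: the class SingleInFlightQueueSketch and its members are hypothetical, commands are plain strings, and the "wire" is a list that records what would have been written to the socket.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a per-connection command queue with one command in flight.
class SingleInFlightQueueSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private boolean headSent;                       // has the head of the queue been written?
    final List<String> wire = new ArrayList<>();    // what would have gone to the socket

    // Called for every outgoing command: enqueue it and try to send.
    void write(String command) {
        queue.add(command);
        sendData();
    }

    // Called when the reply for the in-flight command has been decoded.
    void onResponse() {
        queue.poll();
        headSent = false;
        sendData();
    }

    // Sends the head of the queue only if nothing is currently in flight.
    private void sendData() {
        String head = queue.peek();
        if (head != null && !headSent) {
            headSent = true;
            wire.add(head);
        }
    }

    public static void main(String[] args) {
        SingleInFlightQueueSketch connection = new SingleInFlightQueueSketch();
        connection.write("GET a");
        connection.write("GET b");
        System.out.println(connection.wire);   // prints [GET a]
        connection.onResponse();
        System.out.println(connection.wire);   // prints [GET a, GET b]
    }
}
```

Because replies on one connection arrive in the order the commands were sent, keeping a single in-flight command lets the decoder pair each reply with the command stored as the current one.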

V. Receiving data and the callback flow

1. The inbound handler CommandDecoder receives the data and fetches the pending command from the channel's CURRENT_COMMAND attribute.

// CommandDecoder
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();

    if (state() == null) {
        state(new State());
    }

    if (data == null) {
        while (in.writerIndex() > in.readerIndex()) {
            int endIndex = skipCommand(in);
            try {
                decode(ctx, in, data);
            } catch (Exception e) {
                in.readerIndex(endIndex);
                throw e;
            }
        }
    } else {
        int endIndex = 0;
        if (!(data instanceof CommandsData)) {
            endIndex = skipCommand(in);
        }
        try {
            decode(ctx, in, data);
        } catch (Exception e) {
            if (!(data instanceof CommandsData)) {
                in.readerIndex(endIndex);
            }
            throw e;
        }
    }
}

2. Complete the command's promise with the decoded result.

// CommandDecoder
protected void completeResponse(CommandData<Object, Object> data, Object result) {
    if (data != null) {
        data.getPromise().trySuccess(result);
    }
}

3. The calling thread waits for the result of the asynchronous promise.

// CommandAsyncService
@Override
public <V> V get(RFuture<V> future) {
    try {
        future.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    if (future.isSuccess()) {
        return future.getNow();
    }

    throw convertException(future);
}
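Steps 2 and 3 together form a handoff between two threads: the I/O thread completes the promise, and the caller blocks until that happens. The sketch below shows the same handoff with java.util.concurrent.CompletableFuture standing in for RFuture. The class BlockingGetSketch is a hypothetical name, and wrapping failures in IllegalStateException is a stand-in for convertException, whose behavior the article does not show.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical model of the synchronous get built on top of an asynchronous promise.
class BlockingGetSketch {

    // Blocks the caller until the promise is completed by another thread.
    static <V> V get(CompletableFuture<V> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag, as the original get does.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Stand-in for convertException: surface the underlying failure.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> promise = new CompletableFuture<>();

        // Plays the role of the Netty I/O thread calling completeResponse.
        Thread ioThread = new Thread(() -> promise.complete("7"));
        ioThread.start();

        // Plays the role of the application thread calling t.get().
        System.out.println(get(promise));
    }
}
```

This is why a synchronous call such as t.get() in the opening example is really an asynchronous command plus a blocking wait on its promise.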
