日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

redission收发命令流程分析

發(fā)布時(shí)間:2025/3/19 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 redission收发命令流程分析 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、示例,我們從最簡(jiǎn)單的GET命令開(kāi)始。

RBucket<Object> t = redissonClient.getBucket("syncTradeUid_idOff"); int idOff = (int)t.get();

二、springboot的Redission自動(dòng)配置

@Order(value = 4001) @ConditionalOnProperty("redisson.password") @Configuration @EnableConfigurationProperties({RedissonProperties.class}) public class RedissonAutoConfiguration {public RedissonAutoConfiguration() {System.out.println("==========================redis 初始化成功=======================");}@Autowiredprivate RedissonProperties redissonProperties;@Bean(name = "redissonClient")@ConditionalOnProperty(name="redisson.address")RedissonClient redissonSingle() {Config config = new Config();config.setCodec(new FastJsonCodec());SingleServerConfig serverConfig = config.useSingleServer().setAddress(redissonProperties.getAddress()).setTimeout(redissonProperties.getTimeout()).setConnectionPoolSize(redissonProperties.getConnectionPoolSize()).setConnectionMinimumIdleSize(redissonProperties.getConnectionMinimumIdleSize());if(!StrUtil.isEmpty(redissonProperties.getPassword())) {serverConfig.setPassword(redissonProperties.getPassword());}return Redisson.create(config);}/*** 哨兵模式自動(dòng)裝配* @return*/@Bean(name = "redissonClient")@ConditionalOnProperty(name="redisson.masterName")RedissonClient redissonSentinel() {Config config = new Config();config.setCodec(new FastJsonCodec());SentinelServersConfig serverConfig = config.useSentinelServers().addSentinelAddress(redissonProperties.getSentinelAddresses()).setMasterName(redissonProperties.getMasterName()).setTimeout(redissonProperties.getTimeout()).setMasterConnectionPoolSize(redissonProperties.getMasterConnectionPoolSize()).setSlaveConnectionPoolSize(redissonProperties.getSlaveConnectionPoolSize()).setReadMode(ReadMode.SLAVE);if(!StrUtil.isEmpty(redissonProperties.getPassword())) {serverConfig.setPassword(redissonProperties.getPassword());}return Redisson.create(config);}}

application.properites

#單機(jī) redisson.address = redis://127.0.0.1:6379 redisson.password =#哨兵 #redisson.masterName=BF-20190319DBXF #redisson.schema=redis:// #redisson.sentinelAddresses=redis://127.0.0.1:26379,redis://127.0.0.1:26479,redis://127.0.0.1:26579 #redisson.password=

三、REDISSION自動(dòng)配置初始化流程

1.從Redisson.create(config)創(chuàng)建redission對(duì)象開(kāi)始。Redission繼承于

RedissonClient

2. 創(chuàng)建連接管理器對(duì)象

org.redisson.config.ConfigSupportpublic static ConnectionManager createConnectionManager(Config configCopy) {UUID id = UUID.randomUUID();if (configCopy.getMasterSlaveServersConfig() != null) {validate(configCopy.getMasterSlaveServersConfig());return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);} else if (configCopy.getSingleServerConfig() != null) {validate(configCopy.getSingleServerConfig());return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);} else if (configCopy.getSentinelServersConfig() != null) {validate(configCopy.getSentinelServersConfig());return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);} else if (configCopy.getClusterServersConfig() != null) {validate(configCopy.getClusterServersConfig());return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);} else if (configCopy.getReplicatedServersConfig() != null) {validate(configCopy.getReplicatedServersConfig());return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);} else if (configCopy.getConnectionManager() != null) {return configCopy.getConnectionManager();}else {throw new IllegalArgumentException("server(s) address(es) not defined!");}}

3.先看下MasterSlaveEntry->setupMasterEntry,這里會(huì)創(chuàng)建RedisClient,以及連接REDIS服務(wù)器。

org.redisson.connection.MasterSlaveEntry public RFuture<RedisClient> setupMasterEntry(RedisURI address) {RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);return setupMasterEntry(client);}

4.創(chuàng)建RedisClient,這里面單機(jī)也是使用主從管理器,即是只有主沒(méi)有從。統(tǒng)一起來(lái)。

org.redisson.connection.MasterSlaveConnectionManager@Overridepublic RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);return RedisClient.create(redisConfig);}

5.在RedisClient會(huì)創(chuàng)建NEETY的bootstrap,channel,handler.

org.redisson.client.RedisClientprivate Bootstrap createBootstrap(RedisClientConfig config, Type type) {Bootstrap bootstrap = new Bootstrap().resolver(config.getResolverGroup()).channel(config.getSocketChannelClass()).group(config.getGroup());bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());config.getNettyHook().afterBoostrapInitialization(bootstrap);return bootstrap;}

?6.我們?cè)倏聪翿edisChannelInitializer,有添加哪些inBounder,outBounder

org.redisson.client.handler.RedisChannelInitializer @Overrideprotected void initChannel(Channel ch) throws Exception {initSsl(config, ch);if (type == Type.PLAIN) {ch.pipeline().addLast(new RedisConnectionHandler(redisClient));} else {ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));}ch.pipeline().addLast(connectionWatchdog,CommandEncoder.INSTANCE,CommandBatchEncoder.INSTANCE,new CommandsQueue());if (pingConnectionHandler != null) {ch.pipeline().addLast(pingConnectionHandler);}if (type == Type.PLAIN) {ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme()));} else {ch.pipeline().addLast(new CommandPubSubDecoder(config));}ch.pipeline().addLast(new ErrorsLoggingHandler());config.getNettyHook().afterChannelInitialization(ch);}

?

?7.創(chuàng)建好RedisClient后,開(kāi)始連接REDIS服務(wù)器。這里首先異步解析地址,解析成功后,在添加到寫(xiě)連接池時(shí)會(huì)創(chuàng)建和添加連接,在創(chuàng)建連接時(shí)會(huì)去連接REDIS服務(wù)器。

org.redisson.connection. MasterSlaveEntryprivate RFuture<RedisClient> setupMasterEntry(RedisClient client) {RPromise<RedisClient> result = new RedissonPromise<RedisClient>();result.onComplete((res, e) -> {if (e != null) {client.shutdownAsync();}});RFuture<InetSocketAddress> addrFuture = client.resolveAddr();addrFuture.onComplete((res, e) -> {if (e != null) {result.tryFailure(e);return;}masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(),config.getSubscriptionConnectionMinimumIdleSize(),config.getSubscriptionConnectionPoolSize(), connectionManager,NodeType.MASTER);int counter = 1;if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {counter++;}CountableListener<RedisClient> listener = new CountableListener<>(result, client, counter);RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);writeFuture.onComplete(listener);});return result;}

8.查看連接REDIS服務(wù)器過(guò)程

org.redisson.connection.pool.ConnectionPool. private void initConnections(ClientConnectionsEntry entry, RPromise<Void> initPromise, boolean checkFreezed) {int minimumIdleSize = getMinimumIdleSize(entry);if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {initPromise.trySuccess(null);return;}AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);int startAmount = Math.min(10, minimumIdleSize);AtomicInteger requests = new AtomicInteger(startAmount);for (int i = 0; i < startAmount; i++) {createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);}}

在這里可以看到會(huì)初始化10個(gè)客戶(hù)端連接到連接池。

9.從連接池去申請(qǐng)創(chuàng)建連接

ConnectionPool private void createConnection(boolean checkFreezed, AtomicInteger requests, ClientConnectionsEntry entry, RPromise<Void> initPromise,int minimumIdleSize, AtomicInteger initializedConnections) {acquireConnection(entry, new Runnable() {@Overridepublic void run() {RPromise<T> promise = new RedissonPromise<T>();createConnection(entry, promise);promise.onComplete((conn, e) -> {});}});}

?10.最終創(chuàng)建連接是在RedisClient.connectAsync這個(gè)異步連接方法中。

public RFuture<RedisConnection> connectAsync() {final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();RFuture<InetSocketAddress> addrFuture = resolveAddr();addrFuture.onComplete((res, e) -> {if (e != null) {f.tryFailure(e);return;}ChannelFuture channelFuture = bootstrap.connect(res);});return f;}

11.在連接成功后,RedisConnectionHandler.channelRegistered方法中創(chuàng)建連接對(duì)象。

@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (connection == null) {connection = createConnection(ctx);}super.channelRegistered(ctx);}

12.在這里對(duì)channel賦值,保存。這里每個(gè)channel里面會(huì)有一個(gè)RedisConnection的屬性。

RedisConnection public void updateChannel(Channel channel) {this.channel = channel;channel.attr(CONNECTION).set(this);}

13.在連接成功后發(fā)送PING心跳命令

BaseConnectionHandler @Overridepublic void channelActive(final ChannelHandlerContext ctx) {List<RFuture<Object>> futures = new ArrayList<RFuture<Object>>();RedisClientConfig config = redisClient.getConfig();if (config.getPassword() != null) {RFuture<Object> future;if (config.getUsername() != null) {future = connection.async(RedisCommands.AUTH, config.getUsername(), config.getPassword());} else {future = connection.async(RedisCommands.AUTH, config.getPassword());}futures.add(future);}futures.add(future);}if (config.getPingConnectionInterval() > 0) {RFuture<Object> future = connection.async(RedisCommands.PING);futures.add(future);}final AtomicBoolean retry = new AtomicBoolean();final AtomicInteger commandsCounter = new AtomicInteger(futures.size());for (RFuture<Object> future : futures) {future.onComplete((res, e) -> {if (e != null) {if (e instanceof RedisLoadingException) {if (retry.compareAndSet(false, true)) {ctx.executor().schedule(() -> {channelActive(ctx);}, 1, TimeUnit.SECONDS);}return;}connection.closeAsync();connectionPromise.tryFailure(e);return;}if (commandsCounter.decrementAndGet() == 0) {ctx.fireChannelActive();connectionPromise.trySuccess(connection);}});}}

四、發(fā)送命令過(guò)程。

1.首先從RedissionBucket的set方法

,這里面的?commandExecutor來(lái)源于connectionManager中的命令執(zhí)行器。

2.然后進(jìn)行入到RedisExecutor中的execute方法,去異步執(zhí)行命令。這里首先從連接池獲取連接,然后在異步連接成功后,發(fā)送命令。

public void execute() {codec = getCodec(codec);RFuture<RedisConnection> connectionFuture = getConnection();connectionFuture.onComplete((connection, e) -> {sendCommand(attemptPromise, connection);writeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {checkWriteFuture(writeFuture, attemptPromise, connection);}});releaseConnection(attemptPromise, connectionFuture);});}

3.獲取連接是從連接池中獲取。根據(jù)讀寫(xiě)模式從連接管理器中選擇可用連接返回。

RedisExecutor protected RFuture<RedisConnection> getConnection() {if (readOnlyMode) {connectionFuture = connectionManager.connectionReadOp(source, command);} else {connectionFuture = connectionManager.connectionWriteOp(source, command);}return connectionFuture;}

?3.接著調(diào)用RedisConnection的send向channel寫(xiě)入數(shù)據(jù)。

RedisConnection public <T, R> ChannelFuture send(CommandData<T, R> data) {return channel.writeAndFlush(data);}

4.netty的inBoundHandler中有一個(gè)CommandsQueue,為一個(gè)命令同步隊(duì)列,同一時(shí)刻一個(gè)連接只有一個(gè)命令在執(zhí)行,執(zhí)行完后,再執(zhí)行下一個(gè)命令。

org.redisson.client.handler.CommandsQueue private void sendData(Channel ch) {QueueCommandHolder command = queue.peek();if (command != null && command.trySend()) {QueueCommand data = command.getCommand();List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();if (!pubSubOps.isEmpty()) {for (CommandData<Object, Object> cd : pubSubOps) {for (Object channel : cd.getParams()) {ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);}}} else {ch.attr(CURRENT_COMMAND).set(data);}command.getChannelPromise().addListener(listener);ch.writeAndFlush(data, command.getChannelPromise());}}

?

?

五、接收數(shù)據(jù)回調(diào)過(guò)程。

1.接收inhandler, 在收到數(shù)據(jù)后,從attr中的current_command屬性中取出數(shù)據(jù)。

CommandDecoder @Overrideprotected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();if (state() == null) {state(new State());}if (data == null) {while (in.writerIndex() > in.readerIndex()) {int endIndex = skipCommand(in);try {decode(ctx, in, data);} catch (Exception e) {in.readerIndex(endIndex);throw e;}}} else {int endIndex = 0;if (!(data instanceof CommandsData)) {endIndex = skipCommand(in);}try {decode(ctx, in, data);} catch (Exception e) {if (!(data instanceof CommandsData)) {in.readerIndex(endIndex);}throw e;}}}

?

2.根據(jù)相應(yīng)的PROMISE設(shè)置回調(diào)數(shù)據(jù)。

CommandDecoderprotected void completeResponse(CommandData<Object, Object> data, Object result) {if (data != null) {data.getPromise().trySuccess(result);}}

3.在等待異步PROMISE結(jié)果。

CommandAsyncService @Overridepublic <V> V get(RFuture<V> future) {try {future.await();} catch (InterruptedException e) {Thread.currentThread().interrupt();}if (future.isSuccess()) {return future.getNow();}throw convertException(future);}

?

總結(jié)

以上是生活随笔為你收集整理的redission收发命令流程分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。