归档
简介
- Redisson 内置连接池:获取连接时优先从池中取出空闲连接,没有空闲连接时才新建连接。
测试
/**
 * Writes three integers to the Redis list named "list" and reads them back
 * by index, printing the three values on one line.
 */
@Test
public void testGet1() {
    RList<Integer> list = redisson.getList("list", IntegerCodec.INSTANCE);
    list.addAll(Arrays.asList(1, 2, 3));

    // Each get(...) is a separate read issued through the Redisson client.
    Integer i1 = list.get(0);
    Integer i2 = list.get(1);
    Integer i3 = list.get(2);
    System.out.printf("%s %s %s", i1, i2, i3);
}
说明
org.redisson.command.RedisExecutor
/**
 * Selects the read or the write connection path depending on
 * {@code readOnlyMode}, remembers the resulting future in
 * {@code connectionFuture} and returns it.
 *
 * @return the future that will yield the connection for {@code command}
 */
protected CompletableFuture<RedisConnection> getConnection() {
    connectionFuture = readOnlyMode
            ? connectionReadOp(command)
            : connectionWriteOp(command);
    return connectionFuture;
}

/**
 * Resolves the target entry and asks it for a connection suitable for a
 * read operation.
 *
 * @param command the command about to be executed
 * @return the future produced by the entry's {@code connectionReadOp}
 */
final CompletableFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
    // NOTE(review): the boolean passed to getEntry presumably marks a read lookup - confirm.
    entry = getEntry(true);
    ...
    return entry.connectionReadOp(command);
}
org.redisson.connection.SingleEntry
public class SingleEntry extends MasterSlaveEntry {@Overridepublic CompletableFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {return super.connectionWriteOp(command);}
}
org.redisson.connection.MasterSlaveEntry
/**
 * Requests a connection for a write command from the write connection pool.
 *
 * @param command the command about to be executed
 * @return the future handed back by {@code writeConnectionPool}
 */
public CompletableFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
    CompletableFuture<RedisConnection> pooledConnection = writeConnectionPool.get(command);
    return pooledConnection;
}
org.redisson.connection.pool.MasterConnectionPool
/**
 * Acquires a connection for the command using the entry returned by
 * {@code entries.peek()}.
 *
 * @param command the command about to be executed
 * @return the future produced by {@code acquireConnection}
 */
@Override
public CompletableFuture<RedisConnection> get(RedisCommand<?> command) {
    // NOTE(review): presumably the head of entries is the master entry - confirm.
    return acquireConnection(command, entries.peek());
}
org.redisson.connection.pool.ConnectionPool
/**
 * Starts acquisition of a connection on the given entry. Once the inner
 * {@code acquireConnection(entry, command)} future completes normally,
 * {@link #connectTo} is invoked to fulfil {@code result}.
 *
 * @param command the command about to be executed
 * @param entry   the entry to take the connection from
 * @return the {@code result} future (declared in the elided code)
 */
protected final CompletableFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) {
    ...
    CompletableFuture<Void> f = acquireConnection(entry, command);
    f.thenAccept(r -> connectTo(entry, result, command));
    ...
    return result;
}

/**
 * Fulfils the promise with a connection: a pooled one when {@link #poll}
 * yields it, otherwise a newly created one.
 */
private void connectTo(ClientConnectionsEntry entry, CompletableFuture<T> promise, RedisCommand<?> command) {
    ...
    T conn = poll(entry, command);
    if (conn == null) {
        // Nothing available in the pool: open a new connection instead.
        createConnection(entry, promise);
        return;
    }
    ...
    connectedSuccessful(entry, promise, conn);
}

/**
 * Takes a connection from the entry via {@code pollConnection}; the result
 * may be {@code null} (callers check for it).
 */
protected T poll(ClientConnectionsEntry entry, RedisCommand<?> command) {
    return (T) entry.pollConnection(command);
}

/**
 * Opens a new connection through {@link #connect} and, when that stage
 * completes, reports it through {@code connectedSuccessful}.
 */
private void createConnection(ClientConnectionsEntry entry, CompletableFuture<T> promise) {
    CompletionStage<T> connFuture = connect(entry);
    connFuture.whenComplete((conn, e) -> {
        ...
        if (changeUsage()) {
            // Usage is incremented only once the promise itself completes.
            promise.thenApply(c -> c.incUsage());
        }
        connectedSuccessful(entry, promise, conn);
    });
}

/**
 * Delegates connection creation to the entry.
 */
protected CompletionStage<T> connect(ClientConnectionsEntry entry) {
    return (CompletionStage<T>) entry.connect();
}
org.redisson.connection.ClientConnectionsEntry
/**
 * Removes one connection from {@code freeConnections} and bumps its usage
 * counter.
 *
 * @param command the command about to be executed (not used in the visible code)
 * @return the polled connection, or {@code null} when the poll yields nothing
 */
public RedisConnection pollConnection(RedisCommand<?> command) {
    RedisConnection idle = freeConnections.poll();
    if (idle == null) {
        return null;
    }
    idle.incUsage();
    return idle;
}

/**
 * Opens a new connection through the client and records it in
 * {@code allConnections} when the connect stage completes.
 *
 * @return the stage returned by {@code whenComplete} on the client's connect future
 */
public CompletionStage<RedisConnection> connect() {
    CompletionStage<RedisConnection> future = client.connectAsync();
    return future.whenComplete((conn, e) -> {
        ...
        allConnections.add(conn);
    });
}
org.redisson.client.RedisClient
public RFuture<RedisConnection> connectAsync() {CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {CompletableFuture<RedisConnection> r = new CompletableFuture<>();ChannelFuture channelFuture = bootstrap.connect(res); channelFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(final ChannelFuture future) throws Exception {...if (future.isSuccess()) { RedisConnection c = RedisConnection.getFrom(future.channel());c.getConnectionPromise().whenComplete((res, e) -> {bootstrap.config().group().execute(new Runnable() {@Overridepublic void run() {if (e == null) {if (!r.complete(c)) { c.closeAsync();} ...}...}});});} ...}});return r;});return new CompletableFutureWrapper<>(f);}private RedisClient(RedisClientConfig config) {...bootstrap = createBootstrap(copy, Type.PLAIN);...}private Bootstrap createBootstrap(RedisClientConfig config, Type type) {Bootstrap bootstrap = new Bootstrap();...bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));...return bootstrap;}
org.redisson.client.handler.RedisChannelInitializer
/**
 * Sets up the pipeline of a new channel: SSL initialisation first, then the
 * connection handler that matches the configured type.
 *
 * @param ch the channel being initialised
 * @throws Exception propagated from the initialisation steps
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    initSsl(config, ch);
    if (type == Type.PLAIN) {
        ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
    } else {
        // Every non-PLAIN type gets the pub/sub flavour of the handler.
        ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
    }
    ...
}
org.redisson.client.handler.RedisConnectionHandler
@OverrideRedisConnection createConnection(ChannelHandlerContext ctx) {return new RedisConnection(redisClient, ctx.channel(), connectionPromise);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (connection == null) {connection = createConnection(ctx); }super.channelRegistered(ctx);}
org.redisson.client.RedisConnection
/**
 * Creates a connection on top of the given channel; the visible part binds
 * the channel through {@link #updateChannel}.
 *
 * @param redisClient       owning client
 * @param channel           channel backing this connection
 * @param connectionPromise promise associated with establishing the connection
 */
public <C> RedisConnection(RedisClient redisClient, Channel channel, CompletableFuture<C> connectionPromise) {
    ...
    updateChannel(channel);
    ...
}

/**
 * Stores the channel on this connection and registers this connection as
 * the channel's {@code CONNECTION} attribute.
 *
 * @param channel channel to bind to this connection
 */
public void updateChannel(Channel channel) {
    ...
    this.channel = channel;
    channel.attr(CONNECTION).set(this);
}