diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java index e62c5b3e9..ddc01015b 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/ConnectionStrategy.java @@ -17,6 +17,7 @@ package io.asyncer.r2dbc.mysql; import io.asyncer.r2dbc.mysql.client.Client; +import io.asyncer.r2dbc.mysql.client.ReactorNettyClient; import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm; import io.asyncer.r2dbc.mysql.constant.SslMode; import io.netty.channel.ChannelOption; @@ -27,6 +28,8 @@ import io.netty.util.concurrent.EventExecutor; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import io.r2dbc.spi.R2dbcNonTransientResourceException; +import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; import reactor.netty.resources.LoopResources; import reactor.netty.tcp.TcpClient; @@ -34,6 +37,8 @@ import java.net.InetSocketAddress; import java.time.Duration; import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; /** * An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object. @@ -50,7 +55,7 @@ interface ConnectionStrategy { * * @return a logged-in {@link Client} object. */ - Mono connect(); + Mono connect(); /** * Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}. @@ -88,7 +93,7 @@ static TcpClient createTcpClient(SocketClientConfiguration configuration, boolea * @param configuration a configuration that affects login behavior. * @return a logged-in {@link Client} object. */ - static Mono login( + static Mono login( TcpClient tcpClient, Credential credential, MySqlConnectionConfiguration configuration @@ -108,33 +113,91 @@ static Mono login( configuration.retrieveConnectionZoneId() ); - return Client.connect(tcpClient, ssl, context).flatMap(client -> + return ReactorNettyClient.connect(tcpClient, ssl, context).flatMap(client -> QueryFlow.login(client, sslMode, loginDb, credential, compressionAlgorithms, zstdLevel)); } -} - -/** - * Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses. - * - * @since 1.2.0 - */ -final class BalancedResolverGroup extends AddressResolverGroup { - BalancedResolverGroup() { + /** + * Creates an exception that indicates a retry failure. + * + * @param message the message of the exception. + * @param cause the last exception that caused the retry. + * @return a retry failure exception. + */ + static R2dbcNonTransientResourceException retryFail(String message, @Nullable Throwable cause) { + return new R2dbcNonTransientResourceException( + message, + "H1000", + 9000, + cause + ); } - public static final BalancedResolverGroup INSTANCE; + /** + * Connect and login to a MySQL server with a specific TCP socket address. + * + * @since 1.2.0 + */ + final class InetConnectFunction implements Function, Mono> { + + private final boolean balancedDns; + + private final boolean tcpKeepAlive; + + private final boolean tcpNoDelay; - static { - INSTANCE = new BalancedResolverGroup(); - Runtime.getRuntime().addShutdownHook(new Thread( - INSTANCE::close, - "R2DBC-MySQL-BalancedResolverGroup-ShutdownHook" - )); + private final Credential credential; + + private final MySqlConnectionConfiguration configuration; + + InetConnectFunction( + boolean balancedDns, + boolean tcpKeepAlive, + boolean tcpNoDelay, + Credential credential, + MySqlConnectionConfiguration configuration + ) { + this.balancedDns = balancedDns; + this.tcpKeepAlive = tcpKeepAlive; + this.tcpNoDelay = tcpNoDelay; + this.credential = credential; + this.configuration = configuration; + } + + @Override + public Mono apply(Supplier address) { + TcpClient cc = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns) + .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) + .option(ChannelOption.TCP_NODELAY, tcpNoDelay) + .remoteAddress(address); + + return ConnectionStrategy.login(cc, credential, configuration); + } } - @Override - protected AddressResolver newResolver(EventExecutor executor) { - return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver(); + /** + * Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses. + * + * @since 1.2.0 + */ + final class BalancedResolverGroup extends AddressResolverGroup { + + BalancedResolverGroup() { + } + + public static final BalancedResolverGroup INSTANCE; + + static { + INSTANCE = new BalancedResolverGroup(); + Runtime.getRuntime().addShutdownHook(new Thread( + INSTANCE::close, + "R2DBC-MySQL-BalancedResolverGroup-ShutdownHook" + )); + } + + @Override + protected AddressResolver newResolver(EventExecutor executor) { + return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver(); + } } } diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java index 3443fc17e..6ba127c51 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MultiHostsConnectionStrategy.java @@ -17,19 +17,17 @@ package io.asyncer.r2dbc.mysql; import io.asyncer.r2dbc.mysql.client.Client; +import io.asyncer.r2dbc.mysql.client.FailoverClient; +import io.asyncer.r2dbc.mysql.client.ReactorNettyClient; import io.asyncer.r2dbc.mysql.constant.ProtocolDriver; import io.asyncer.r2dbc.mysql.internal.NodeAddress; import io.asyncer.r2dbc.mysql.internal.util.InternalArrays; -import io.netty.channel.ChannelOption; import io.netty.resolver.DefaultNameResolver; import io.netty.resolver.NameResolver; import io.netty.util.concurrent.Future; -import io.r2dbc.spi.R2dbcNonTransientResourceException; -import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.resources.LoopResources; -import reactor.netty.tcp.TcpClient; import reactor.netty.tcp.TcpResources; import java.net.InetAddress; @@ -46,101 +44,152 @@ */ final class MultiHostsConnectionStrategy implements ConnectionStrategy { - private final Mono client; + private final Mono client; MultiHostsConnectionStrategy( - TcpSocketConfiguration tcp, MySqlConnectionConfiguration configuration, - boolean shuffle + List addresses, + ProtocolDriver driver, + int retriesAllDown, + boolean shuffle, + boolean tcpKeepAlive, + boolean tcpNoDelay ) { - this.client = Mono.defer(() -> { - if (ProtocolDriver.DNS_SRV.equals(tcp.getDriver())) { + Mono client = configuration.getCredential().flatMap(credential -> { + if (ProtocolDriver.DNS_SRV.equals(driver)) { + logger.debug("Resolve hosts via DNS SRV: {}", addresses); + LoopResources resources = configuration.getClient().getLoopResources(); LoopResources loopResources = resources == null ? TcpResources.get() : resources; - - return resolveAllHosts(loopResources, tcp.getAddresses(), shuffle) - .flatMap(addresses -> connectHost(addresses, tcp, configuration, false, shuffle, 0)); + InetConnectFunction login = new InetConnectFunction( + false, + tcpKeepAlive, + tcpNoDelay, + credential, + configuration + ); + + return resolveAllHosts(loopResources, addresses, shuffle).flatMap(addrs -> { + logger.debug("Connect to multiple addresses: {}", addrs); + + return connectHost( + addrs, + login, + shuffle, + 0, + retriesAllDown + ); + }); } else { - List availableHosts = copyAvailableAddresses(tcp.getAddresses(), shuffle); + List availableHosts = copyAvailableAddresses(addresses, shuffle); + logger.debug("Connect to multiple hosts: {}", availableHosts); + int size = availableHosts.size(); - InetSocketAddress[] addresses = new InetSocketAddress[availableHosts.size()]; + InetSocketAddress[] array = new InetSocketAddress[availableHosts.size()]; for (int i = 0; i < size; i++) { - NodeAddress address = availableHosts.get(i); - addresses[i] = InetSocketAddress.createUnresolved(address.getHost(), address.getPort()); + array[i] = availableHosts.get(i).toUnresolved(); } - return connectHost(InternalArrays.asImmutableList(addresses), tcp, configuration, true, shuffle, 0); + List addrs = InternalArrays.asImmutableList(array); + InetConnectFunction login = new InetConnectFunction( + true, + tcpKeepAlive, + tcpNoDelay, + credential, + configuration + ); + + return connectHost( + addrs, + login, + shuffle, + 0, + retriesAllDown + ); } }); + + this.client = client.map(c -> new FailoverClient(c, client)); } @Override - public Mono connect() { + public Mono connect() { return client; } - private Mono connectHost( + private static Mono connectHost( List addresses, - TcpSocketConfiguration tcp, - MySqlConnectionConfiguration configuration, - boolean balancedDns, + InetConnectFunction login, boolean shuffle, - int attempts + int attempts, + int maxAttempts ) { Iterator iter = addresses.iterator(); if (!iter.hasNext()) { - return Mono.error(fail("Fail to establish connection: no available host", null)); + return Mono.error(ConnectionStrategy.retryFail("Fail to establish connection: no available host", null)); } - return attemptConnect(iter.next(), tcp, configuration, balancedDns).onErrorResume(t -> - resumeConnect(t, addresses, iter, tcp, configuration, balancedDns, shuffle, attempts)); + InetSocketAddress address = iter.next(); + + return login.apply(() -> address).onErrorResume(error -> resumeConnect( + error, + address, + addresses, + iter, + login, + shuffle, + attempts, + maxAttempts + )); } - private Mono resumeConnect( + private static Mono resumeConnect( Throwable t, + InetSocketAddress failed, List addresses, Iterator iter, - TcpSocketConfiguration tcp, - MySqlConnectionConfiguration configuration, - boolean balancedDns, + InetConnectFunction login, boolean shuffle, - int attempts + int attempts, + int maxAttempts ) { + logger.warn("Fail to connect to {}", failed, t); + if (!iter.hasNext()) { // The last host failed to connect - if (attempts >= tcp.getRetriesAllDown()) { - return Mono.error(fail( - "Fail to establish connection, retried " + attempts + " times: " + t.getMessage(), t)); + if (attempts >= maxAttempts) { + return Mono.error(ConnectionStrategy.retryFail( + "Fail to establish connections, retried " + attempts + " times", t)); } - logger.warn("All hosts failed to establish connections, auto-try again after 250ms."); + logger.warn("All hosts failed to establish connections, auto-try again after 250ms.", t); // Ignore waiting error, e.g. interrupted, scheduler rejected return Mono.delay(Duration.ofMillis(250)) .onErrorComplete() - .then(Mono.defer(() -> connectHost(addresses, tcp, configuration, balancedDns, shuffle, attempts + 1))); + .then(Mono.defer(() -> connectHost( + addresses, + login, + shuffle, + attempts + 1, + maxAttempts + ))); } - return attemptConnect(iter.next(), tcp, configuration, balancedDns).onErrorResume(tt -> - resumeConnect(tt, addresses, iter, tcp, configuration, balancedDns, shuffle, attempts)); - } - - private Mono attemptConnect( - InetSocketAddress address, - TcpSocketConfiguration tcp, - MySqlConnectionConfiguration configuration, - boolean balancedDns - ) { - return configuration.getCredential().flatMap(credential -> { - TcpClient tcpClient = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns) - .option(ChannelOption.SO_KEEPALIVE, tcp.isTcpKeepAlive()) - .option(ChannelOption.TCP_NODELAY, tcp.isTcpNoDelay()) - .remoteAddress(() -> address); - - return ConnectionStrategy.login(tcpClient, credential, configuration); - }).doOnError(e -> logger.warn("Fail to connect: ", e)); + InetSocketAddress address = iter.next(); + + return login.apply(() -> address).onErrorResume(error -> resumeConnect( + error, + address, + addresses, + iter, + login, + shuffle, + attempts, + maxAttempts + )); } private static Mono> resolveAllHosts( @@ -199,13 +248,4 @@ private static List copyAvailableAddresses(List addres return InternalArrays.asImmutableList(addresses.toArray(new NodeAddress[0])); } - - private static R2dbcNonTransientResourceException fail(String message, @Nullable Throwable cause) { - return new R2dbcNonTransientResourceException( - message, - "H1000", - 9000, - cause - ); - } } diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java index 784c87467..fb4ba179a 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java @@ -64,6 +64,8 @@ public final class MySqlConnectionConfiguration { private final MySqlSslConfiguration ssl; + private final boolean autoReconnect; + private final boolean preserveInstants; private final String connectionTimeZone; @@ -110,6 +112,7 @@ private MySqlConnectionConfiguration( SocketClientConfiguration client, SocketConfiguration socket, MySqlSslConfiguration ssl, + boolean autoReconnect, ZeroDateOption zeroDateOption, boolean preserveInstants, String connectionTimeZone, @@ -127,6 +130,7 @@ private MySqlConnectionConfiguration( this.client = requireNonNull(client, "client must not be null"); this.socket = requireNonNull(socket, "socket must not be null"); this.ssl = requireNonNull(ssl, "ssl must not be null"); + this.autoReconnect = autoReconnect; this.preserveInstants = preserveInstants; this.connectionTimeZone = requireNonNull(connectionTimeZone, "connectionTimeZone must not be null"); this.forceConnectionTimeZoneToSession = forceConnectionTimeZoneToSession; @@ -169,6 +173,10 @@ MySqlSslConfiguration getSsl() { return ssl; } + boolean isAutoReconnect() { + return autoReconnect; + } + ZeroDateOption getZeroDateOption() { return zeroDateOption; } @@ -272,6 +280,7 @@ public boolean equals(Object o) { return client.equals(that.client) && socket.equals(that.socket) && ssl.equals(that.ssl) && + autoReconnect == that.autoReconnect && preserveInstants == that.preserveInstants && connectionTimeZone.equals(that.connectionTimeZone) && forceConnectionTimeZoneToSession == that.forceConnectionTimeZoneToSession && @@ -298,6 +307,7 @@ public int hashCode() { return Objects.hash( client, socket, ssl, + autoReconnect, preserveInstants, connectionTimeZone, forceConnectionTimeZoneToSession, @@ -320,6 +330,7 @@ public String toString() { return "MySqlConnectionConfiguration{client=" + client + ", socket=" + socket + ", ssl=" + ssl + + ", autoReconnect=" + autoReconnect + ", preserveInstants=" + preserveInstants + ", connectionTimeZone='" + connectionTimeZone + '\'' + ", forceConnectionTimeZoneToSession=" + forceConnectionTimeZoneToSession + @@ -357,6 +368,8 @@ public static final class Builder { private final MySqlSslConfiguration.Builder ssl = new MySqlSslConfiguration.Builder(); + private boolean autoReconnect; + @Nullable private String database; @@ -434,6 +447,7 @@ public MySqlConnectionConfiguration build() { client.build(), socket, ssl.build(preferredSsl), + autoReconnect, zeroDateOption, preserveInstants, connectionTimeZone, @@ -600,6 +614,23 @@ public Builder protocol(HaProtocol protocol) { return this; } + /** + * Configures whether to perform failover reconnection. Default is {@code false}. + *

+ * It is not recommended due to it may lead to unexpected results. For example, it may recover a transaction + * state from a failed server node to an available node, the user can not aware of it, and continuing to execute + * more queries in the transaction will lead to unexpected inconsistencies. + * + * @param enabled {@code true} to enable failover reconnection. + * @return {@link Builder this} + * @see JDBC Failover + * @since 1.2.0 + */ + public Builder autoReconnect(boolean enabled) { + this.autoReconnect = enabled; + return this; + } + /** * Configures the connection timeout. Default no timeout. * diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java index c26469aec..ad71be06d 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java @@ -66,6 +66,18 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr */ public static final Option UNIX_SOCKET = Option.valueOf("unixSocket"); + /** + * Option to whether to perform failover reconnection. Default to {@code false}. + *

+ * It is not recommended due to it may lead to unexpected results. For example, it may recover a transaction state + * from a failed server node to an available node, the user can not aware of it, and continuing to execute more + * queries in the transaction will lead to unexpected inconsistencies or errors. Or, user set a self-defined + * variable in the session, it may not be recovered to the new node due to the driver can not aware of it. + * + * @since 1.2.0 + */ + public static final Option AUTO_RECONNECT = Option.valueOf("autoReconnect"); + /** * Option to set the time zone conversion. Default to {@code true} means enable conversion between JVM and * {@link #CONNECTION_TIME_ZONE}. @@ -361,6 +373,8 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) { mapper.optional(FORCE_CONNECTION_TIME_ZONE_TO_SESSION).asBoolean() .to(builder::forceConnectionTimeZoneToSession); + mapper.optional(AUTO_RECONNECT).asBoolean() + .to(builder::autoReconnect); mapper.optional(TCP_KEEP_ALIVE).asBoolean() .to(builder::tcpKeepAlive); mapper.optional(TCP_NO_DELAY).asBoolean() diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java index f2e2edf7c..c37be617f 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java @@ -22,6 +22,7 @@ import io.asyncer.r2dbc.mysql.cache.PrepareCache; import io.asyncer.r2dbc.mysql.client.Client; import io.asyncer.r2dbc.mysql.client.FluxExchangeable; +import io.asyncer.r2dbc.mysql.client.ReactorNettyClient; import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm; import io.asyncer.r2dbc.mysql.constant.ServerStatuses; import io.asyncer.r2dbc.mysql.constant.SslMode; @@ -207,7 +208,7 @@ static Flux> execute(Client client, List statements) * @return the messages received in response to the login exchange. */ - static Mono login(Client client, SslMode sslMode, String database, Credential credential, + static Mono login(ReactorNettyClient client, SslMode sslMode, String database, Credential credential, Set compressionAlgorithms, int zstdCompressionLevel) { return client.exchange(new LoginExchangeable( client, @@ -831,7 +832,7 @@ final class LoginExchangeable extends FluxExchangeable { private final Sinks.Many requests = Sinks.many().unicast() .onBackpressureBuffer(Queues.one().get()); - private final Client client; + private final ReactorNettyClient client; private final SslMode sslMode; @@ -854,7 +855,7 @@ final class LoginExchangeable extends FluxExchangeable { private boolean sslCompleted; - LoginExchangeable(Client client, SslMode sslMode, String database, String user, + LoginExchangeable(ReactorNettyClient client, SslMode sslMode, String database, String user, @Nullable CharSequence password, Set compressions, int zstdCompressionLevel) { this.client = client; diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SingleHostConnectionStrategy.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SingleHostConnectionStrategy.java index 59f83e457..5790e21c4 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SingleHostConnectionStrategy.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/SingleHostConnectionStrategy.java @@ -17,10 +17,11 @@ package io.asyncer.r2dbc.mysql; import io.asyncer.r2dbc.mysql.client.Client; +import io.asyncer.r2dbc.mysql.client.ReactorNettyClient; import io.asyncer.r2dbc.mysql.internal.NodeAddress; -import io.netty.channel.ChannelOption; import reactor.core.publisher.Mono; -import reactor.netty.tcp.TcpClient; + +import java.time.Duration; /** * An implementation of {@link ConnectionStrategy} that connects to a single host. It can be wrapped to a @@ -30,18 +31,24 @@ final class SingleHostConnectionStrategy implements ConnectionStrategy { private final Mono client; - SingleHostConnectionStrategy(TcpSocketConfiguration socket, MySqlConnectionConfiguration configuration) { + SingleHostConnectionStrategy( + MySqlConnectionConfiguration configuration, + NodeAddress address, + boolean tcpKeepAlive, + boolean tcpNoDelay + ) { this.client = configuration.getCredential().flatMap(credential -> { - NodeAddress address = socket.getFirstAddress(); - logger.debug("Connect to a single host: {}", address); - TcpClient tcpClient = ConnectionStrategy.createTcpClient(configuration.getClient(), true) - .option(ChannelOption.SO_KEEPALIVE, socket.isTcpKeepAlive()) - .option(ChannelOption.TCP_NODELAY, socket.isTcpNoDelay()) - .remoteAddress(address::toUnresolved); + InetConnectFunction login = new InetConnectFunction( + true, + tcpKeepAlive, + tcpNoDelay, + credential, + configuration + ); - return ConnectionStrategy.login(tcpClient, credential, configuration); + return connectHost(login, address, 0, 3); }); } @@ -49,4 +56,41 @@ final class SingleHostConnectionStrategy implements ConnectionStrategy { public Mono connect() { return client; } + + private static Mono connectHost( + InetConnectFunction login, + NodeAddress address, + int attempts, + int maxAttempts + ) { + return login.apply(address::toUnresolved) + .onErrorResume(t -> resumeConnect(t, address, login, attempts, maxAttempts)); + } + + private static Mono resumeConnect( + Throwable t, + NodeAddress address, + InetConnectFunction login, + int attempts, + int maxAttempts + ) { + logger.warn("Fail to connect to {}", address, t); + + if (attempts >= maxAttempts) { + return Mono.error(ConnectionStrategy.retryFail( + "Fail to establish connection, retried " + attempts + " times", t)); + } + + logger.warn("Failed to establish connection, auto-try again after 250ms.", t); + + // Ignore waiting error, e.g. interrupted, scheduler rejected + return Mono.delay(Duration.ofMillis(250)) + .onErrorComplete() + .then(Mono.defer(() -> connectHost( + login, + address, + attempts + 1, + maxAttempts + ))); + } } diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/TcpSocketConfiguration.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/TcpSocketConfiguration.java index f100a687f..e3ea6bb1f 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/TcpSocketConfiguration.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/TcpSocketConfiguration.java @@ -219,16 +219,48 @@ public ConnectionStrategy strategy(MySqlConnectionConfiguration configuration) { case REPLICATION: ConnectionStrategy.logger.warn( "R2DBC Connection cannot be set to read-only, replication protocol will use the first host"); - return new SingleHostConnectionStrategy(this, configuration); + return new MultiHostsConnectionStrategy( + configuration, + Collections.singletonList(getFirstAddress()), + driver, + retriesAllDown, + false, + tcpKeepAlive, + tcpNoDelay + ); case SEQUENTIAL: - return new MultiHostsConnectionStrategy(this, configuration, false); + return new MultiHostsConnectionStrategy( + configuration, + addresses, + driver, + retriesAllDown, + false, + tcpKeepAlive, + tcpNoDelay + ); case LOAD_BALANCE: - return new MultiHostsConnectionStrategy(this, configuration, true); + return new MultiHostsConnectionStrategy( + configuration, + addresses, + driver, + retriesAllDown, + true, + tcpKeepAlive, + tcpNoDelay + ); default: if (ProtocolDriver.MYSQL == driver && addresses.size() == 1) { - return new SingleHostConnectionStrategy(this, configuration); + return new SingleHostConnectionStrategy(configuration, getFirstAddress(), tcpKeepAlive, tcpNoDelay); } else { - return new MultiHostsConnectionStrategy(this, configuration, false); + return new MultiHostsConnectionStrategy( + configuration, + addresses, + driver, + retriesAllDown, + false, + tcpKeepAlive, + tcpNoDelay + ); } } } diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java index 6ac6e93a5..d6cc36943 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/Client.java @@ -99,31 +99,4 @@ public interface Client { * @return if connection is valid */ boolean isConnected(); - - /** - * Sends a signal to the connection, which means server does not support SSL. - */ - void sslUnsupported(); - - /** - * Sends a signal to {@link Client this}, which means login has succeeded. - */ - void loginSuccess(); - - /** - * Connects to a MySQL server using the provided {@link TcpClient} and {@link MySqlSslConfiguration}. - * - * @param tcpClient the configured TCP client - * @param ssl the SSL configuration - * @param context the connection context - * @return A {@link Mono} that will emit a connected {@link Client}. - * @throws IllegalArgumentException if {@code tcpClient}, {@code ssl} or {@code context} is {@code null}. - */ - static Mono connect(TcpClient tcpClient, MySqlSslConfiguration ssl, ConnectionContext context) { - requireNonNull(tcpClient, "tcpClient must not be null"); - requireNonNull(ssl, "ssl must not be null"); - requireNonNull(context, "context must not be null"); - - return tcpClient.connect().map(conn -> new ReactorNettyClient(conn, ssl, context)); - } } diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/FailoverClient.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/FailoverClient.java new file mode 100644 index 000000000..539fbd072 --- /dev/null +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/FailoverClient.java @@ -0,0 +1,99 @@ +/* + * Copyright 2024 asyncer.io projects + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.asyncer.r2dbc.mysql.client; + +import io.asyncer.r2dbc.mysql.ConnectionContext; +import io.asyncer.r2dbc.mysql.message.client.ClientMessage; +import io.asyncer.r2dbc.mysql.message.server.ServerMessage; +import io.netty.buffer.ByteBufAllocator; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.SynchronousSink; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + +/** + * An implementation of {@link Client} that supports failover. + */ +public final class FailoverClient implements Client { + + private final Mono failover; + + private final AtomicReference client; + + public FailoverClient(ReactorNettyClient client, Mono failover) { + this.client = new AtomicReference<>(client); + this.failover = failover; + } + + private Mono reconnectIfNecessary() { + return Mono.defer(() -> { + ReactorNettyClient client = this.client.get(); + + if (client.isChannelOpen() || client.isClosingOrClosed()) { + // Open, or closed by user + return Mono.just(client); + } + + return this.failover.flatMap(c -> { + if (this.client.compareAndSet(client, c)) { + // TODO: re-init session variables, transaction state, clear prepared cache, etc. + return Mono.just(c); + } + + // Reconnected by other thread, close this one and retry + return c.forceClose().then(reconnectIfNecessary()); + }); + }); + } + + @Override + public Flux exchange(ClientMessage request, BiConsumer> handler) { + return reconnectIfNecessary().flatMapMany(c -> c.exchange(request, handler)); + } + + @Override + public Flux exchange(FluxExchangeable exchangeable) { + return reconnectIfNecessary().flatMapMany(c -> c.exchange(exchangeable)); + } + + @Override + public Mono close() { + return Mono.fromSupplier(this.client::get).flatMap(ReactorNettyClient::close); + } + + @Override + public Mono forceClose() { + return Mono.fromSupplier(this.client::get).flatMap(ReactorNettyClient::forceClose); + } + + @Override + public ByteBufAllocator getByteBufAllocator() { + return this.client.get().getByteBufAllocator(); + } + + @Override + public ConnectionContext getContext() { + return this.client.get().getContext(); + } + + @Override + public boolean isConnected() { + return this.client.get().isConnected(); + } +} diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java index 81cb5f21e..e8e3b04d7 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java @@ -41,6 +41,7 @@ import reactor.core.publisher.SynchronousSink; import reactor.netty.Connection; import reactor.netty.FutureMono; +import reactor.netty.tcp.TcpClient; import reactor.util.context.Context; import reactor.util.context.ContextView; @@ -54,7 +55,7 @@ /** * An implementation of client based on the Reactor Netty project. */ -final class ReactorNettyClient implements Client { +public final class ReactorNettyClient implements Client { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ReactorNettyClient.class); @@ -250,12 +251,10 @@ public boolean isConnected() { return state < ST_CLOSED && connection.channel().isOpen(); } - @Override public void sslUnsupported() { connection.channel().pipeline().fireUserEventTriggered(SslState.UNSUPPORTED); } - @Override public void loginSuccess() { if (context.getCapability().isCompression()) { connection.channel().pipeline().fireUserEventTriggered(PacketEvent.USE_COMPRESSION); @@ -264,6 +263,14 @@ public void loginSuccess() { } } + boolean isClosingOrClosed() { + return state >= ST_CLOSING; + } + + boolean isChannelOpen() { + return connection.channel().isOpen(); + } + private static void resetSequence(Connection connection) { connection.channel().pipeline().fireUserEventTriggered(PacketEvent.RESET_SEQUENCE); } @@ -324,6 +331,23 @@ private void handleClose() { } } + /** + * Connects to a MySQL server using the provided {@link TcpClient} and {@link MySqlSslConfiguration}. + * + * @param tcpClient the configured TCP client + * @param ssl the SSL configuration + * @param context the connection context + * @return A {@link Mono} that will emit a connected {@link Client}. + * @throws IllegalArgumentException if {@code tcpClient}, {@code ssl} or {@code context} is {@code null}. + */ + public static Mono connect(TcpClient tcpClient, MySqlSslConfiguration ssl, ConnectionContext context) { + requireNonNull(tcpClient, "tcpClient must not be null"); + requireNonNull(ssl, "ssl must not be null"); + requireNonNull(context, "context must not be null"); + + return tcpClient.connect().map(conn -> new ReactorNettyClient(conn, ssl, context)); + } + private final class ResponseSubscriber implements CoreSubscriber { private final ResponseSink sink; diff --git a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/constant/HaProtocol.java b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/constant/HaProtocol.java index c54cd8923..3adc296ab 100644 --- a/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/constant/HaProtocol.java +++ b/r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/constant/HaProtocol.java @@ -62,8 +62,8 @@ public enum HaProtocol { *

* Using: I want to use the first node for read-write if connection is set to read-write, and other nodes if * connection is set to read-only. R2DBC can not set a {@link io.r2dbc.spi.Connection Connection} to read-only mode. - * So it will always use the first host. R2DBC does not recommend this mutability. Perhaps in the future, R2DBC will - * support using read-only mode to create a connection instead of modifying an existing connection. + * So it will always use the first host. Perhaps in the future, R2DBC will support using read-only mode to create a + * connection instead of modifying an existing connection. *

* Reconnect: I want to reconnect to the current node if the current node is unavailable and * {@code autoReconnect=true}.