Skip to content

Commit

Permalink
Temp commit: Add support for failover
Browse files Browse the repository at this point in the history
  • Loading branch information
mirromutth committed Mar 22, 2024
1 parent 487fdd6 commit 9c49e9a
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
import io.asyncer.r2dbc.mysql.constant.CompressionAlgorithm;
import io.asyncer.r2dbc.mysql.constant.SslMode;
import io.netty.channel.ChannelOption;
Expand All @@ -27,13 +28,17 @@
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Mono;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* An interface of a connection strategy that considers how to obtain a MySQL {@link Client} object.
Expand All @@ -50,7 +55,7 @@ interface ConnectionStrategy {
*
* @return a logged-in {@link Client} object.
*/
Mono<Client> connect();
Mono<? extends Client> connect();

/**
* Creates a general-purpose {@link TcpClient} with the given {@link SocketClientConfiguration}.
Expand Down Expand Up @@ -88,7 +93,7 @@ static TcpClient createTcpClient(SocketClientConfiguration configuration, boolea
* @param configuration a configuration that affects login behavior.
* @return a logged-in {@link Client} object.
*/
static Mono<Client> login(
static Mono<ReactorNettyClient> login(
TcpClient tcpClient,
Credential credential,
MySqlConnectionConfiguration configuration
Expand All @@ -108,33 +113,91 @@ static Mono<Client> login(
configuration.retrieveConnectionZoneId()
);

return Client.connect(tcpClient, ssl, context).flatMap(client ->
return ReactorNettyClient.connect(tcpClient, ssl, context).flatMap(client ->
QueryFlow.login(client, sslMode, loginDb, credential, compressionAlgorithms, zstdLevel));
}
}

/**
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
*
* @since 1.2.0
*/
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {

BalancedResolverGroup() {
/**
* Creates an exception that indicates a retry failure.
*
* @param message the message of the exception.
* @param cause the last exception that caused the retry.
* @return a retry failure exception.
*/
static R2dbcNonTransientResourceException retryFail(String message, @Nullable Throwable cause) {
return new R2dbcNonTransientResourceException(
message,
"H1000",
9000,
cause
);
}

public static final BalancedResolverGroup INSTANCE;
/**
* Connect and login to a MySQL server with a specific TCP socket address.
*
* @since 1.2.0
*/
final class InetConnectFunction implements Function<Supplier<InetSocketAddress>, Mono<ReactorNettyClient>> {

private final boolean balancedDns;

private final boolean tcpKeepAlive;

private final boolean tcpNoDelay;

static {
INSTANCE = new BalancedResolverGroup();
Runtime.getRuntime().addShutdownHook(new Thread(
INSTANCE::close,
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
));
private final Credential credential;

private final MySqlConnectionConfiguration configuration;

InetConnectFunction(
boolean balancedDns,
boolean tcpKeepAlive,
boolean tcpNoDelay,
Credential credential,
MySqlConnectionConfiguration configuration
) {
this.balancedDns = balancedDns;
this.tcpKeepAlive = tcpKeepAlive;
this.tcpNoDelay = tcpNoDelay;
this.credential = credential;
this.configuration = configuration;
}

@Override
public Mono<ReactorNettyClient> apply(Supplier<InetSocketAddress> address) {
TcpClient cc = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns)
.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
.option(ChannelOption.TCP_NODELAY, tcpNoDelay)
.remoteAddress(address);

return ConnectionStrategy.login(cc, credential, configuration);
}
}

@Override
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
/**
* Resolves the {@link InetSocketAddress} to IP address, randomly pick one if it resolves to multiple IP addresses.
*
* @since 1.2.0
*/
final class BalancedResolverGroup extends AddressResolverGroup<InetSocketAddress> {

BalancedResolverGroup() {
}

public static final BalancedResolverGroup INSTANCE;

static {
INSTANCE = new BalancedResolverGroup();
Runtime.getRuntime().addShutdownHook(new Thread(
INSTANCE::close,
"R2DBC-MySQL-BalancedResolverGroup-ShutdownHook"
));
}

@Override
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
return new RoundRobinInetAddressResolver(executor, new DefaultNameResolver(executor)).asAddressResolver();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.client.FailoverClient;
import io.asyncer.r2dbc.mysql.client.ReactorNettyClient;
import io.asyncer.r2dbc.mysql.constant.ProtocolDriver;
import io.asyncer.r2dbc.mysql.internal.NodeAddress;
import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
import io.netty.channel.ChannelOption;
import io.netty.resolver.DefaultNameResolver;
import io.netty.resolver.NameResolver;
import io.netty.util.concurrent.Future;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpResources;

import java.net.InetAddress;
Expand All @@ -46,101 +44,152 @@
*/
final class MultiHostsConnectionStrategy implements ConnectionStrategy {

private final Mono<Client> client;
private final Mono<? extends Client> client;

MultiHostsConnectionStrategy(
TcpSocketConfiguration tcp,
MySqlConnectionConfiguration configuration,
boolean shuffle
List<NodeAddress> addresses,
ProtocolDriver driver,
int retriesAllDown,
boolean shuffle,
boolean tcpKeepAlive,
boolean tcpNoDelay
) {
this.client = Mono.defer(() -> {
if (ProtocolDriver.DNS_SRV.equals(tcp.getDriver())) {
Mono<ReactorNettyClient> client = configuration.getCredential().flatMap(credential -> {
if (ProtocolDriver.DNS_SRV.equals(driver)) {
logger.debug("Resolve hosts via DNS SRV: {}", addresses);

LoopResources resources = configuration.getClient().getLoopResources();
LoopResources loopResources = resources == null ? TcpResources.get() : resources;

return resolveAllHosts(loopResources, tcp.getAddresses(), shuffle)
.flatMap(addresses -> connectHost(addresses, tcp, configuration, false, shuffle, 0));
InetConnectFunction login = new InetConnectFunction(
false,
tcpKeepAlive,
tcpNoDelay,
credential,
configuration
);

return resolveAllHosts(loopResources, addresses, shuffle).flatMap(addrs -> {
logger.debug("Connect to multiple addresses: {}", addrs);

return connectHost(
addrs,
login,
shuffle,
0,
retriesAllDown
);
});
} else {
List<NodeAddress> availableHosts = copyAvailableAddresses(tcp.getAddresses(), shuffle);
List<NodeAddress> availableHosts = copyAvailableAddresses(addresses, shuffle);
logger.debug("Connect to multiple hosts: {}", availableHosts);

int size = availableHosts.size();
InetSocketAddress[] addresses = new InetSocketAddress[availableHosts.size()];
InetSocketAddress[] array = new InetSocketAddress[availableHosts.size()];

for (int i = 0; i < size; i++) {
NodeAddress address = availableHosts.get(i);
addresses[i] = InetSocketAddress.createUnresolved(address.getHost(), address.getPort());
array[i] = availableHosts.get(i).toUnresolved();
}

return connectHost(InternalArrays.asImmutableList(addresses), tcp, configuration, true, shuffle, 0);
List<InetSocketAddress> addrs = InternalArrays.asImmutableList(array);
InetConnectFunction login = new InetConnectFunction(
true,
tcpKeepAlive,
tcpNoDelay,
credential,
configuration
);

return connectHost(
addrs,
login,
shuffle,
0,
retriesAllDown
);
}
});

this.client = client.map(c -> new FailoverClient(c, client));
}

@Override
public Mono<Client> connect() {
public Mono<? extends Client> connect() {
return client;
}

private Mono<Client> connectHost(
private static Mono<ReactorNettyClient> connectHost(
List<InetSocketAddress> addresses,
TcpSocketConfiguration tcp,
MySqlConnectionConfiguration configuration,
boolean balancedDns,
InetConnectFunction login,
boolean shuffle,
int attempts
int attempts,
int maxAttempts
) {
Iterator<InetSocketAddress> iter = addresses.iterator();

if (!iter.hasNext()) {
return Mono.error(fail("Fail to establish connection: no available host", null));
return Mono.error(ConnectionStrategy.retryFail("Fail to establish connection: no available host", null));
}

return attemptConnect(iter.next(), tcp, configuration, balancedDns).onErrorResume(t ->
resumeConnect(t, addresses, iter, tcp, configuration, balancedDns, shuffle, attempts));
InetSocketAddress address = iter.next();

return login.apply(() -> address).onErrorResume(error -> resumeConnect(
error,
address,
addresses,
iter,
login,
shuffle,
attempts,
maxAttempts
));
}

private Mono<Client> resumeConnect(
private static Mono<ReactorNettyClient> resumeConnect(
Throwable t,
InetSocketAddress failed,
List<InetSocketAddress> addresses,
Iterator<InetSocketAddress> iter,
TcpSocketConfiguration tcp,
MySqlConnectionConfiguration configuration,
boolean balancedDns,
InetConnectFunction login,
boolean shuffle,
int attempts
int attempts,
int maxAttempts
) {
logger.warn("Fail to connect to {}", failed, t);

if (!iter.hasNext()) {
// The last host failed to connect
if (attempts >= tcp.getRetriesAllDown()) {
return Mono.error(fail(
"Fail to establish connection, retried " + attempts + " times: " + t.getMessage(), t));
if (attempts >= maxAttempts) {
return Mono.error(ConnectionStrategy.retryFail(
"Fail to establish connections, retried " + attempts + " times", t));
}

logger.warn("All hosts failed to establish connections, auto-try again after 250ms.");
logger.warn("All hosts failed to establish connections, auto-try again after 250ms.", t);

// Ignore waiting error, e.g. interrupted, scheduler rejected
return Mono.delay(Duration.ofMillis(250))
.onErrorComplete()
.then(Mono.defer(() -> connectHost(addresses, tcp, configuration, balancedDns, shuffle, attempts + 1)));
.then(Mono.defer(() -> connectHost(
addresses,
login,
shuffle,
attempts + 1,
maxAttempts
)));
}

return attemptConnect(iter.next(), tcp, configuration, balancedDns).onErrorResume(tt ->
resumeConnect(tt, addresses, iter, tcp, configuration, balancedDns, shuffle, attempts));
}

private Mono<Client> attemptConnect(
InetSocketAddress address,
TcpSocketConfiguration tcp,
MySqlConnectionConfiguration configuration,
boolean balancedDns
) {
return configuration.getCredential().flatMap(credential -> {
TcpClient tcpClient = ConnectionStrategy.createTcpClient(configuration.getClient(), balancedDns)
.option(ChannelOption.SO_KEEPALIVE, tcp.isTcpKeepAlive())
.option(ChannelOption.TCP_NODELAY, tcp.isTcpNoDelay())
.remoteAddress(() -> address);

return ConnectionStrategy.login(tcpClient, credential, configuration);
}).doOnError(e -> logger.warn("Fail to connect: ", e));
InetSocketAddress address = iter.next();

return login.apply(() -> address).onErrorResume(error -> resumeConnect(
error,
address,
addresses,
iter,
login,
shuffle,
attempts,
maxAttempts
));
}

private static Mono<List<InetSocketAddress>> resolveAllHosts(
Expand Down Expand Up @@ -199,13 +248,4 @@ private static List<NodeAddress> copyAvailableAddresses(List<NodeAddress> addres

return InternalArrays.asImmutableList(addresses.toArray(new NodeAddress[0]));
}

private static R2dbcNonTransientResourceException fail(String message, @Nullable Throwable cause) {
return new R2dbcNonTransientResourceException(
message,
"H1000",
9000,
cause
);
}
}
Loading

0 comments on commit 9c49e9a

Please sign in to comment.