Skip to content

Commit

Permalink
Move ping command to QueryFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
mirromutth committed Apr 1, 2024
1 parent 2f3cc22 commit ea1881b
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.asyncer.r2dbc.mysql.client.Client;
import io.asyncer.r2dbc.mysql.codec.Codecs;
import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
import io.asyncer.r2dbc.mysql.message.client.PingMessage;
import io.asyncer.r2dbc.mysql.message.server.CompleteMessage;
import io.asyncer.r2dbc.mysql.message.server.ErrorMessage;
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
Expand All @@ -38,12 +37,9 @@
import io.r2dbc.spi.ValidationDepth;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

import java.time.Duration;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

Expand All @@ -66,27 +62,14 @@ final class MySqlSimpleConnection implements MySqlConnection {

if (message instanceof ErrorMessage) {
ErrorMessage msg = (ErrorMessage) message;
logger.debug("Remote validate failed: [{}] [{}] {}", msg.getCode(), msg.getSqlState(),
msg.getMessage());
logger.debug("Remote validate failed: [{}] [{}] {}", msg.getCode(), msg.getSqlState(), msg.getMessage());
} else {
ReferenceCountUtil.safeRelease(message);
}

return false;
};

private static final BiConsumer<ServerMessage, SynchronousSink<ServerMessage>> PING = (message, sink) -> {
if (message instanceof ErrorMessage) {
sink.next(message);
sink.complete();
} else if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
sink.next(message);
sink.complete();
} else {
ReferenceCountUtil.safeRelease(message);
}
};

private final Client client;

private final Codecs codecs;
Expand Down Expand Up @@ -266,9 +249,9 @@ public Mono<Boolean> validate(ValidationDepth depth) {
return Mono.just(false);
}

return doPingInternal(client)
.last()
return QueryFlow.ping(client)
.map(VALIDATE)
.last()
.onErrorResume(e -> {
// `last` maybe emit a NoSuchElementException, exchange maybe emit exception by Netty.
// But should NEVER emit any exception, so logging exception and emit false.
Expand Down Expand Up @@ -334,8 +317,4 @@ public Mono<Void> setStatementTimeout(Duration timeout) {
ConnectionContext context() {
return client.getContext();
}

static Flux<ServerMessage> doPingInternal(Client client) {
return client.exchange(PingMessage.INSTANCE, PING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Flux<MySqlResult> execute() {
client,
codecs,
null,
MySqlSimpleConnection.doPingInternal(client)
QueryFlow.ping(client)
)));
}
}
23 changes: 23 additions & 0 deletions r2dbc-mysql/src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand All @@ -86,6 +87,18 @@ final class QueryFlow {
}
};

private static final BiConsumer<ServerMessage, SynchronousSink<ServerMessage>> PING = (message, sink) -> {
if (message instanceof ErrorMessage) {
sink.next(message);
sink.complete();
} else if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
sink.next(message);
sink.complete();
} else {
ReferenceCountUtil.safeRelease(message);
}
};

/**
* Execute multiple bindings of a server-preparing statement with one-by-one binary execution. The execution
* terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. If client receives a
Expand Down Expand Up @@ -252,6 +265,16 @@ static Mono<Void> createSavepoint(Client client, String name, boolean batchSuppo
return client.exchange(new TransactionMultiExchangeable(savepointState)).then();
}

/**
* Executes a ping command to the server.
*
* @param client the {@link Client} to exchange messages with.
* @return complete or error messages received in response to this exchange.
*/
static Flux<ServerMessage> ping(Client client) {
return client.exchange(PingMessage.INSTANCE, PING);
}

/**
* Sets a session variable to the server.
*
Expand Down

0 comments on commit ea1881b

Please sign in to comment.