diff --git a/proxy/src/main/java/net/md_5/bungee/ServerConnector.java b/proxy/src/main/java/net/md_5/bungee/ServerConnector.java index b58a01c6dd..6a9f96fcae 100644 --- a/proxy/src/main/java/net/md_5/bungee/ServerConnector.java +++ b/proxy/src/main/java/net/md_5/bungee/ServerConnector.java @@ -161,6 +161,7 @@ public void handle(PacketWrapper packet) throws Exception public void handle(LoginSuccess loginSuccess) throws Exception { Preconditions.checkState( thisState == State.LOGIN_SUCCESS, "Not expecting LOGIN_SUCCESS" ); + user.getCh().setConsolidate( false ); if ( user.getPendingConnection().getVersion() >= ProtocolConstants.MINECRAFT_1_20_2 ) { ServerConnection server = new ServerConnection( ch, target ); diff --git a/proxy/src/main/java/net/md_5/bungee/connection/DownstreamBridge.java b/proxy/src/main/java/net/md_5/bungee/connection/DownstreamBridge.java index 64e9a7641e..b4330c05de 100644 --- a/proxy/src/main/java/net/md_5/bungee/connection/DownstreamBridge.java +++ b/proxy/src/main/java/net/md_5/bungee/connection/DownstreamBridge.java @@ -799,9 +799,21 @@ public void handle(Login login) throws Exception receivedLogin = true; ServerConnector.handleLogin( bungee, server.getCh(), con, server.getInfo(), null, server, login ); + con.getCh().setConsolidate( true ); + server.getCh().setConsolidate( true ); + throw CancelSendSignal.INSTANCE; } + @Override + public void channelReadComplete(ChannelWrapper channel) throws Exception + { + if ( con.isConnected() ) + { + con.getCh().forceFlush(); + } + } + @Override public String toString() { diff --git a/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java b/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java index 89ce8a5f70..dd76044646 100644 --- a/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java +++ b/proxy/src/main/java/net/md_5/bungee/connection/UpstreamBridge.java @@ -385,6 +385,15 @@ public void handle(LoginPayloadResponse loginPayloadResponse) throws Exception con.getPendingConnection().handle( loginPayloadResponse ); } + @Override + public void channelReadComplete(ChannelWrapper channel) throws Exception + { + if ( con.getServer() != null && con.getServer().isConnected() ) + { + con.getServer().getCh().forceFlush(); + } + } + @Override public String toString() { diff --git a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java index a7627d4016..9378527e21 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java @@ -24,6 +24,7 @@ public class ChannelWrapper { + private static final int MAX_CONSOLIDATION = Integer.getInteger( "net.md_5.bungee.flush-consolidation-limit", 20 ); private final Channel ch; @Getter @@ -33,6 +34,8 @@ public class ChannelWrapper private volatile boolean closed; @Getter private volatile boolean closing; + private boolean consolidate; + private int consolidationCounter; public ChannelWrapper(ChannelHandlerContext ctx) { @@ -40,6 +43,12 @@ public ChannelWrapper(ChannelHandlerContext ctx) this.remoteAddress = ( this.ch.remoteAddress() == null ) ? this.ch.parent().localAddress() : this.ch.remoteAddress(); } + public void setConsolidate(boolean enabled) + { + consolidate = enabled; + forceFlush(); + } + public Protocol getDecodeProtocol() { return getMinecraftDecoder().getProtocol(); @@ -57,6 +66,8 @@ public Protocol getEncodeProtocol() public void setEncodeProtocol(Protocol protocol) { + // before changing the encoder protocol we should always flush to ensure no wrong states + forceFlush(); getMinecraftEncoder().setProtocol( protocol ); } @@ -89,6 +100,15 @@ public int getEncodeVersion() public void write(Object packet) { + // ensure netty context to for less context schedules + // by default we are mostly in netty context anyway, but plugins can use ProxiedPlayer.unsafe().sendPacket() + // in non netty context + if ( !ch.eventLoop().inEventLoop() ) + { + ch.eventLoop().execute( () -> write( packet ) ); + return; + } + if ( !closed ) { DefinedPacket defined = null; @@ -96,11 +116,11 @@ public void write(Object packet) { PacketWrapper wrapper = (PacketWrapper) packet; wrapper.setReleased( true ); - ch.writeAndFlush( wrapper.buf, ch.voidPromise() ); + ch.write( wrapper.buf, ch.voidPromise() ); defined = wrapper.packet; } else { - ch.writeAndFlush( packet, ch.voidPromise() ); + ch.write( packet, ch.voidPromise() ); if ( packet instanceof DefinedPacket ) { defined = (DefinedPacket) packet; @@ -113,11 +133,32 @@ public void write(Object packet) if ( nextProtocol != null ) { setEncodeProtocol( nextProtocol ); + return; } } + signalFlush(); + } + } + + + + private void signalFlush() + { + if ( !consolidate || consolidationCounter++ >= MAX_CONSOLIDATION ) + { + forceFlush(); } + } + public void forceFlush() + { + ch.flush(); + consolidationCounter = 0; + } + + + public void markClosed() { closed = closing = true; diff --git a/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java b/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java index d82173b1ea..5e3cc53082 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/HandlerBoss.java @@ -74,6 +74,7 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio if ( handler != null ) { handler.writabilityChanged( channel ); + channel.forceFlush(); } } @@ -142,6 +143,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception + { + if ( handler != null ) + { + handler.channelReadComplete( channel ); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { diff --git a/proxy/src/main/java/net/md_5/bungee/netty/PacketHandler.java b/proxy/src/main/java/net/md_5/bungee/netty/PacketHandler.java index 7bd223d9ca..619f67920d 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/PacketHandler.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/PacketHandler.java @@ -32,4 +32,8 @@ public void disconnected(ChannelWrapper channel) throws Exception public void writabilityChanged(ChannelWrapper channel) throws Exception { } + + public void channelReadComplete(ChannelWrapper channel) throws Exception + { + } }