From fb5886e836612319c8365f231f203a1e2a544f7f Mon Sep 17 00:00:00 2001 From: Maciej Kulawik Date: Mon, 18 Nov 2024 16:12:25 +0100 Subject: [PATCH] broadcastclient: check if compression was negotiated when connecting to feed server --- broadcastclient/broadcastclient.go | 30 +++++++++++++++++++++++++----- wsbroadcastserver/utils.go | 2 +- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/broadcastclient/broadcastclient.go b/broadcastclient/broadcastclient.go index ac684902e4..c4a3743276 100644 --- a/broadcastclient/broadcastclient.go +++ b/broadcastclient/broadcastclient.go @@ -130,9 +130,10 @@ type BroadcastClient struct { chainId uint64 - // Protects conn and shuttingDown - connMutex sync.Mutex - conn net.Conn + // Protects conn, shuttingDown and compression + connMutex sync.Mutex + conn net.Conn + compression bool retryCount atomic.Int64 @@ -299,7 +300,7 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa return nil, nil } - conn, br, _, err := timeoutDialer.Dial(ctx, bc.websocketUrl) + conn, br, hs, err := timeoutDialer.Dial(ctx, bc.websocketUrl) if errors.Is(err, ErrIncorrectFeedServerVersion) || errors.Is(err, ErrIncorrectChainId) { return nil, err } @@ -325,6 +326,24 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa return nil, ErrMissingFeedServerVersion } + compressionNegotiated := false + for _, ext := range hs.Extensions { + if ext.Equal(deflateExt) { + compressionNegotiated = true + break + } + } + if !compressionNegotiated && config.EnableCompression { + log.Warn("Compression was not negotiated when connecting to feed server.") + } + if compressionNegotiated && !config.EnableCompression { + err := conn.Close() + if err != nil { + return nil, fmt.Errorf("error closing connection when negotiated disabled extension: %w", err) + } + return nil, errors.New("error dialing feed server: negotiated compression ws extension, but it is disabled") + } + var earlyFrameData io.Reader if br != nil { // Depending on how long the client takes to read the response, there may be @@ -339,6 +358,7 @@ func (bc *BroadcastClient) connect(ctx context.Context, nextSeqNum arbutil.Messa bc.connMutex.Lock() bc.conn = conn + bc.compression = compressionNegotiated bc.connMutex.Unlock() log.Info("Feed connected", "feedServerVersion", feedServerVersion, "chainId", chainId, "requestedSeqNum", nextSeqNum) @@ -362,7 +382,7 @@ func (bc *BroadcastClient) startBackgroundReader(earlyFrameData io.Reader) { var op ws.OpCode var err error config := bc.config() - msg, op, err = wsbroadcastserver.ReadData(ctx, bc.conn, earlyFrameData, config.Timeout, ws.StateClientSide, config.EnableCompression, flateReader) + msg, op, err = wsbroadcastserver.ReadData(ctx, bc.conn, earlyFrameData, config.Timeout, ws.StateClientSide, bc.compression, flateReader) if err != nil { if bc.isShuttingDown() { return diff --git a/wsbroadcastserver/utils.go b/wsbroadcastserver/utils.go index 1e72915047..40ceb3e5bf 100644 --- a/wsbroadcastserver/utils.go +++ b/wsbroadcastserver/utils.go @@ -137,7 +137,7 @@ func ReadData(ctx context.Context, conn net.Conn, earlyFrameData io.Reader, time var data []byte if msg.IsCompressed() { if !compression { - return nil, 0, errors.New("Received compressed frame even though compression is disabled") + return nil, 0, errors.New("Received compressed frame even though compression extension wasn't negotiated") } flateReader.Reset(&reader) data, err = io.ReadAll(flateReader)