From 0da9c74bf14c92afc17dff4de5c17e6a4003a326 Mon Sep 17 00:00:00 2001 From: Jacky Date: Wed, 5 Feb 2025 16:45:03 +0800 Subject: [PATCH] refactor: websocket error handler --- api/analytic/analytic.go | 15 ++++++++------ api/analytic/nodes.go | 18 +++++++++++------ api/nginx_log/nginx_log.go | 6 ++++-- api/upstream/upstream.go | 6 ++++-- app/src/views/dashboard/ServerAnalytic.vue | 4 ---- internal/analytic/node_record.go | 23 ++++++++-------------- internal/helper/websocket_error.go | 11 ++++------- internal/pty/pipeline.go | 12 +++++++---- 8 files changed, 49 insertions(+), 46 deletions(-) diff --git a/api/analytic/analytic.go b/api/analytic/analytic.go index 4fff8b6dd..e267f230b 100644 --- a/api/analytic/analytic.go +++ b/api/analytic/analytic.go @@ -2,6 +2,10 @@ package analytic import ( "fmt" + "net/http" + "runtime" + "time" + "github.com/0xJacky/Nginx-UI/internal/analytic" "github.com/0xJacky/Nginx-UI/internal/helper" "github.com/shirou/gopsutil/v4/cpu" @@ -10,9 +14,6 @@ import ( "github.com/shirou/gopsutil/v4/net" "github.com/spf13/cast" "github.com/uozi-tech/cosy/logger" - "net/http" - "runtime" - "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" @@ -86,12 +87,14 @@ func Analytic(c *gin.Context) { // write err = ws.WriteJSON(stat) - if helper.IsUnexpectedWebsocketError(err) { - logger.Error(err) + if err != nil { + if helper.IsUnexpectedWebsocketError(err) { + logger.Error(err) + } break } - time.Sleep(1000 * time.Microsecond) + time.Sleep(1 * time.Second) } } diff --git a/api/analytic/nodes.go b/api/analytic/nodes.go index 8d4fe9f22..c67f9139d 100644 --- a/api/analytic/nodes.go +++ b/api/analytic/nodes.go @@ -1,13 +1,14 @@ package analytic import ( + "net/http" + "time" + "github.com/0xJacky/Nginx-UI/internal/analytic" "github.com/0xJacky/Nginx-UI/internal/helper" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/uozi-tech/cosy/logger" - "net/http" - "time" ) func GetNodeStat(c *gin.Context) { @@ -28,8 +29,10 @@ func GetNodeStat(c *gin.Context) { for { // write err = ws.WriteJSON(analytic.GetNodeStat()) - if helper.IsUnexpectedWebsocketError(err) { - logger.Error(err) + if err != nil { + if helper.IsUnexpectedWebsocketError(err) { + logger.Error(err) + } break } @@ -55,11 +58,14 @@ func GetNodesAnalytic(c *gin.Context) { for { // write err = ws.WriteJSON(analytic.NodeMap) - if helper.IsUnexpectedWebsocketError(err) { - logger.Error(err) + if err != nil { + if helper.IsUnexpectedWebsocketError(err) { + logger.Error(err) + } break } time.Sleep(10 * time.Second) + logger.Debug("[analytic nodes] sleep 10 seconds") } } diff --git a/api/nginx_log/nginx_log.go b/api/nginx_log/nginx_log.go index 078c4e956..192a12af7 100644 --- a/api/nginx_log/nginx_log.go +++ b/api/nginx_log/nginx_log.go @@ -253,8 +253,10 @@ func tailNginxLog(ws *websocket.Conn, controlChan chan controlStruct, errChan ch } err = ws.WriteMessage(websocket.TextMessage, []byte(line.Text)) - if helper.IsUnexpectedWebsocketError(err) { - errChan <- errors.Wrap(err, "error tailNginxLog write message") + if err != nil { + if helper.IsUnexpectedWebsocketError(err) { + errChan <- errors.Wrap(err, "error tailNginxLog write message") + } return } case control = <-controlChan: diff --git a/api/upstream/upstream.go b/api/upstream/upstream.go index 14c39b2a9..0cead0007 100644 --- a/api/upstream/upstream.go +++ b/api/upstream/upstream.go @@ -36,8 +36,10 @@ func AvailabilityTest(c *gin.Context) { for { err = ws.WriteJSON(upstream.AvailabilityTest(body)) - if helper.IsUnexpectedWebsocketError(err) { - logger.Error(err) + if err != nil { + if helper.IsUnexpectedWebsocketError(err) { + logger.Error(err) + } break } diff --git a/app/src/views/dashboard/ServerAnalytic.vue b/app/src/views/dashboard/ServerAnalytic.vue index ccba53c8f..abd8815f0 100644 --- a/app/src/views/dashboard/ServerAnalytic.vue +++ b/app/src/views/dashboard/ServerAnalytic.vue @@ -90,10 +90,6 @@ onMounted(() => { }) }) -onUnmounted(() => { - websocket.close() -}) - function handle_uptime(t: number) { // uptime let _uptime = Math.floor(t) diff --git a/internal/analytic/node_record.go b/internal/analytic/node_record.go index 77706460d..6f62d3399 100644 --- a/internal/analytic/node_record.go +++ b/internal/analytic/node_record.go @@ -43,7 +43,9 @@ func RetrieveNodesStatus() { defer logger.Info("RetrieveNodesStatus exited") mutex.Lock() - NodeMap = make(TNodeMap) + if NodeMap == nil { + NodeMap = make(TNodeMap) + } mutex.Unlock() env := query.Environment @@ -70,6 +72,11 @@ func RetrieveNodesStatus() { default: if err := nodeAnalyticRecord(e, ctx); err != nil { logger.Error(err) + if NodeMap[env.ID] != nil { + mutex.Lock() + NodeMap[env.ID].Status = false + mutex.Unlock() + } select { case <-retryTicker.C: case <-ctx.Done(): @@ -88,14 +95,11 @@ func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error { scopeCtx, cancel := context.WithCancel(ctx) defer cancel() - logger.Debug("nodeAnalyticRecord") node, err := InitNode(env) mutex.Lock() - logger.Debug("lock") NodeMap[env.ID] = node mutex.Unlock() - logger.Debug("unlock") if err != nil { return err @@ -125,27 +129,16 @@ func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error { go func() { <-scopeCtx.Done() _ = c.Close() - logger.Debug("close") }() var nodeStat NodeStat - defer func() { - if NodeMap[env.ID] != nil { - mutex.Lock() - NodeMap[env.ID].Status = false - mutex.Unlock() - } - }() - for { err = c.ReadJSON(&nodeStat) if err != nil { return err } - logger.Debug("nodeStat", nodeStat) - // set online nodeStat.Status = true nodeStat.ResponseAt = time.Now() diff --git a/internal/helper/websocket_error.go b/internal/helper/websocket_error.go index cb9b600a1..bde926237 100644 --- a/internal/helper/websocket_error.go +++ b/internal/helper/websocket_error.go @@ -1,24 +1,21 @@ package helper import ( - "strings" - "github.com/gorilla/websocket" "errors" + "github.com/gorilla/websocket" + "strings" "syscall" ) +// IsUnexpectedWebsocketError checks if the error is an unexpected websocket error func IsUnexpectedWebsocketError(err error) bool { - // nil error is an expected error - if err == nil { - return false - } // ignore: write: broken pipe if errors.Is(err, syscall.EPIPE) { return false } // client closed error: *net.OpErr if strings.Contains(err.Error(), "An existing connection was forcibly closed by the remote host") { - return true + return false } return websocket.IsUnexpectedCloseError(err, diff --git a/internal/pty/pipeline.go b/internal/pty/pipeline.go index 422a33edc..50a8b21c4 100644 --- a/internal/pty/pipeline.go +++ b/internal/pty/pipeline.go @@ -47,8 +47,10 @@ func NewPipeLine(conn *websocket.Conn) (p *Pipeline, err error) { func (p *Pipeline) ReadWsAndWritePty(errorChan chan error) { for { msgType, payload, err := p.ws.ReadMessage() - if helper.IsUnexpectedWebsocketError(err) { - errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty unexpected close") + if err != nil { + if helper.IsUnexpectedWebsocketError(err) { + errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty unexpected close") + } return } if msgType != websocket.TextMessage { @@ -117,8 +119,10 @@ func (p *Pipeline) ReadPtyAndWriteWs(errorChan chan error) { } processedOutput := validString(string(buf[:n])) err = p.ws.WriteMessage(websocket.TextMessage, []byte(processedOutput)) - if helper.IsUnexpectedWebsocketError(err) { - errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs websocket write") + if err != nil { + if helper.IsUnexpectedWebsocketError(err) { + errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs websocket write") + } return } }