Skip to content

Commit

Permalink
refactor: websocket error handler
Browse files Browse the repository at this point in the history
  • Loading branch information
0xJacky committed Feb 5, 2025
1 parent eb35143 commit 0da9c74
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 46 deletions.
15 changes: 9 additions & 6 deletions api/analytic/analytic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package analytic

import (
"fmt"
"net/http"
"runtime"
"time"

"github.com/0xJacky/Nginx-UI/internal/analytic"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/shirou/gopsutil/v4/cpu"
Expand All @@ -10,9 +14,6 @@ import (
"github.com/shirou/gopsutil/v4/net"
"github.com/spf13/cast"
"github.com/uozi-tech/cosy/logger"
"net/http"
"runtime"
"time"

"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
Expand Down Expand Up @@ -86,12 +87,14 @@ func Analytic(c *gin.Context) {

// write
err = ws.WriteJSON(stat)
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
}
break
}

time.Sleep(1000 * time.Microsecond)
time.Sleep(1 * time.Second)
}
}

Expand Down
18 changes: 12 additions & 6 deletions api/analytic/nodes.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package analytic

import (
"net/http"
"time"

"github.com/0xJacky/Nginx-UI/internal/analytic"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/uozi-tech/cosy/logger"
"net/http"
"time"
)

func GetNodeStat(c *gin.Context) {
Expand All @@ -28,8 +29,10 @@ func GetNodeStat(c *gin.Context) {
for {
// write
err = ws.WriteJSON(analytic.GetNodeStat())
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
}
break
}

Expand All @@ -55,11 +58,14 @@ func GetNodesAnalytic(c *gin.Context) {
for {
// write
err = ws.WriteJSON(analytic.NodeMap)
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
}
break
}

time.Sleep(10 * time.Second)
logger.Debug("[analytic nodes] sleep 10 seconds")
}
}
6 changes: 4 additions & 2 deletions api/nginx_log/nginx_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,10 @@ func tailNginxLog(ws *websocket.Conn, controlChan chan controlStruct, errChan ch
}

err = ws.WriteMessage(websocket.TextMessage, []byte(line.Text))
if helper.IsUnexpectedWebsocketError(err) {
errChan <- errors.Wrap(err, "error tailNginxLog write message")
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
errChan <- errors.Wrap(err, "error tailNginxLog write message")
}
return
}
case control = <-controlChan:
Expand Down
6 changes: 4 additions & 2 deletions api/upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ func AvailabilityTest(c *gin.Context) {

for {
err = ws.WriteJSON(upstream.AvailabilityTest(body))
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
}
break
}

Expand Down
4 changes: 0 additions & 4 deletions app/src/views/dashboard/ServerAnalytic.vue
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ onMounted(() => {
})
})
onUnmounted(() => {
websocket.close()
})
function handle_uptime(t: number) {
// uptime
let _uptime = Math.floor(t)
Expand Down
23 changes: 8 additions & 15 deletions internal/analytic/node_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func RetrieveNodesStatus() {
defer logger.Info("RetrieveNodesStatus exited")

mutex.Lock()
NodeMap = make(TNodeMap)
if NodeMap == nil {
NodeMap = make(TNodeMap)
}
mutex.Unlock()

env := query.Environment
Expand All @@ -70,6 +72,11 @@ func RetrieveNodesStatus() {
default:
if err := nodeAnalyticRecord(e, ctx); err != nil {
logger.Error(err)
if NodeMap[env.ID] != nil {
mutex.Lock()
NodeMap[env.ID].Status = false
mutex.Unlock()
}
select {
case <-retryTicker.C:
case <-ctx.Done():
Expand All @@ -88,14 +95,11 @@ func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
scopeCtx, cancel := context.WithCancel(ctx)
defer cancel()

logger.Debug("nodeAnalyticRecord")
node, err := InitNode(env)

mutex.Lock()
logger.Debug("lock")
NodeMap[env.ID] = node
mutex.Unlock()
logger.Debug("unlock")

if err != nil {
return err
Expand Down Expand Up @@ -125,27 +129,16 @@ func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
go func() {
<-scopeCtx.Done()
_ = c.Close()
logger.Debug("close")
}()

var nodeStat NodeStat

defer func() {
if NodeMap[env.ID] != nil {
mutex.Lock()
NodeMap[env.ID].Status = false
mutex.Unlock()
}
}()

for {
err = c.ReadJSON(&nodeStat)
if err != nil {
return err
}

logger.Debug("nodeStat", nodeStat)

// set online
nodeStat.Status = true
nodeStat.ResponseAt = time.Now()
Expand Down
11 changes: 4 additions & 7 deletions internal/helper/websocket_error.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
package helper

import (
"strings"
"github.com/gorilla/websocket"
"errors"
"github.com/gorilla/websocket"
"strings"
"syscall"
)

// IsUnexpectedWebsocketError checks if the error is an unexpected websocket error
func IsUnexpectedWebsocketError(err error) bool {
// nil error is an expected error
if err == nil {
return false
}
// ignore: write: broken pipe
if errors.Is(err, syscall.EPIPE) {
return false
}
// client closed error: *net.OpErr
if strings.Contains(err.Error(), "An existing connection was forcibly closed by the remote host") {
return true
return false
}

return websocket.IsUnexpectedCloseError(err,
Expand Down
12 changes: 8 additions & 4 deletions internal/pty/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ func NewPipeLine(conn *websocket.Conn) (p *Pipeline, err error) {
func (p *Pipeline) ReadWsAndWritePty(errorChan chan error) {
for {
msgType, payload, err := p.ws.ReadMessage()
if helper.IsUnexpectedWebsocketError(err) {
errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty unexpected close")
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty unexpected close")
}
return
}
if msgType != websocket.TextMessage {
Expand Down Expand Up @@ -117,8 +119,10 @@ func (p *Pipeline) ReadPtyAndWriteWs(errorChan chan error) {
}
processedOutput := validString(string(buf[:n]))
err = p.ws.WriteMessage(websocket.TextMessage, []byte(processedOutput))
if helper.IsUnexpectedWebsocketError(err) {
errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs websocket write")
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs websocket write")
}
return
}
}
Expand Down

0 comments on commit 0da9c74

Please sign in to comment.