Skip to content

Commit

Permalink
refactor: nodes analytics
Browse files Browse the repository at this point in the history
  • Loading branch information
0xJacky committed Feb 4, 2025
1 parent fee5145 commit d91fd5c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 36 deletions.
12 changes: 4 additions & 8 deletions internal/analytic/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package analytic

import (
"encoding/json"
"errors"
"github.com/0xJacky/Nginx-UI/internal/transport"
"github.com/0xJacky/Nginx-UI/internal/upgrader"
"github.com/0xJacky/Nginx-UI/model"
Expand Down Expand Up @@ -69,15 +70,14 @@ func GetNode(env *model.Environment) (n *Node) {
return n
}

func InitNode(env *model.Environment) (n *Node) {
func InitNode(env *model.Environment) (n *Node, err error) {
n = &Node{
Environment: env,
}

u, err := url.JoinPath(env.URL, "/api/node")
if err != nil {
logger.Error(err)
return
return
}

t, err := transport.NewTransport()
Expand All @@ -90,29 +90,25 @@ func InitNode(env *model.Environment) (n *Node) {

req, err := http.NewRequest("GET", u, nil)
if err != nil {
logger.Error(err)
return
}

req.Header.Set("X-Node-Secret", env.Token)

resp, err := client.Do(req)
if err != nil {
logger.Error(err)
return
}

defer resp.Body.Close()
bytes, _ := io.ReadAll(resp.Body)

if resp.StatusCode != http.StatusOK {
logger.Error(string(bytes))
return
return n, errors.New(string(bytes))
}

err = json.Unmarshal(bytes, &n.NodeInfo)
if err != nil {
logger.Error(err)
return
}

Expand Down
63 changes: 35 additions & 28 deletions internal/analytic/node_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package analytic
import (
"context"
"encoding/json"
"github.com/0xJacky/Nginx-UI/internal/helper"
"net/http"
"time"

"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/gorilla/websocket"
"github.com/uozi-tech/cosy/logger"
"net/http"
"time"
)

var stopNodeRecordChan = make(chan struct{})
Expand Down Expand Up @@ -40,13 +40,13 @@ func RetrieveNodesStatus() {

<-stopNodeRecordChan
logger.Info("RetrieveNodesStatus exited normally")
return // will execute defer cancel()
// will execute defer cancel()
}

func nodeAnalyticLive(env *model.Environment, ctx context.Context) {
errChan := make(chan error)
for {
nodeAnalyticRecord(env, errChan, ctx)
go nodeAnalyticRecord(env, errChan, ctx)

select {
case err := <-errChan:
Expand All @@ -65,10 +65,17 @@ func nodeAnalyticLive(env *model.Environment, ctx context.Context) {
}

func nodeAnalyticRecord(env *model.Environment, errChan chan error, ctx context.Context) {
node, err := InitNode(env)

mutex.Lock()
NodeMap[env.ID] = InitNode(env)
NodeMap[env.ID] = node
mutex.Unlock()

if err != nil {
errChan <- err
return
}

u, err := env.GetWebSocketURL("/api/analytic/intro")
if err != nil {
errChan <- err
Expand All @@ -95,30 +102,30 @@ func nodeAnalyticRecord(env *model.Environment, errChan chan error, ctx context.
var nodeStat NodeStat

go func() {
for {
_, message, err := c.ReadMessage()
if helper.IsUnexpectedWebsocketError(err) {
errChan <- err
return
}

err = json.Unmarshal(message, &nodeStat)
if err != nil {
errChan <- err
return
}
// shutdown
<-ctx.Done()
_ = c.Close()
}()

// set online
nodeStat.Status = true
nodeStat.ResponseAt = time.Now()
for {
_, message, err := c.ReadMessage()
if err != nil {
errChan <- err
return
}

mutex.Lock()
NodeMap[env.ID].NodeStat = nodeStat
mutex.Unlock()
err = json.Unmarshal(message, &nodeStat)
if err != nil {
errChan <- err
return
}
}()

// shutdown
<-ctx.Done()
_ = c.Close()
// set online
nodeStat.Status = true
nodeStat.ResponseAt = time.Now()

mutex.Lock()
NodeMap[env.ID].NodeStat = nodeStat
mutex.Unlock()
}
}

0 comments on commit d91fd5c

Please sign in to comment.