Skip to content

Commit

Permalink
fix GitHub #119
Browse files Browse the repository at this point in the history
  • Loading branch information
mzzsfy committed Jan 5, 2024
1 parent 5cd7e6a commit 91f1241
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 24 deletions.
2 changes: 1 addition & 1 deletion _example/default/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func main() {
go example.Work()

// Register a Statsviz server on the default mux.
statsviz.Register(http.DefaultServeMux)
statsviz.Register(http.DefaultServeMux, statsviz.EnableDataCache())

fmt.Println("Point your browser to http://localhost:8080/debug/statsviz/")
log.Fatal(http.ListenAndServe(":8080", nil))
Expand Down
18 changes: 18 additions & 0 deletions internal/plot/plots_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"runtime/debug"
"runtime/metrics"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -156,6 +157,23 @@ func (pl *List) WriteValues(w io.Writer) error {
}

if err := json.NewEncoder(w).Encode(m); err != nil {
// If we fail to encode the metrics values, it's probably because
if strings.Contains(err.Error(), "NaN") {
for s, a := range m {
if v, ok := a.([]float64); ok {
for i := range v {
if v[i] != v[i] {
v[i] = 0
}
}
m[s] = v
}
}
err = json.NewEncoder(w).Encode(m)
if err == nil {
return nil
}
}
return fmt.Errorf("failed to write/convert metrics values to json: %v", err)
}
return nil
Expand Down
62 changes: 60 additions & 2 deletions statsviz.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -91,6 +94,13 @@ type Server struct {
root string // HTTP path root
plots *plot.List // plots shown on the user interface
userPlots []plot.UserPlot

enablePlotCache bool
cacheTime time.Time
plotCache []byte
plotCacheLock *sync.Mutex
idGen int32
mainId int32
}

// NewServer constructs a new Statsviz Server with the provided options, or the
Expand Down Expand Up @@ -134,6 +144,16 @@ func SendFrequency(intv time.Duration) Option {
}
}

// EnableDataCache Multiple web page share data.
// https://github.com/arl/statsviz/issues/119
func EnableDataCache() Option {
return func(s *Server) error {
s.enablePlotCache = true
s.plotCacheLock = &sync.Mutex{}
return nil
}
}

// Root changes the root path of the Statsviz user interface.
// The default is "/debug/statsviz".
func Root(path string) Option {
Expand Down Expand Up @@ -265,13 +285,16 @@ func (s *Server) sendStats(conn *websocket.Conn, frequency time.Duration) error
// requested. Call plots.Config() manually to ensure that s.plots internals
// are correctly initialized.
s.plots.Config()

//The time interval of tick.C is not precise,
//so use id to ensure that data generation
//will not cause data loss due to the time interval of tick.C
id := atomic.AddInt32(&s.idGen, 1)
for range tick.C {
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return err
}
if err := s.plots.WriteValues(w); err != nil {
if err := s.writeData(w, id); err != nil {
return err
}
if err := w.Close(); err != nil {
Expand All @@ -281,3 +304,38 @@ func (s *Server) sendStats(conn *websocket.Conn, frequency time.Duration) error

panic("unreachable")
}

func (s *Server) writeData(w io.WriteCloser, id int32) (err error) {
if !s.enablePlotCache {
return s.plots.WriteValues(w)
}
defer func() {
if err == nil {
_, err = w.Write(s.plotCache)
}
}()
if s.mainId == id || time.Since(s.cacheTime) >= s.intv {
s.plotCacheLock.Lock()
if s.mainId != id {
// double check
if time.Since(s.cacheTime) < s.intv {
s.plotCacheLock.Unlock()
return
} else {
// change main id to active id
s.mainId = id
}
}
buf := bytes.Buffer{}
// make s.cacheTime closer to tick.C time
start := time.Now()
if err = s.plots.WriteValues(&buf); err != nil {
s.plotCacheLock.Unlock()
return err
}
s.plotCache = buf.Bytes()
s.cacheTime = start
s.plotCacheLock.Unlock()
}
return
}
140 changes: 119 additions & 21 deletions statsviz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,9 @@ func TestRoot(t *testing.T) {
testIndex(t, newServer(t, Root("/test/")).Index(), "http://example.com/test/")
}

func testWs(t *testing.T, f http.Handler, URL string) {
func testWs(t *testing.T, s *httptest.Server, URL string, number int, checkData func() any, check func(*testing.T, any)) {
t.Helper()

s := httptest.NewServer(f)
defer s.Close()

// Build a "ws://" url using the httptest server URL and the URL argument.
u1, err := url.Parse(s.URL)
if err != nil {
Expand All @@ -97,18 +94,31 @@ func testWs(t *testing.T, f http.Handler, URL string) {
defer ws.Close()

// Check the content of 2 consecutive payloads.
for i := 0; i < 2; i++ {

// Verifies that we've received 1 time series (goroutines) and one
// heatmap (sizeClasses).
var data struct {
Goroutines []uint64 `json:"goroutines"`
SizeClasses []uint64 `json:"size-classes"`
}
if err := ws.ReadJSON(&data); err != nil {
for i := 0; i < number; i++ {
data := checkData()
if err := ws.ReadJSON(data); err != nil {
t.Fatalf("failed reading json from websocket: %v", err)
return
}
check(t, data)
}
}

func TestWs(t *testing.T) {
t.Parallel()
// Verifies that we've received 1 time series (goroutines) and one
// heatmap (sizeClasses).
type dataType struct {
Goroutines []uint64 `json:"goroutines"`
SizeClasses []uint64 `json:"size-classes"`
}

s := httptest.NewServer(newServer(t).Ws())
defer s.Close()
testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any {
return &dataType{}
}, func(t *testing.T, data1 any) {
data := data1.(*dataType)
// The time series must have one and only one element
if len(data.Goroutines) != 1 {
t.Errorf("len(goroutines) = %d, want 1", len(data.Goroutines))
Expand All @@ -117,13 +127,7 @@ func testWs(t *testing.T, f http.Handler, URL string) {
if len(data.SizeClasses) <= 1 {
t.Errorf("len(sizeClasses) = %d, want > 1", len(data.SizeClasses))
}
}
}

func TestWs(t *testing.T) {
t.Parallel()

testWs(t, newServer(t).Ws(), "http://example.com/debug/statsviz/ws")
})
}

func TestWsCantUpgrade(t *testing.T) {
Expand All @@ -141,7 +145,23 @@ func TestWsCantUpgrade(t *testing.T) {
func testRegister(t *testing.T, f http.Handler, baseURL string) {
testIndex(t, f, baseURL)
ws := strings.TrimRight(baseURL, "/") + "/ws"
testWs(t, f, ws)
type dataType struct {
Goroutines []uint64 `json:"goroutines"`
SizeClasses []uint64 `json:"size-classes"`
}
s := httptest.NewServer(f)
defer s.Close()
testWs(t, s, ws, 2, func() any {
return &dataType{}
}, func(t *testing.T, data1 any) {
data := data1.(*dataType)
if len(data.Goroutines) != 1 {
t.Errorf("len(goroutines) = %d, want 1", len(data.Goroutines))
}
if len(data.SizeClasses) <= 1 {
t.Errorf("len(sizeClasses) = %d, want > 1", len(data.SizeClasses))
}
})
}

func TestRegister(t *testing.T) {
Expand Down Expand Up @@ -174,6 +194,84 @@ func TestRegister(t *testing.T) {

testRegister(t, mux, "http://example.com/path/to/statsviz/")
})
type Data struct {
Test []float64 `json:"test"`
}

makeTestPlot := func() (TimeSeriesPlot, *int) {
num := 0
build, _ := TimeSeriesPlotConfig{
Title: "test",
Name: "test",
Series: []TimeSeries{
{
Name: "1",
GetValue: func() float64 {
num++
return 1
},
},
},
}.Build()
return build, &num
}

t.Run("customizePlot", func(t *testing.T) {
t.Parallel()
plot, i := makeTestPlot()
s := httptest.NewServer(newServer(t,
TimeseriesPlot(plot),
).Ws())
defer s.Close()
go testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any { return &Data{} }, func(t *testing.T, a any) {})
testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any {
return &Data{}
}, func(t *testing.T, a any) {
data := a.(*Data)
if len(data.Test) != 1 {
t.Fatalf("customizePlot failed,call num %d expect 1", len(data.Test))
}
})
time.Sleep(100 * time.Millisecond)
if *i != 4 {
t.Fatalf("dataCache2 failed,call num %d expect 4", *i)
}
})

t.Run("dataCache", func(t *testing.T) {
t.Parallel()
plot, i := makeTestPlot()
s := httptest.NewServer(newServer(t,
EnableDataCache(),
TimeseriesPlot(plot),
).Ws())
defer s.Close()
go testWs(t, s, "http://example.com/debug/statsviz/ws", 1, func() any { return &Data{} }, func(t *testing.T, a any) {})
go testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any { return &Data{} }, func(t *testing.T, a any) {})
go testWs(t, s, "http://example.com/debug/statsviz/ws", 3, func() any { return &Data{} }, func(t *testing.T, a any) {})
go testWs(t, s, "http://example.com/debug/statsviz/ws", 4, func() any { return &Data{} }, func(t *testing.T, a any) {})
testWs(t, s, "http://example.com/debug/statsviz/ws", 4, func() any { return &Data{} }, func(t *testing.T, a any) {})
time.Sleep(100 * time.Millisecond)
if *i != 4 {
t.Fatalf("dataCache failed,call num %d expect 4", *i)
}
})

t.Run("dataCache2", func(t *testing.T) {
t.Parallel()
plot, i := makeTestPlot()
s := httptest.NewServer(newServer(t,
EnableDataCache(),
TimeseriesPlot(plot),
).Ws())
defer s.Close()
go testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any { return &Data{} }, func(t *testing.T, a any) {})
testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any { return &Data{} }, func(t *testing.T, a any) {})
time.Sleep(100 * time.Millisecond)
if *i != 2 {
t.Fatalf("dataCache failed,call num %d expect 2", *i)
}
})

t.Run("root+frequency", func(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit 91f1241

Please sign in to comment.