Skip to content

Commit

Permalink
(2.11) Add ability for IP queues to track total size (nats-io#5775)
Browse files Browse the repository at this point in the history
You can optionally pass in an `ipQueue_SizeCalculation` function to an
IP queue that fires each time an item is pushed or popped so that the
total size can be tracked.

There is a slight performance impact but mostly it will depend on the
complexity of the calculation function:
```
go test -v ./server -run=XXX -bench=IPQueueSizeCalculation -benchtime=10s
goos: darwin
goarch: arm64
pkg: github.com/nats-io/nats-server/v2/server
BenchmarkIPQueueSizeCalculation
BenchmarkIPQueueSizeCalculation/WithoutCalc
BenchmarkIPQueueSizeCalculation/WithoutCalc-24          396342910               30.75 ns/op      520.35 MB/s
BenchmarkIPQueueSizeCalculation/WithEmptyCalc
BenchmarkIPQueueSizeCalculation/WithEmptyCalc-24        377004524               33.91 ns/op      471.80 MB/s
BenchmarkIPQueueSizeCalculation/WithLenCalc
BenchmarkIPQueueSizeCalculation/WithLenCalc-24          375148896               33.76 ns/op      473.99 MB/s
PASS
ok      github.com/nats-io/nats-server/v2/server        48.150s
```

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
derekcollison authored Sep 4, 2024
2 parents 2cfd00f + 0322966 commit 1f9d1e5
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 23 deletions.
100 changes: 82 additions & 18 deletions server/ipqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"errors"
"sync"
"sync/atomic"
)
Expand All @@ -28,36 +29,72 @@ type ipQueue[T any] struct {
elts []T
pos int
pool *sync.Pool
mrs int
sz uint64 // Calculated size (only if calc != nil)
name string
m *sync.Map
ipQueueOpts[T]
}

type ipQueueOpts struct {
maxRecycleSize int
type ipQueueOpts[T any] struct {
mrs int // Max recycle size
calc func(e T) uint64 // Calc function for tracking size
msz uint64 // Limit by total calculated size
mlen int // Limit by number of entries
}

type ipQueueOpt func(*ipQueueOpts)
type ipQueueOpt[T any] func(*ipQueueOpts[T])

// This option allows to set the maximum recycle size when attempting
// to put back a slice to the pool.
func ipQueue_MaxRecycleSize(max int) ipQueueOpt {
return func(o *ipQueueOpts) {
o.maxRecycleSize = max
func ipqMaxRecycleSize[T any](max int) ipQueueOpt[T] {
return func(o *ipQueueOpts[T]) {
o.mrs = max
}
}

func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt) *ipQueue[T] {
qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize}
for _, o := range opts {
o(&qo)
// This option enables total queue size counting by passing in a function
// that evaluates the size of each entry as it is pushed/popped. This option
// enables the size() function.
func ipqSizeCalculation[T any](calc func(e T) uint64) ipQueueOpt[T] {
return func(o *ipQueueOpts[T]) {
o.calc = calc
}
}

// This option allows setting the maximum queue size. Once the limit is
// reached, then push() will stop returning true and no more entries will
// be stored until some more are popped. The ipQueue_SizeCalculation must
// be provided for this to work.
func ipqLimitBySize[T any](max uint64) ipQueueOpt[T] {
return func(o *ipQueueOpts[T]) {
o.msz = max
}
}

// This option allows setting the maximum queue length. Once the limit is
// reached, then push() will stop returning true and no more entries will
// be stored until some more are popped.
func ipqLimitByLen[T any](max int) ipQueueOpt[T] {
return func(o *ipQueueOpts[T]) {
o.mlen = max
}
}

var errIPQLenLimitReached = errors.New("IPQ len limit reached")
var errIPQSizeLimitReached = errors.New("IPQ size limit reached")

func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt[T]) *ipQueue[T] {
q := &ipQueue[T]{
ch: make(chan struct{}, 1),
mrs: qo.maxRecycleSize,
pool: &sync.Pool{},
name: name,
m: &s.ipQueues,
ipQueueOpts: ipQueueOpts[T]{
mrs: ipQueueDefaultMaxRecycleSize,
},
}
for _, o := range opts {
o(&q.ipQueueOpts)
}
s.ipQueues.Store(name, q)
return q
Expand All @@ -66,10 +103,14 @@ func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt) *ipQueue[T] {
// Add the element `e` to the queue, notifying the queue channel's `ch` if the
// entry is the first to be added, and returns the length of the queue after
// this element is added.
func (q *ipQueue[T]) push(e T) int {
func (q *ipQueue[T]) push(e T) (int, error) {
var signal bool
q.Lock()
l := len(q.elts) - q.pos
if q.mlen > 0 && l == q.mlen {
q.Unlock()
return l, errIPQLenLimitReached
}
if l == 0 {
signal = true
eltsi := q.pool.Get()
Expand All @@ -82,16 +123,23 @@ func (q *ipQueue[T]) push(e T) int {
q.elts = make([]T, 0, 32)
}
}
if q.calc != nil {
sz := q.calc(e)
if q.msz > 0 && q.sz+sz > q.msz {
q.Unlock()
return l, errIPQSizeLimitReached
}
q.sz += sz
}
q.elts = append(q.elts, e)
l++
q.Unlock()
if signal {
select {
case q.ch <- struct{}{}:
default:
}
}
return l
return l + 1, nil
}

// Returns the whole list of elements currently present in the queue,
Expand All @@ -116,6 +164,11 @@ func (q *ipQueue[T]) pop() []T {
}
q.elts, q.pos = nil, 0
atomic.AddInt64(&q.inprogress, int64(len(elts)))
if q.calc != nil {
for _, e := range elts {
q.sz -= q.calc(e)
}
}
q.Unlock()
return elts
}
Expand All @@ -140,6 +193,9 @@ func (q *ipQueue[T]) popOne() (T, bool) {
}
e := q.elts[q.pos]
q.pos++
if q.calc != nil {
q.sz -= q.calc(e)
}
l--
if l > 0 {
// We need to re-signal
Expand Down Expand Up @@ -184,9 +240,16 @@ func (q *ipQueue[T]) recycle(elts *[]T) {
// Returns the current length of the queue.
func (q *ipQueue[T]) len() int {
q.Lock()
l := len(q.elts) - q.pos
q.Unlock()
return l
defer q.Unlock()
return len(q.elts) - q.pos
}

// Returns the calculated size of the queue (if ipQueue_SizeCalculation has been
// passed in), otherwise returns zero.
func (q *ipQueue[T]) size() uint64 {
q.Lock()
defer q.Unlock()
return q.sz
}

// Empty the queue and consumes the notification signal if present.
Expand All @@ -202,6 +265,7 @@ func (q *ipQueue[T]) drain() {
q.resetAndReturnToPool(&q.elts)
q.elts, q.pos = nil, 0
}
q.sz = 0
// Consume the signal if it was present to reduce the chance of a reader
// routine to be think that there is something in the queue...
select {
Expand Down
110 changes: 106 additions & 4 deletions server/ipqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestIPQueueBasic(t *testing.T) {
}

// Try to change the max recycle size
q2 := newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
q2 := newIPQueue[int](s, "test2", ipqMaxRecycleSize[int](10))
if q2.mrs != 10 {
t.Fatalf("Expected max recycle size to be 10, got %v", q2.mrs)
}
Expand Down Expand Up @@ -290,15 +290,15 @@ func TestIPQueueRecycle(t *testing.T) {
for iter := 0; iter < 5; iter++ {
var sz int
for i := 0; i < total; i++ {
sz = q.push(i)
sz, _ = q.push(i)
}
if sz != total {
t.Fatalf("Expected size to be %v, got %v", total, sz)
}
values := q.pop()
preRecycleCap := cap(values)
q.recycle(&values)
sz = q.push(1001)
sz, _ = q.push(1001)
if sz != 1 {
t.Fatalf("Expected size to be %v, got %v", 1, sz)
}
Expand All @@ -317,7 +317,7 @@ func TestIPQueueRecycle(t *testing.T) {
}
}

q = newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10))
q = newIPQueue[int](s, "test2", ipqMaxRecycleSize[int](10))
for i := 0; i < 100; i++ {
q.push(i)
}
Expand Down Expand Up @@ -389,3 +389,105 @@ func TestIPQueueDrain(t *testing.T) {
}
}
}

func TestIPQueueSizeCalculation(t *testing.T) {
type testType = [16]byte
var testValue testType

calc := ipqSizeCalculation[testType](func(e testType) uint64 {
return uint64(len(e))
})
s := &Server{}
q := newIPQueue[testType](s, "test", calc)

for i := 0; i < 10; i++ {
q.push(testValue)
require_Equal(t, q.len(), i+1)
require_Equal(t, q.size(), uint64(i+1)*uint64(len(testValue)))
}

for i := 10; i > 5; i-- {
q.popOne()
require_Equal(t, q.len(), i-1)
require_Equal(t, q.size(), uint64(i-1)*uint64(len(testValue)))
}

q.pop()
require_Equal(t, q.len(), 0)
require_Equal(t, q.size(), 0)
}

func TestIPQueueSizeCalculationWithLimits(t *testing.T) {
type testType = [16]byte
var testValue testType

calc := ipqSizeCalculation[testType](func(e testType) uint64 {
return uint64(len(e))
})
s := &Server{}

t.Run("LimitByLen", func(t *testing.T) {
q := newIPQueue[testType](s, "test", calc, ipqLimitByLen[testType](5))
for i := 0; i < 10; i++ {
n, err := q.push(testValue)
if i >= 5 {
require_Error(t, err, errIPQLenLimitReached)
} else {
require_NoError(t, err)
}
require_LessThan(t, n, 6)
}
})

t.Run("LimitBySize", func(t *testing.T) {
q := newIPQueue[testType](s, "test", calc, ipqLimitBySize[testType](16*5))
for i := 0; i < 10; i++ {
n, err := q.push(testValue)
if i >= 5 {
require_Error(t, err, errIPQSizeLimitReached)
} else {
require_NoError(t, err)
}
require_LessThan(t, n, 6)
}
})
}

func BenchmarkIPQueueSizeCalculation(b *testing.B) {
type testType = [16]byte
var testValue testType

s := &Server{}

run := func(b *testing.B, q *ipQueue[testType]) {
b.SetBytes(16)
for i := 0; i < b.N; i++ {
q.push(testValue)
}
for i := b.N; i > 0; i-- {
q.popOne()
}
}

// Measures without calculation function overheads.
b.Run("WithoutCalc", func(b *testing.B) {
run(b, newIPQueue[testType](s, "test"))
})

// Measures the raw overhead of having a calculation function.
b.Run("WithEmptyCalc", func(b *testing.B) {
calc := ipqSizeCalculation[testType](func(e testType) uint64 {
return 0
})
run(b, newIPQueue[testType](s, "test", calc))
})

// Measures the overhead of having a calculation function that
// actually measures something useful.
b.Run("WithLenCalc", func(b *testing.B) {
calc := ipqSizeCalculation[testType](func(e testType) uint64 {
return uint64(len(e))
})
run(b, newIPQueue[testType](s, "test", calc))
})
}
2 changes: 1 addition & 1 deletion server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
// header from the msg body. No other references are needed.
// Check pending and warn if getting backed up.
const warnThresh = 128
pending := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
pending, _ := s.jsAPIRoutedReqs.push(&jsAPIRoutedReq{jsub, sub, acc, subject, reply, copyBytes(rmsg), c.pa})
if pending >= warnThresh {
s.rateLimitFormatWarnf("JetStream request queue has high pending count: %d", pending)
}
Expand Down

0 comments on commit 1f9d1e5

Please sign in to comment.