From dbfa73d92deac70b415a2aa04c6c0c79ae681541 Mon Sep 17 00:00:00 2001 From: wendall-robinson Date: Tue, 5 Nov 2024 12:06:53 -0600 Subject: [PATCH 1/3] Added BufferSize to Jetsream and NATS outputs * follow-up to Parallel processing with Jetstream #342 * added option to provide a buffer size to go channel on output * updaed user guides for Jetstream and NATS outputs --- docs/user_guide/outputs/jetstream_output.md | 4 ++++ docs/user_guide/outputs/nats_output.md | 4 ++++ pkg/outputs/nats_outputs/jetstream/jetstream_output.go | 3 ++- pkg/outputs/nats_outputs/nats/nats_output.go | 3 ++- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/docs/user_guide/outputs/jetstream_output.md b/docs/user_guide/outputs/jetstream_output.md index 2e256724..6e7ed42a 100644 --- a/docs/user_guide/outputs/jetstream_output.md +++ b/docs/user_guide/outputs/jetstream_output.md @@ -123,6 +123,10 @@ outputs: # boolean, enables extra logging for the nats output debug: false # boolean, enables the collection and export (via prometheus) of output specific metrics + # integer, sets the size of the local buffer where received + # NATS messages are stored before being sent to outputs. + # This value is set per worker. Defaults to 100 messages + buffer-size: 100 enable-metrics: false # list of processors to apply to the message before writing event-processors: diff --git a/docs/user_guide/outputs/nats_output.md b/docs/user_guide/outputs/nats_output.md index 7301e900..4d26d4ff 100644 --- a/docs/user_guide/outputs/nats_output.md +++ b/docs/user_guide/outputs/nats_output.md @@ -73,6 +73,10 @@ outputs: # boolean, enables extra logging for the nats output debug: false # boolean, enables the collection and export (via prometheus) of output specific metrics + # integer, sets the size of the local buffer where received + # NATS messages are stored before being sent to outputs. + # This value is set per worker. Defaults to 100 messages + buffer-size: 100 enable-metrics: false # list of processors to apply on the message before writing event-processors: diff --git a/pkg/outputs/nats_outputs/jetstream/jetstream_output.go b/pkg/outputs/nats_outputs/jetstream/jetstream_output.go index 0c2908b8..b0a47081 100644 --- a/pkg/outputs/nats_outputs/jetstream/jetstream_output.go +++ b/pkg/outputs/nats_outputs/jetstream/jetstream_output.go @@ -87,6 +87,7 @@ type config struct { NumWorkers int `mapstructure:"num-workers,omitempty" json:"num-workers,omitempty"` WriteTimeout time.Duration `mapstructure:"write-timeout,omitempty" json:"write-timeout,omitempty"` Debug bool `mapstructure:"debug,omitempty" json:"debug,omitempty"` + BufferSize int `mapstructure:"buffer-size,omitempty"` EnableMetrics bool `mapstructure:"enable-metrics,omitempty" json:"enable-metrics,omitempty"` EventProcessors []string `mapstructure:"event-processors,omitempty" json:"event-processors,omitempty"` } @@ -136,7 +137,7 @@ func (n *jetstreamOutput) Init(ctx context.Context, name string, cfg map[string] return err } - n.msgChan = make(chan *outputs.ProtoMsg) + n.msgChan = make(chan *outputs.ProtoMsg, uint(n.Cfg.BufferSize)) initMetrics() n.mo = &formatters.MarshalOptions{ Format: n.Cfg.Format, diff --git a/pkg/outputs/nats_outputs/nats/nats_output.go b/pkg/outputs/nats_outputs/nats/nats_output.go index a0b12098..c4308da2 100644 --- a/pkg/outputs/nats_outputs/nats/nats_output.go +++ b/pkg/outputs/nats_outputs/nats/nats_output.go @@ -87,6 +87,7 @@ type Config struct { NumWorkers int `mapstructure:"num-workers,omitempty"` WriteTimeout time.Duration `mapstructure:"write-timeout,omitempty"` Debug bool `mapstructure:"debug,omitempty"` + BufferSize int `mapstructure:"buffer-size,omitempty"` EnableMetrics bool `mapstructure:"enable-metrics,omitempty"` EventProcessors []string `mapstructure:"event-processors,omitempty"` } @@ -145,7 +146,7 @@ func (n *NatsOutput) Init(ctx context.Context, name string, cfg map[string]inter return err } - n.msgChan = make(chan *outputs.ProtoMsg) + n.msgChan = make(chan *outputs.ProtoMsg, uint(n.Cfg.BufferSize)) initMetrics() n.mo = &formatters.MarshalOptions{ Format: n.Cfg.Format, From cde522839d36d978d7f13402a8bb6c2010686e9a Mon Sep 17 00:00:00 2001 From: wendall-robinson Date: Tue, 5 Nov 2024 12:46:16 -0600 Subject: [PATCH 2/3] Updated based on feedback from repo owner * updated user guide to communicate the default value of to 0 * fixed ordering of comment * made BufferSize a uint to default to 0 if not set --- docs/user_guide/outputs/jetstream_output.md | 6 +++--- docs/user_guide/outputs/nats_output.md | 6 +++--- pkg/outputs/nats_outputs/jetstream/jetstream_output.go | 2 +- pkg/outputs/nats_outputs/nats/nats_output.go | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/user_guide/outputs/jetstream_output.md b/docs/user_guide/outputs/jetstream_output.md index 6e7ed42a..dd214a85 100644 --- a/docs/user_guide/outputs/jetstream_output.md +++ b/docs/user_guide/outputs/jetstream_output.md @@ -122,11 +122,11 @@ outputs: write-timeout: 5s # boolean, enables extra logging for the nats output debug: false - # boolean, enables the collection and export (via prometheus) of output specific metrics # integer, sets the size of the local buffer where received # NATS messages are stored before being sent to outputs. - # This value is set per worker. Defaults to 100 messages - buffer-size: 100 + # This value is set per worker. Defaults to 0 messages + buffer-size: 0 + # boolean, enables the collection and export (via prometheus) of output specific metrics enable-metrics: false # list of processors to apply to the message before writing event-processors: diff --git a/docs/user_guide/outputs/nats_output.md b/docs/user_guide/outputs/nats_output.md index 4d26d4ff..8ddd6df9 100644 --- a/docs/user_guide/outputs/nats_output.md +++ b/docs/user_guide/outputs/nats_output.md @@ -72,11 +72,11 @@ outputs: write-timeout: 5s # boolean, enables extra logging for the nats output debug: false - # boolean, enables the collection and export (via prometheus) of output specific metrics # integer, sets the size of the local buffer where received # NATS messages are stored before being sent to outputs. - # This value is set per worker. Defaults to 100 messages - buffer-size: 100 + # This value is set per worker. Defaults to 0 messages + buffer-size: 0 + # boolean, enables the collection and export (via prometheus) of output specific metrics enable-metrics: false # list of processors to apply on the message before writing event-processors: diff --git a/pkg/outputs/nats_outputs/jetstream/jetstream_output.go b/pkg/outputs/nats_outputs/jetstream/jetstream_output.go index b0a47081..def553a9 100644 --- a/pkg/outputs/nats_outputs/jetstream/jetstream_output.go +++ b/pkg/outputs/nats_outputs/jetstream/jetstream_output.go @@ -87,7 +87,7 @@ type config struct { NumWorkers int `mapstructure:"num-workers,omitempty" json:"num-workers,omitempty"` WriteTimeout time.Duration `mapstructure:"write-timeout,omitempty" json:"write-timeout,omitempty"` Debug bool `mapstructure:"debug,omitempty" json:"debug,omitempty"` - BufferSize int `mapstructure:"buffer-size,omitempty"` + BufferSize uint `mapstructure:"buffer-size,omitempty"` EnableMetrics bool `mapstructure:"enable-metrics,omitempty" json:"enable-metrics,omitempty"` EventProcessors []string `mapstructure:"event-processors,omitempty" json:"event-processors,omitempty"` } diff --git a/pkg/outputs/nats_outputs/nats/nats_output.go b/pkg/outputs/nats_outputs/nats/nats_output.go index c4308da2..7221b3dc 100644 --- a/pkg/outputs/nats_outputs/nats/nats_output.go +++ b/pkg/outputs/nats_outputs/nats/nats_output.go @@ -87,7 +87,7 @@ type Config struct { NumWorkers int `mapstructure:"num-workers,omitempty"` WriteTimeout time.Duration `mapstructure:"write-timeout,omitempty"` Debug bool `mapstructure:"debug,omitempty"` - BufferSize int `mapstructure:"buffer-size,omitempty"` + BufferSize uint `mapstructure:"buffer-size,omitempty"` EnableMetrics bool `mapstructure:"enable-metrics,omitempty"` EventProcessors []string `mapstructure:"event-processors,omitempty"` } From edeba0658aa98155cd88e961b2caa06288f3fd13 Mon Sep 17 00:00:00 2001 From: wendall-robinson Date: Wed, 6 Nov 2024 17:57:26 -0600 Subject: [PATCH 3/3] Removed uint type conversion when making the chan * BufferSize was changed to default to uint and no longer needs this type conversion * this was requested by the repo owner --- pkg/outputs/nats_outputs/jetstream/jetstream_output.go | 2 +- pkg/outputs/nats_outputs/nats/nats_output.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/outputs/nats_outputs/jetstream/jetstream_output.go b/pkg/outputs/nats_outputs/jetstream/jetstream_output.go index def553a9..1dddcd2f 100644 --- a/pkg/outputs/nats_outputs/jetstream/jetstream_output.go +++ b/pkg/outputs/nats_outputs/jetstream/jetstream_output.go @@ -137,7 +137,7 @@ func (n *jetstreamOutput) Init(ctx context.Context, name string, cfg map[string] return err } - n.msgChan = make(chan *outputs.ProtoMsg, uint(n.Cfg.BufferSize)) + n.msgChan = make(chan *outputs.ProtoMsg, n.Cfg.BufferSize) initMetrics() n.mo = &formatters.MarshalOptions{ Format: n.Cfg.Format, diff --git a/pkg/outputs/nats_outputs/nats/nats_output.go b/pkg/outputs/nats_outputs/nats/nats_output.go index 7221b3dc..1d43847a 100644 --- a/pkg/outputs/nats_outputs/nats/nats_output.go +++ b/pkg/outputs/nats_outputs/nats/nats_output.go @@ -146,7 +146,7 @@ func (n *NatsOutput) Init(ctx context.Context, name string, cfg map[string]inter return err } - n.msgChan = make(chan *outputs.ProtoMsg, uint(n.Cfg.BufferSize)) + n.msgChan = make(chan *outputs.ProtoMsg, n.Cfg.BufferSize) initMetrics() n.mo = &formatters.MarshalOptions{ Format: n.Cfg.Format,