From d589dfaa925ef63dc305c8aa1ae19f4d72d974ea Mon Sep 17 00:00:00 2001 From: Karim Radhouani Date: Mon, 6 Nov 2023 20:49:00 -0800 Subject: [PATCH] file output: set write concurrency to 1 when writing to stdout or stderr --- outputs/file/file_output.go | 104 +++++++++++++++++++----------------- 1 file changed, 55 insertions(+), 49 deletions(-) diff --git a/outputs/file/file_output.go b/outputs/file/file_output.go index 14ebb724..6aad7590 100644 --- a/outputs/file/file_output.go +++ b/outputs/file/file_output.go @@ -19,13 +19,14 @@ import ( "text/template" "time" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/sync/semaphore" + "google.golang.org/protobuf/proto" + "github.com/openconfig/gnmic/formatters" "github.com/openconfig/gnmic/outputs" "github.com/openconfig/gnmic/types" "github.com/openconfig/gnmic/utils" - "github.com/prometheus/client_golang/prometheus" - "golang.org/x/sync/semaphore" - "google.golang.org/protobuf/proto" ) const ( @@ -38,7 +39,7 @@ const ( func init() { outputs.Register("file", func() outputs.Output { return &File{ - Cfg: &Config{}, + cfg: &Config{}, logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags), } }) @@ -46,7 +47,7 @@ func init() { // File // type File struct { - Cfg *Config + cfg *Config file *os.File logger *log.Logger mo *formatters.MarshalOptions @@ -78,7 +79,7 @@ type Config struct { } func (f *File) String() string { - b, err := json.Marshal(f) + b, err := json.Marshal(f.cfg) if err != nil { return "" } @@ -89,7 +90,7 @@ func (f *File) SetEventProcessors(ps map[string]map[string]interface{}, logger *log.Logger, tcs map[string]*types.TargetConfig, acts map[string]map[string]interface{}) { - for _, epName := range f.Cfg.EventProcessors { + for _, epName := range f.cfg.EventProcessors { if epCfg, ok := ps[epName]; ok { epType := "" for k := range epCfg { @@ -127,7 +128,7 @@ func (f *File) SetLogger(logger *log.Logger) { // Init // func (f *File) Init(ctx context.Context, name string, cfg map[string]interface{}, opts ...outputs.Option) error { - err := outputs.DecodeConfig(cfg, f.Cfg) + err := outputs.DecodeConfig(cfg, f.cfg) if err != nil { return err } @@ -137,24 +138,24 @@ func (f *File) Init(ctx context.Context, name string, cfg map[string]interface{} for _, opt := range opts { opt(f) } - if f.Cfg.Format == "proto" { + if f.cfg.Format == "proto" { return fmt.Errorf("proto format not supported in output type 'file'") } - if f.Cfg.Separator == "" { - f.Cfg.Separator = defaultSeparator + if f.cfg.Separator == "" { + f.cfg.Separator = defaultSeparator } - if f.Cfg.FileName == "" && f.Cfg.FileType == "" { - f.Cfg.FileType = "stdout" + if f.cfg.FileName == "" && f.cfg.FileType == "" { + f.cfg.FileType = "stdout" } - switch f.Cfg.FileType { + switch f.cfg.FileType { case "stdout": f.file = os.Stdout case "stderr": f.file = os.Stderr default: CRFILE: - f.file, err = os.OpenFile(f.Cfg.FileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + f.file, err = os.OpenFile(f.cfg.FileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { f.logger.Printf("failed to create file: %v", err) time.Sleep(10 * time.Second) @@ -162,41 +163,46 @@ func (f *File) Init(ctx context.Context, name string, cfg map[string]interface{} } } - if f.Cfg.Format == "" { - f.Cfg.Format = defaultFormat + if f.cfg.Format == "" { + f.cfg.Format = defaultFormat } - if f.Cfg.FileType == "stdout" || f.Cfg.FileType == "stderr" { - f.Cfg.Indent = " " - f.Cfg.Multiline = true + if f.cfg.FileType == "stdout" || f.cfg.FileType == "stderr" { + f.cfg.Indent = " " + f.cfg.Multiline = true } - if f.Cfg.Multiline && f.Cfg.Indent == "" { - f.Cfg.Indent = " " + if f.cfg.Multiline && f.cfg.Indent == "" { + f.cfg.Indent = " " } - if f.Cfg.ConcurrencyLimit < 1 { - f.Cfg.ConcurrencyLimit = defaultWriteConcurrency + if f.cfg.ConcurrencyLimit < 1 { + switch f.cfg.FileType { + case "stdout", "stderr": + f.cfg.ConcurrencyLimit = 1 + default: + f.cfg.ConcurrencyLimit = defaultWriteConcurrency + } } - f.sem = semaphore.NewWeighted(int64(f.Cfg.ConcurrencyLimit)) + f.sem = semaphore.NewWeighted(int64(f.cfg.ConcurrencyLimit)) f.mo = &formatters.MarshalOptions{ - Multiline: f.Cfg.Multiline, - Indent: f.Cfg.Indent, - Format: f.Cfg.Format, - OverrideTS: f.Cfg.OverrideTimestamps, - CalculateLatency: f.Cfg.CalculateLatency, + Multiline: f.cfg.Multiline, + Indent: f.cfg.Indent, + Format: f.cfg.Format, + OverrideTS: f.cfg.OverrideTimestamps, + CalculateLatency: f.cfg.CalculateLatency, } - if f.Cfg.TargetTemplate == "" { + if f.cfg.TargetTemplate == "" { f.targetTpl = outputs.DefaultTargetTemplate - } else if f.Cfg.AddTarget != "" { - f.targetTpl, err = utils.CreateTemplate("target-template", f.Cfg.TargetTemplate) + } else if f.cfg.AddTarget != "" { + f.targetTpl, err = utils.CreateTemplate("target-template", f.cfg.TargetTemplate) if err != nil { return err } f.targetTpl = f.targetTpl.Funcs(outputs.TemplateFuncs) } - if f.Cfg.MsgTemplate != "" { - f.msgTpl, err = utils.CreateTemplate(fmt.Sprintf("%s-msg-template", name), f.Cfg.MsgTemplate) + if f.cfg.MsgTemplate != "" { + f.msgTpl, err = utils.CreateTemplate(fmt.Sprintf("%s-msg-template", name), f.cfg.MsgTemplate) if err != nil { return err } @@ -227,13 +233,13 @@ func (f *File) Write(ctx context.Context, rsp proto.Message, meta outputs.Meta) defer f.sem.Release(1) numberOfReceivedMsgs.WithLabelValues(f.file.Name()).Inc() - rsp, err = outputs.AddSubscriptionTarget(rsp, meta, f.Cfg.AddTarget, f.targetTpl) + rsp, err = outputs.AddSubscriptionTarget(rsp, meta, f.cfg.AddTarget, f.targetTpl) if err != nil { f.logger.Printf("failed to add target to the response: %v", err) } - bb, err := outputs.Marshal(rsp, meta, f.mo, f.Cfg.SplitEvents, f.evps...) + bb, err := outputs.Marshal(rsp, meta, f.mo, f.cfg.SplitEvents, f.evps...) if err != nil { - if f.Cfg.Debug { + if f.cfg.Debug { f.logger.Printf("failed marshaling proto msg: %v", err) } numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "marshal_error").Inc() @@ -246,7 +252,7 @@ func (f *File) Write(ctx context.Context, rsp proto.Message, meta outputs.Meta) if f.msgTpl != nil { b, err = outputs.ExecTemplate(b, f.msgTpl) if err != nil { - if f.Cfg.Debug { + if f.cfg.Debug { log.Printf("failed to execute template: %v", err) } numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "template_error").Inc() @@ -254,9 +260,9 @@ func (f *File) Write(ctx context.Context, rsp proto.Message, meta outputs.Meta) } } - n, err := f.file.Write(append(b, []byte(f.Cfg.Separator)...)) + n, err := f.file.Write(append(b, []byte(f.cfg.Separator)...)) if err != nil { - if f.Cfg.Debug { + if f.cfg.Debug { f.logger.Printf("failed to write to file '%s': %v", f.file.Name(), err) } numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "write_error").Inc() @@ -278,12 +284,12 @@ func (f *File) WriteEvent(ctx context.Context, ev *formatters.EventMsg) { evs = proc.Apply(evs...) } toWrite := []byte{} - if f.Cfg.SplitEvents { + if f.cfg.SplitEvents { for _, pev := range evs { var err error var b []byte - if f.Cfg.Multiline { - b, err = json.MarshalIndent(pev, "", f.Cfg.Indent) + if f.cfg.Multiline { + b, err = json.MarshalIndent(pev, "", f.cfg.Indent) } else { b, err = json.Marshal(pev) } @@ -293,13 +299,13 @@ func (f *File) WriteEvent(ctx context.Context, ev *formatters.EventMsg) { return } toWrite = append(toWrite, b...) - toWrite = append(toWrite, []byte(f.Cfg.Separator)...) + toWrite = append(toWrite, []byte(f.cfg.Separator)...) } } else { var err error var b []byte - if f.Cfg.Multiline { - b, err = json.MarshalIndent(evs, "", f.Cfg.Indent) + if f.cfg.Multiline { + b, err = json.MarshalIndent(evs, "", f.cfg.Indent) } else { b, err = json.Marshal(evs) } @@ -309,7 +315,7 @@ func (f *File) WriteEvent(ctx context.Context, ev *formatters.EventMsg) { return } toWrite = append(toWrite, b...) - toWrite = append(toWrite, []byte(f.Cfg.Separator)...) + toWrite = append(toWrite, []byte(f.cfg.Separator)...) } n, err := f.file.Write(toWrite) @@ -330,7 +336,7 @@ func (f *File) Close() error { // Metrics // func (f *File) RegisterMetrics(reg *prometheus.Registry) { - if !f.Cfg.EnableMetrics { + if !f.cfg.EnableMetrics { return } if err := registerMetrics(reg); err != nil {