Skip to content

Commit

Permalink
Merge pull request #280 from nokia/fix273
Browse files Browse the repository at this point in the history
file output: set write concurrency to 1 when writing to stdout or stderr
  • Loading branch information
karimra authored Nov 7, 2023
2 parents a1a987b + d589dfa commit 4e18a6a
Showing 1 changed file with 55 additions and 49 deletions.
104 changes: 55 additions & 49 deletions outputs/file/file_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ import (
"text/template"
"time"

"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/proto"

"github.com/openconfig/gnmic/formatters"
"github.com/openconfig/gnmic/outputs"
"github.com/openconfig/gnmic/types"
"github.com/openconfig/gnmic/utils"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/semaphore"
"google.golang.org/protobuf/proto"
)

const (
Expand All @@ -38,15 +39,15 @@ const (
func init() {
outputs.Register("file", func() outputs.Output {
return &File{
Cfg: &Config{},
cfg: &Config{},
logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags),
}
})
}

// File //
type File struct {
Cfg *Config
cfg *Config
file *os.File
logger *log.Logger
mo *formatters.MarshalOptions
Expand Down Expand Up @@ -78,7 +79,7 @@ type Config struct {
}

func (f *File) String() string {
b, err := json.Marshal(f)
b, err := json.Marshal(f.cfg)
if err != nil {
return ""
}
Expand All @@ -89,7 +90,7 @@ func (f *File) SetEventProcessors(ps map[string]map[string]interface{},
logger *log.Logger,
tcs map[string]*types.TargetConfig,
acts map[string]map[string]interface{}) {
for _, epName := range f.Cfg.EventProcessors {
for _, epName := range f.cfg.EventProcessors {
if epCfg, ok := ps[epName]; ok {
epType := ""
for k := range epCfg {
Expand Down Expand Up @@ -127,7 +128,7 @@ func (f *File) SetLogger(logger *log.Logger) {

// Init //
func (f *File) Init(ctx context.Context, name string, cfg map[string]interface{}, opts ...outputs.Option) error {
err := outputs.DecodeConfig(cfg, f.Cfg)
err := outputs.DecodeConfig(cfg, f.cfg)
if err != nil {
return err
}
Expand All @@ -137,66 +138,71 @@ func (f *File) Init(ctx context.Context, name string, cfg map[string]interface{}
for _, opt := range opts {
opt(f)
}
if f.Cfg.Format == "proto" {
if f.cfg.Format == "proto" {
return fmt.Errorf("proto format not supported in output type 'file'")
}
if f.Cfg.Separator == "" {
f.Cfg.Separator = defaultSeparator
if f.cfg.Separator == "" {
f.cfg.Separator = defaultSeparator
}
if f.Cfg.FileName == "" && f.Cfg.FileType == "" {
f.Cfg.FileType = "stdout"
if f.cfg.FileName == "" && f.cfg.FileType == "" {
f.cfg.FileType = "stdout"
}

switch f.Cfg.FileType {
switch f.cfg.FileType {
case "stdout":
f.file = os.Stdout
case "stderr":
f.file = os.Stderr
default:
CRFILE:
f.file, err = os.OpenFile(f.Cfg.FileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
f.file, err = os.OpenFile(f.cfg.FileName, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
f.logger.Printf("failed to create file: %v", err)
time.Sleep(10 * time.Second)
goto CRFILE
}
}

if f.Cfg.Format == "" {
f.Cfg.Format = defaultFormat
if f.cfg.Format == "" {
f.cfg.Format = defaultFormat
}
if f.Cfg.FileType == "stdout" || f.Cfg.FileType == "stderr" {
f.Cfg.Indent = " "
f.Cfg.Multiline = true
if f.cfg.FileType == "stdout" || f.cfg.FileType == "stderr" {
f.cfg.Indent = " "
f.cfg.Multiline = true
}
if f.Cfg.Multiline && f.Cfg.Indent == "" {
f.Cfg.Indent = " "
if f.cfg.Multiline && f.cfg.Indent == "" {
f.cfg.Indent = " "
}
if f.Cfg.ConcurrencyLimit < 1 {
f.Cfg.ConcurrencyLimit = defaultWriteConcurrency
if f.cfg.ConcurrencyLimit < 1 {
switch f.cfg.FileType {
case "stdout", "stderr":
f.cfg.ConcurrencyLimit = 1
default:
f.cfg.ConcurrencyLimit = defaultWriteConcurrency
}
}

f.sem = semaphore.NewWeighted(int64(f.Cfg.ConcurrencyLimit))
f.sem = semaphore.NewWeighted(int64(f.cfg.ConcurrencyLimit))

f.mo = &formatters.MarshalOptions{
Multiline: f.Cfg.Multiline,
Indent: f.Cfg.Indent,
Format: f.Cfg.Format,
OverrideTS: f.Cfg.OverrideTimestamps,
CalculateLatency: f.Cfg.CalculateLatency,
Multiline: f.cfg.Multiline,
Indent: f.cfg.Indent,
Format: f.cfg.Format,
OverrideTS: f.cfg.OverrideTimestamps,
CalculateLatency: f.cfg.CalculateLatency,
}
if f.Cfg.TargetTemplate == "" {
if f.cfg.TargetTemplate == "" {
f.targetTpl = outputs.DefaultTargetTemplate
} else if f.Cfg.AddTarget != "" {
f.targetTpl, err = utils.CreateTemplate("target-template", f.Cfg.TargetTemplate)
} else if f.cfg.AddTarget != "" {
f.targetTpl, err = utils.CreateTemplate("target-template", f.cfg.TargetTemplate)
if err != nil {
return err
}
f.targetTpl = f.targetTpl.Funcs(outputs.TemplateFuncs)
}

if f.Cfg.MsgTemplate != "" {
f.msgTpl, err = utils.CreateTemplate(fmt.Sprintf("%s-msg-template", name), f.Cfg.MsgTemplate)
if f.cfg.MsgTemplate != "" {
f.msgTpl, err = utils.CreateTemplate(fmt.Sprintf("%s-msg-template", name), f.cfg.MsgTemplate)
if err != nil {
return err
}
Expand Down Expand Up @@ -227,13 +233,13 @@ func (f *File) Write(ctx context.Context, rsp proto.Message, meta outputs.Meta)
defer f.sem.Release(1)

numberOfReceivedMsgs.WithLabelValues(f.file.Name()).Inc()
rsp, err = outputs.AddSubscriptionTarget(rsp, meta, f.Cfg.AddTarget, f.targetTpl)
rsp, err = outputs.AddSubscriptionTarget(rsp, meta, f.cfg.AddTarget, f.targetTpl)
if err != nil {
f.logger.Printf("failed to add target to the response: %v", err)
}
bb, err := outputs.Marshal(rsp, meta, f.mo, f.Cfg.SplitEvents, f.evps...)
bb, err := outputs.Marshal(rsp, meta, f.mo, f.cfg.SplitEvents, f.evps...)
if err != nil {
if f.Cfg.Debug {
if f.cfg.Debug {
f.logger.Printf("failed marshaling proto msg: %v", err)
}
numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "marshal_error").Inc()
Expand All @@ -246,17 +252,17 @@ func (f *File) Write(ctx context.Context, rsp proto.Message, meta outputs.Meta)
if f.msgTpl != nil {
b, err = outputs.ExecTemplate(b, f.msgTpl)
if err != nil {
if f.Cfg.Debug {
if f.cfg.Debug {
log.Printf("failed to execute template: %v", err)
}
numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "template_error").Inc()
continue
}
}

n, err := f.file.Write(append(b, []byte(f.Cfg.Separator)...))
n, err := f.file.Write(append(b, []byte(f.cfg.Separator)...))
if err != nil {
if f.Cfg.Debug {
if f.cfg.Debug {
f.logger.Printf("failed to write to file '%s': %v", f.file.Name(), err)
}
numberOfFailWriteMsgs.WithLabelValues(f.file.Name(), "write_error").Inc()
Expand All @@ -278,12 +284,12 @@ func (f *File) WriteEvent(ctx context.Context, ev *formatters.EventMsg) {
evs = proc.Apply(evs...)
}
toWrite := []byte{}
if f.Cfg.SplitEvents {
if f.cfg.SplitEvents {
for _, pev := range evs {
var err error
var b []byte
if f.Cfg.Multiline {
b, err = json.MarshalIndent(pev, "", f.Cfg.Indent)
if f.cfg.Multiline {
b, err = json.MarshalIndent(pev, "", f.cfg.Indent)
} else {
b, err = json.Marshal(pev)
}
Expand All @@ -293,13 +299,13 @@ func (f *File) WriteEvent(ctx context.Context, ev *formatters.EventMsg) {
return
}
toWrite = append(toWrite, b...)
toWrite = append(toWrite, []byte(f.Cfg.Separator)...)
toWrite = append(toWrite, []byte(f.cfg.Separator)...)
}
} else {
var err error
var b []byte
if f.Cfg.Multiline {
b, err = json.MarshalIndent(evs, "", f.Cfg.Indent)
if f.cfg.Multiline {
b, err = json.MarshalIndent(evs, "", f.cfg.Indent)
} else {
b, err = json.Marshal(evs)
}
Expand All @@ -309,7 +315,7 @@ func (f *File) WriteEvent(ctx context.Context, ev *formatters.EventMsg) {
return
}
toWrite = append(toWrite, b...)
toWrite = append(toWrite, []byte(f.Cfg.Separator)...)
toWrite = append(toWrite, []byte(f.cfg.Separator)...)
}

n, err := f.file.Write(toWrite)
Expand All @@ -330,7 +336,7 @@ func (f *File) Close() error {

// Metrics //
func (f *File) RegisterMetrics(reg *prometheus.Registry) {
if !f.Cfg.EnableMetrics {
if !f.cfg.EnableMetrics {
return
}
if err := registerMetrics(reg); err != nil {
Expand Down

0 comments on commit 4e18a6a

Please sign in to comment.