diff --git a/CHANGES b/CHANGES
index 1b495a8..7f54419 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,3 +1,7 @@
+1.1.1 (Nov 10, 2023):
+- Updated startup logic to remove dead sockets instead of failing immediately if the socket file exists
+- Fix memory leak when accepting an incoming connection.
+
 1.1.0 (Sep 19, 2023):
 - Add support for Client/GetTreatment(s)WithConfig operations.
 - Add support for Manager operations.
diff --git a/Makefile b/Makefile
index 2106c2d..98cb2a5 100644
--- a/Makefile
+++ b/Makefile
@@ -10,7 +10,7 @@ PLATFORM ?=
 PLATFORM_STR := $(if $(PLATFORM),--platform=$(PLATFORM),)
 
 VERSION := $(shell cat splitio/version.go | grep 'const Version' | sed 's/const Version = //' | tr -d '"')
-COMMIT_SHA := $(shell git rev-parse --short HEAD)
+COMMIT_SHA := $(shell bash -c '[ ! -z $${GITHUB_SHA} ] && echo $${GITHUB_SHA:0:7} || git rev-parse --short=7 HEAD')
 COMMIT_SHA_FILE := splitio/commitsha.go
 
 GO_FILES := $(shell find . -name "*.go" -not -name "$(COMMIT_SHA_FILE)") go.sum
@@ -37,6 +37,9 @@ clean:
 ## build binaries for this platform
 build: splitd splitcli sdhelper
 
+## print current commit SHA (from repo metadata if local, from env-var if GHA)
+printsha:
+	@echo $(COMMIT_SHA)
 
 ## run all tests
 
diff --git a/cmd/splitcli/main.go b/cmd/splitcli/main.go
index 5323c65..cc83f16 100644
--- a/cmd/splitcli/main.go
+++ b/cmd/splitcli/main.go
@@ -11,6 +11,7 @@ import (
 	"github.com/splitio/splitd/splitio/conf"
 	"github.com/splitio/splitd/splitio/link"
 	"github.com/splitio/splitd/splitio/link/client/types"
+	"github.com/splitio/splitd/splitio/link/transfer"
 	"github.com/splitio/splitd/splitio/util"
 )
 
@@ -38,6 +39,21 @@ func main() {
 		VerboseWriter: os.Stderr,
 	})
 
+	if args.Method == "ping" {
+		// no consumer is created (to avoid registering)
+		c, err := transfer.NewClientConn(logger, &linkOpts.Transfer)
+		if err != nil {
+			logger.Error("error connecting to socket: ", err.Error())
+			os.Exit(2)
+		}
+		if err = c.Shutdown(); err != nil {
+			logger.Error("error closing connection: ", err.Error())
+			os.Exit(2)
+		}
+		logger.Info("socket is accepting connections properly")
+		os.Exit(0)
+	}
+
 	c, err := link.Consumer(logger, linkOpts)
 	if err != nil {
 		logger.Error("error creating client wrapper: ", err)
diff --git a/cmd/splitd/main.go b/cmd/splitd/main.go
index c94c4d1..82a991b 100644
--- a/cmd/splitd/main.go
+++ b/cmd/splitd/main.go
@@ -11,6 +11,8 @@ import (
 	"github.com/splitio/splitd/splitio/link"
 	"github.com/splitio/splitd/splitio/sdk"
 	"github.com/splitio/splitd/splitio/util"
+
+	"github.com/splitio/splitd/splitio/provisional/profiler"
 )
 
 func main() {
@@ -47,6 +49,15 @@ func main() {
 	})
 	defer shutdown.Wait()
 
+	if pc := cfg.Debug.Profiling; pc.Enable {
+		go func() {
+			p := profiler.New(pc.Host, pc.Port)
+			if err := p.ListenAndServe(); err != nil {
+				panic(err.Error())
+			}
+		}()
+	}
+
 	// Wait for connection to end (either gracefully of because of an error)
 	err = <-errc
 	exitOnErr("shutdown: ", err)
diff --git a/infra/entrypoint.sh b/infra/entrypoint.sh
index 1c2843c..ecaa72e 100755
--- a/infra/entrypoint.sh
+++ b/infra/entrypoint.sh
@@ -57,6 +57,11 @@ accum=$(yq '.sdk.apikey = env(SPLITD_APIKEY) | .link.address = env(SPLITD_LINK_A
 # logger configs
 [ ! -z ${SPLITD_LOG_LEVEL+x} ] && accum=$(echo "${accum}" | yq '.logging.level = env(SPLITD_LOG_LEVEL)')
 [ ! -z ${SPLITD_LOG_OUTPUT+x} ] && accum=$(echo "${accum}" | yq '.logging.output = env(SPLITD_LOG_OUTPUT)')
+
+# profiling configs
+[ ! -z ${SPLITD_PROFILING_ENABLE+x} ] && accum=$(echo "${accum}" | yq '.debug.profiling.enable = env(SPLITD_PROFILING_ENABLE)')
+[ ! -z ${SPLITD_PROFILING_HOST+x} ] && accum=$(echo "${accum}" | yq '.debug.profiling.host = env(SPLITD_PROFILING_HOST)')
+[ ! -z ${SPLITD_PROFILING_PORT+x} ] && accum=$(echo "${accum}" | yq '.debug.profiling.port = env(SPLITD_PROFILING_PORT)')
 # @}
 
 # Ensure that the socket-file is read-writable by anyone
diff --git a/infra/test/test_entrypoint.sh b/infra/test/test_entrypoint.sh
index efda841..fcace0d 100755
--- a/infra/test/test_entrypoint.sh
+++ b/infra/test/test_entrypoint.sh
@@ -53,6 +53,10 @@ function testAllVars {
     export SPLITD_EVENTS_REFRESH_SECS="11"
     export SPLITD_EVENTS_QUEUE_SIZE="12"
 
+    export SPLITD_PROFILING_ENABLE="true"
+    export SPLITD_PROFILING_HOST="somehost"
+    export SPLITD_PROFILING_PORT="1234"
+
     # Exec entrypoint
     [ -f "./testcfg" ] && rm ./testcfg
 
@@ -97,6 +101,12 @@ function testAllVars {
 
     # ---
 
+    assert_eq "true" $(echo "$conf_json" | jq '.Debug.Profiling.Enable') "incorrect profiling status"
+    assert_eq '"somehost"' $(echo "$conf_json" | jq '.Debug.Profiling.Host') "incorrect profiling host"
+    assert_eq "1234" $(echo "$conf_json" | jq '.Debug.Profiling.Port') "incorrect profiling port"
+
+    # ---
+
     assert_eq '"WARNING"' $(echo "$conf_json" | jq '.Logger.Level') "incorrect log level"
     assert_eq '"/dev/stderr"' $(echo "$conf_json" | jq '.Logger.Output') "incorrect log output"
 
diff --git a/splitd.yaml.tpl b/splitd.yaml.tpl
index cfdaaf3..490a010 100644
--- a/splitd.yaml.tpl
+++ b/splitd.yaml.tpl
@@ -40,4 +40,9 @@ link:
     serialization: msgpack
     bufferSize: 1024
     protocol: v1
+debug:
+    profiling:
+        enable: false
+        host: localhost
+        port: 8888
 
diff --git a/splitio/commitsha.go b/splitio/commitsha.go
index f065945..f4fb30e 100644
--- a/splitio/commitsha.go
+++ b/splitio/commitsha.go
@@ -1,3 +1,3 @@
 package splitio
 
-const CommitSHA = "29ff22d"
+const CommitSHA = "a41bede"
diff --git a/splitio/conf/splitd.go b/splitio/conf/splitd.go
index dd84342..41d1c18 100644
--- a/splitio/conf/splitd.go
+++ b/splitio/conf/splitd.go
@@ -28,6 +28,7 @@ type Config struct {
 	Logger Logger `yaml:"logging"`
 	SDK    SDK    `yaml:"sdk"`
 	Link   Link   `yaml:"link"`
+	Debug  Debug  `yaml:"debug"`
 }
 
 func (c Config) String() string {
@@ -58,6 +59,7 @@ func (c *Config) PopulateWithDefaults() {
 	c.SDK.PopulateWithDefaults()
 	c.Link.PopulateWithDefaults()
 	c.Logger.PopulateWithDefaults()
+	c.Debug.PopulateWithDefaults()
 }
 
 type Link struct {
@@ -271,6 +273,30 @@ func (l *Logger) ToLoggerOptions() (*logging.LoggerOptions, error) {
 	return opts, nil
 }
 
+// Debug groups the debugging-related settings (currently only profiling).
+type Debug struct {
+	Profiling Profiling `yaml:"profiling"`
+}
+
+// PopulateWithDefaults resets the nested profiling settings to their defaults.
+func (d *Debug) PopulateWithDefaults() {
+	d.Profiling.PopulateWithDefaults()
+}
+
+// Profiling holds the switch and the host/port used by splitd for its pprof HTTP server.
+type Profiling struct {
+	Enable bool   `yaml:"enable"`
+	Host   string `yaml:"host"`
+	Port   int    `yaml:"port"`
+}
+
+// PopulateWithDefaults leaves profiling disabled and targets localhost:8888.
+func (p *Profiling) PopulateWithDefaults() {
+	p.Enable = false
+	p.Host = "localhost"
+	p.Port = 8888
+}
+
 func ReadConfig() (*Config, error) {
 	cfgFN := defaultConfigFN
 	if fromEnv := os.Getenv("SPLITD_CONF_FILE"); fromEnv != "" {
diff --git a/splitio/link/service/v1/clientmgr.go b/splitio/link/service/v1/clientmgr.go
index 73e41b1..d61c711 100644
--- a/splitio/link/service/v1/clientmgr.go
+++ b/splitio/link/service/v1/clientmgr.go
@@ -56,13 +56,13 @@ func (m *ClientManager) handleClientInteractions() error {
 		rpc, err := m.fetchRPC()
 		if err != nil {
 			if errors.Is(err, io.EOF) { // connection ended, no error
-				m.logger.Debug(fmt.Sprintf("connection remotely closed for metadata=%+v", m.clientConfig.Metadata))
+				m.logger.Debug(fmt.Sprintf("connection remotely closed for metadata=%s", formatClientConfig(m.clientConfig)))
 				return nil
 			} else if errors.Is(err, os.ErrDeadlineExceeded) { // we waited for an RPC, got none, try again.
-				m.logger.Debug(fmt.Sprintf("read timeout/no RPC fetched. restarting loop for metadata=%+v", m.clientConfig))
+				m.logger.Debug(fmt.Sprintf("read timeout/no RPC fetched. restarting loop for metadata=%s", formatClientConfig(m.clientConfig)))
 				continue
 			} else {
-				m.logger.Error(fmt.Sprintf("unexpected error reading RPC: %s. Closing conn for metadata=%+v", err, m.clientConfig))
+				m.logger.Error(fmt.Sprintf("unexpected error reading RPC: %s. Closing conn for metadata=%s", err, formatClientConfig(m.clientConfig)))
 				return err
 			}
 		}
@@ -313,3 +313,13 @@ func (m *ClientManager) handleSplits(rpc *protov1.RPC) (interface{}, error) {
 
 	return response, nil
 }
+
+// formatClientConfig renders a client config for log messages; a nil config
+// yields an empty string.
+func formatClientConfig(c *types.ClientConfig) string {
+	if c == nil {
+		return ""
+	}
+
+	return fmt.Sprintf("%+v", c)
+}
diff --git a/splitio/link/transfer/acceptor.go b/splitio/link/transfer/acceptor.go
index 8109905..bfa48d4 100644
--- a/splitio/link/transfer/acceptor.go
+++ b/splitio/link/transfer/acceptor.go
@@ -92,9 +92,13 @@ func (a *Acceptor) Start(onClientAttachedCallback OnClientAttachedCallback) (<-c
 				return
 			}
 
-			ctx, cancel := context.WithTimeout(context.Background(), a.maxWait)
-			defer cancel()
-			err = a.sem.Acquire(ctx, 1)
+			// try to acquire a semaphore slot (throughput limiting):
+			// to avoid leaks, the lifetime of the context/deadline is scoped to a func containing a defer statement
+			err = func() error {
+				ctx, cancel := context.WithTimeout(context.Background(), a.maxWait)
+				defer cancel()
+				return a.sem.Acquire(ctx, 1)
+			}()
 			if err != nil {
 				a.logger.Error(fmt.Sprintf("Incoming connection request timed out. If the current parallelism is expected, "+
 					"consider increasing `maxConcurrentConnections` (current=%d)", a.maxConns))
diff --git a/splitio/link/transfer/acceptor_test.go b/splitio/link/transfer/acceptor_test.go
index c9cdb84..bf4ee45 100644
--- a/splitio/link/transfer/acceptor_test.go
+++ b/splitio/link/transfer/acceptor_test.go
@@ -96,5 +96,4 @@ func TestNewAcceptorInstantiation(t *testing.T) {
 	assert.Equal(t, opts.Address, acc.address.(*net.UnixAddr).Name)
 	assert.Equal(t, "unix", acc.address.(*net.UnixAddr).Network())
 	assert.Nil(t, acc.Shutdown())
-
 }
diff --git a/splitio/link/transfer/setup.go b/splitio/link/transfer/setup.go
index 31c2726..a82c878 100644
--- a/splitio/link/transfer/setup.go
+++ b/splitio/link/transfer/setup.go
@@ -4,6 +4,8 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"os"
+	"syscall"
 	"time"
 
 	"github.com/splitio/go-toolkit/v5/logging"
@@ -29,7 +31,8 @@ const (
 )
 
 var (
-	ErrInvalidConnType = errors.New("invalid conn type")
+	ErrInvalidConnType     = errors.New("invalid conn type")
+	ErrServiceAddressInUse = errors.New("provided socket file / address is already in use")
 )
 
 func NewAcceptor(logger logging.LoggerInterface, o *Options, listenerConfig *AcceptorConfig) (*Acceptor, error) {
@@ -46,6 +49,10 @@ func NewAcceptor(logger logging.LoggerInterface, o *Options, listenerConfig *Acc
 		return nil, ErrInvalidConnType
 	}
 
+	if err := ensureAddressUsable(logger, address); err != nil {
+		return nil, err
+	}
+
 	cf := func(c net.Conn) RawConn { return newConnWrapper(c, ff, o) }
 	return newAcceptor(address, cf, logger, listenerConfig), nil
 }
@@ -93,3 +100,40 @@ func DefaultOpts() Options {
 // helpers
 
 func lpFramerFromConn(c net.Conn) framing.Interface { return framing.NewLengthPrefix(c) }
+
+// ensureAddressUsable verifies that a listener can be bound to address.
+//
+// Only "unix" and "unixpacket" addresses are inspected; any other network is accepted as is.
+// If the socket file exists, the socket is dialed to find out who owns it:
+//   - the dial succeeds: a live process is serving it, ErrServiceAddressInUse is returned;
+//   - the dial is refused: the file is a leftover from a dead process and is removed;
+//   - any other dial error is wrapped and returned, leaving the file untouched.
+func ensureAddressUsable(logger logging.LoggerInterface, address net.Addr) error {
+	switch address.Network() {
+	case "unix", "unixpacket":
+		if _, err := os.Stat(address.String()); errors.Is(err, os.ErrNotExist) {
+			return nil // file doesn't exist, we're ok
+		}
+
+		logger.Warning("The socket file exists. Testing if it's currently accepting connections")
+		c, err := net.Dial(address.Network(), address.String())
+		if err == nil {
+			c.Close()
+			return ErrServiceAddressInUse
+		}
+
+		if !errors.Is(err, syscall.ECONNREFUSED) {
+			return fmt.Errorf("unknown error when testing for a dead socket: %w", err)
+		}
+
+		// only a refused connection means nobody is listening: the socket is bound
+		// to a dead process, so remove the file to let a new listener be created
+		logger.Warning("The socket appears to be from a previous (dead) execution. Will try to remove it")
+		if err := os.Remove(address.String()); err != nil {
+			return fmt.Errorf("error removing dead-socket file from a previous execution: %w", err)
+		}
+
+		logger.Warning("Dead socket file removed successfuly")
+	}
+	return nil
+}
diff --git a/splitio/link/transfer/setup_test.go b/splitio/link/transfer/setup_test.go
index 14b7152..407fa94 100644
--- a/splitio/link/transfer/setup_test.go
+++ b/splitio/link/transfer/setup_test.go
@@ -1,8 +1,14 @@
 package transfer
 
 import (
-	"github.com/stretchr/testify/assert"
+	"net"
+	"os"
+	"path/filepath"
 	"testing"
+	"time"
+
+	"github.com/splitio/go-toolkit/v5/logging"
+	"github.com/stretchr/testify/assert"
 )
 
 func TestConnType(t *testing.T) {
@@ -10,3 +16,49 @@ func TestConnType(t *testing.T) {
 	assert.Equal(t, "unix-stream", ConnTypeUnixStream.String())
 	assert.Equal(t, "invalid-socket-type", ConnType(123).String())
 }
+
+func TestEnsureAddressIsUsable(t *testing.T) {
+
+	logger := logging.NewLogger(nil)
+	assert.Nil(t, ensureAddressUsable(logger, &net.UDPAddr{}))
+	assert.Nil(t, ensureAddressUsable(logger, &net.TCPAddr{}))
+	assert.Nil(t, ensureAddressUsable(logger, &net.UnixAddr{Name: "/some/nonexistent/file"}))
+
+	// test unknown error (in this case trying to connect to a different socket type)
+	ready := make(chan struct{})
+	path := filepath.Join(os.TempDir(), "splitd_test_ensure_address_usable.sock")
+	os.Remove(path) // just in case a previous run left the file behind
+	go func() {
+		l, err := net.ListenUnix("unix", &net.UnixAddr{Name: path, Net: "unix"})
+		assert.Nil(t, err)
+		defer l.Close()
+
+		l.SetDeadline(time.Now().Add(1 * time.Second))
+		go func() {
+			time.Sleep(100 * time.Millisecond)
+			ready <- struct{}{}
+		}()
+		l.Accept()
+	}()
+	<-ready
+	assert.ErrorContains(t, ensureAddressUsable(logger, &net.UnixAddr{Name: path, Net: "unixpacket"}), "unknown error when testing for a dead socket")
+
+	// test socket in use error
+	ready = make(chan struct{})
+	path = filepath.Join(os.TempDir(), "splitd_test_ensure_address_usable2.sock")
+	os.Remove(path) // just in case a previous run left the file behind
+	go func() {
+		l, err := net.ListenUnix("unix", &net.UnixAddr{Name: path, Net: "unix"})
+		assert.Nil(t, err)
+		defer l.Close()
+
+		l.SetDeadline(time.Now().Add(1 * time.Second))
+		go func() {
+			time.Sleep(100 * time.Millisecond)
+			ready <- struct{}{}
+		}()
+		l.Accept()
+	}()
+	<-ready
+	assert.ErrorIs(t, ensureAddressUsable(logger, &net.UnixAddr{Name: path, Net: "unix"}), ErrServiceAddressInUse)
+}
diff --git a/splitio/provisional/profiler/profiler.go b/splitio/provisional/profiler/profiler.go
new file mode 100644
index 0000000..21f1b64
--- /dev/null
+++ b/splitio/provisional/profiler/profiler.go
@@ -0,0 +1,46 @@
+// Package profiler serves the net/http/pprof handlers on a dedicated HTTP server.
+package profiler
+
+import (
+	"fmt"
+	"net/http"
+	"net/http/pprof"
+)
+
+// Importing net/http/pprof registers its handlers on http.DefaultServeMux as a
+// side effect. Replace the default mux with an empty one so that those handlers
+// are only reachable through the server built by New.
+// NOTE: this swaps the process-wide default mux.
+func init() {
+	http.DefaultServeMux = http.NewServeMux()
+}
+
+// HTTPProfileInterface is an HTTP server exposing the pprof endpoints.
+type HTTPProfileInterface struct {
+	server http.Server
+}
+
+// New builds a profiling server for host:port with the pprof index, cmdline,
+// profile, symbol and trace handlers mounted under /debug/pprof/. Nothing is
+// bound until ListenAndServe is called.
+func New(host string, port int) *HTTPProfileInterface {
+	mux := http.NewServeMux()
+
+	mux.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index))
+	mux.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline))
+	mux.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile))
+	mux.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol))
+	mux.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
+	return &HTTPProfileInterface{
+		server: http.Server{
+			Addr:    fmt.Sprintf("%s:%d", host, port),
+			Handler: mux,
+		},
+	}
+}
+
+// ListenAndServe serves profiling requests, blocking until the underlying
+// http.Server stops, and returns the error it reports.
+func (h *HTTPProfileInterface) ListenAndServe() error {
+	return h.server.ListenAndServe()
+}
diff --git a/splitio/version.go b/splitio/version.go
index bf86f46..ccae879 100644
--- a/splitio/version.go
+++ b/splitio/version.go
@@ -1,3 +1,3 @@
 package splitio
 
-const Version = "1.1.0"
+const Version = "1.1.1"