From 2e95d85653fae329e3aa8fdc3e2237aec066aadd Mon Sep 17 00:00:00 2001 From: Marek Cermak Date: Fri, 22 Nov 2024 18:11:19 +0100 Subject: [PATCH] feat: implement websocket client for quotes Signed-off-by: Marek Cermak --- client/{ => rest}/client.go | 6 +- go.mod | 4 +- go.sum | 4 + market/client.go | 60 ----------- market/event.go | 4 +- market/fmp.go | 90 ++++++++++++++++ market/quote.go | 94 ++++++++--------- market/ticker.go | 4 +- market/websocket.go | 199 ++++++++++++++++++++++++------------ model/websocket.go | 54 +++++++--- util/config.go | 60 +++++++++++ 11 files changed, 386 insertions(+), 193 deletions(-) rename client/{ => rest}/client.go (98%) delete mode 100644 market/client.go create mode 100644 market/fmp.go create mode 100644 util/config.go diff --git a/client/client.go b/client/rest/client.go similarity index 98% rename from client/client.go rename to client/rest/client.go index 6327f08..7f16fdf 100644 --- a/client/client.go +++ b/client/rest/client.go @@ -1,4 +1,4 @@ -package client +package rest import ( "context" @@ -15,16 +15,14 @@ import ( ) const ( + apiURL = "https://financialmodelingprep.com" clientVersion = "v0.0.0" -) -const ( DefaultRetryCount = 3 DefaultClientTimeout = 10 * time.Second ) func New( - apiURL string, apiKey string, logger *slog.Logger, ) *Client { diff --git a/go.mod b/go.mod index 7cdd7d9..da0d156 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,12 @@ module go.tradeforge.dev/fmp go 1.23.3 require ( - github.com/eapache/go-resiliency v1.7.0 + github.com/caarlos0/env/v10 v10.0.0 github.com/go-playground/form/v4 v4.2.1 github.com/go-playground/validator/v10 v10.23.0 github.com/go-resty/resty/v2 v2.11.0 github.com/gorilla/websocket v1.5.3 + github.com/joho/godotenv v1.5.1 github.com/shopspring/decimal v1.4.0 github.com/stretchr/testify v1.9.0 go.tradeforge.dev/background v0.2.0 @@ -15,6 +16,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect diff --git a/go.sum b/go.sum index d54665e..91fe8e1 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/caarlos0/env/v10 v10.0.0 h1:yIHUBZGsyqCnpTkbjk8asUlx6RFhhEs+h7TOBdgdzXA= +github.com/caarlos0/env/v10 v10.0.0/go.mod h1:ZfulV76NvVPw3tm591U4SwL3Xx9ldzBP9aGxzeN7G18= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -21,6 +23,8 @@ github.com/go-resty/resty/v2 v2.11.0 h1:i7jMfNOJYMp69lq7qozJP+bjgzfAzeOhuGlyDrqx github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5BGXiVdTu+A= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/market/client.go b/market/client.go deleted file mode 100644 index 562d14b..0000000 --- a/market/client.go +++ /dev/null @@ -1,60 +0,0 @@ -// Package market defines HTTP and a Websocket client for the FMP API. -package market - -import ( - "log/slog" - - "go.tradeforge.dev/fmp/client" -) - -// HTTPClient defines a client to the Polygon REST API. -type HTTPClient struct { - QuoteClient - TickerClient - EventClient -} - -// NewHTTPClient returns a new HTTP client with the specified API key and config. -func NewHTTPClient( - apiURL string, - apiKey string, - logger *slog.Logger, -) *HTTPClient { - c := client.New( - apiURL, - apiKey, - logger, - ) - - return &HTTPClient{ - QuoteClient: QuoteClient{ - Client: c, - }, - TickerClient: TickerClient{ - Client: c, - }, - EventClient: EventClient{ - Client: c, - }, - } -} - -type WebsocketClientConfig struct { - APIKey string `validate:"required"` - APIURL string `validate:"required"` -} - -func NewWebsocketClient( - config WebsocketClientConfig, - logger *slog.Logger, -) *WebsocketClient { - return &WebsocketClient{ - config: config, - logger: logger, - } -} - -type WebsocketClient struct { - config WebsocketClientConfig - logger *slog.Logger -} diff --git a/market/event.go b/market/event.go index 962e3d7..1b4f41f 100644 --- a/market/event.go +++ b/market/event.go @@ -4,7 +4,7 @@ import ( "context" "net/http" - "go.tradeforge.dev/fmp/client" + "go.tradeforge.dev/fmp/client/rest" "go.tradeforge.dev/fmp/model" ) @@ -14,7 +14,7 @@ const ( ) type EventClient struct { - *client.Client + *rest.Client } func (ec *EventClient) GetEarningsCalendar(ctx context.Context, params *model.GetEarningsCalendarParams, opts ...model.RequestOption) ([]model.GetEarningsCalendarResponse, error) { diff --git a/market/fmp.go b/market/fmp.go new file mode 100644 index 0000000..2fd5aba --- /dev/null +++ b/market/fmp.go @@ -0,0 +1,90 @@ +// Package market defines HTTP and a Websocket client for the FMP API. +package market + +import ( + "context" + "errors" + "log/slog" + "sync" + + "github.com/gorilla/websocket" + "go.tradeforge.dev/background/manager" + + "go.tradeforge.dev/fmp/client/rest" + "go.tradeforge.dev/fmp/model" +) + +type HTTPClientConfig struct { + ApiKey string `validate:"required" env:"FMP_API_KEY"` +} + +// HTTPClient defines a client to the Polygon REST API. +type HTTPClient struct { + QuoteClient + TickerClient + EventClient +} + +// NewHTTPClient returns a new HTTP client with the specified API key and config. +func NewHTTPClient( + config HTTPClientConfig, + logger *slog.Logger, +) *HTTPClient { + c := rest.New( + config.ApiKey, + logger, + ) + + return &HTTPClient{ + QuoteClient: QuoteClient{ + Client: c, + }, + TickerClient: TickerClient{ + Client: c, + }, + EventClient: EventClient{ + Client: c, + }, + } +} + +type WebsocketClientConfig struct { + ApiKey string `validate:"required" env:"FMP_API_KEY"` +} + +func NewWebsocketClient( + ctx context.Context, + config WebsocketClientConfig, + logger *slog.Logger, +) (*WebsocketClient, error) { + if ctx.Done() != nil { + return nil, errors.New("context is already cancelled") + } + return &WebsocketClient{ + ctx: ctx, + config: config, + logger: logger, + manager: manager.New(ctx, manager.WithCancelOnError(), manager.WithFirstError()), + + events: make(chan model.WebsocketMesssage), + quotes: make(chan model.WebsocketQuote), + }, nil +} + +type WebsocketClient struct { + ctx context.Context + config WebsocketClientConfig + logger *slog.Logger + + manager *manager.Manager + + connectOnce sync.Once + connectionLock sync.Mutex + connection *websocket.Conn + + subscribedQuotesLock sync.RWMutex + subscribedQuotes map[string]struct{} + + events chan model.WebsocketMesssage + quotes chan model.WebsocketQuote +} diff --git a/market/quote.go b/market/quote.go index bb2fbe4..4485215 100644 --- a/market/quote.go +++ b/market/quote.go @@ -1,77 +1,77 @@ package market import ( - "context" - "fmt" - "net/http" + "context" + "fmt" + "net/http" - "go.tradeforge.dev/fmp/client" - "go.tradeforge.dev/fmp/model" + "go.tradeforge.dev/fmp/client/rest" + "go.tradeforge.dev/fmp/model" ) const ( - GetRealTimeQuotePath = "/api/v3/stock/full/real-time-price/:symbol" - GetFullPricePath = "/api/v3/quote/:symbol" - GetPriceChangePath = "/api/v3/stock-price-target/:symbol" + GetRealTimeQuotePath = "/api/v3/stock/full/real-time-price/:symbol" + GetFullPricePath = "/api/v3/quote/:symbol" + GetPriceChangePath = "/api/v3/stock-price-target/:symbol" - BatchGetRealTimeQuotePath = "/api/v3/stock/full/real-time-price/:symbols" - BatchGetFullPricePath = "/api/v3/quote/:symbols" + BatchGetRealTimeQuotePath = "/api/v3/stock/full/real-time-price/:symbols" + BatchGetFullPricePath = "/api/v3/quote/:symbols" ) type QuoteClient struct { - *client.Client + *rest.Client } func (qc *QuoteClient) GetFullPrice(ctx context.Context, params *model.GetFullPriceParams, opts ...model.RequestOption) (response *model.GetFullPriceResponse, err error) { - var res []model.GetFullPriceResponse - _, err = qc.Call(ctx, http.MethodGet, GetFullPricePath, params, &res, opts...) - if err != nil { - return nil, err - } - if len(res) != 1 { - return nil, fmt.Errorf("expected response of length 1, got %d", len(res)) - } - return &res[0], nil + var res []model.GetFullPriceResponse + _, err = qc.Call(ctx, http.MethodGet, GetFullPricePath, params, &res, opts...) + if err != nil { + return nil, err + } + if len(res) != 1 { + return nil, fmt.Errorf("expected response of length 1, got %d", len(res)) + } + return &res[0], nil } func (qc *QuoteClient) BatchGetFullPrice(ctx context.Context, params *model.BatchGetFullPriceParams, opts ...model.RequestOption) (model.BatchGetFullPriceResponse, error) { - var res model.BatchGetFullPriceResponse - _, err := qc.Call(ctx, http.MethodGet, BatchGetFullPricePath, params, &res, opts...) - return res, err + var res model.BatchGetFullPriceResponse + _, err := qc.Call(ctx, http.MethodGet, BatchGetFullPricePath, params, &res, opts...) + return res, err } func (qc *QuoteClient) GetPriceChange(ctx context.Context, params *model.GetPriceChangeParams, opts ...model.RequestOption) (response *model.GetPriceChangeResponse, err error) { - var res []model.GetPriceChangeResponse - _, err = qc.Call(ctx, http.MethodGet, GetPriceChangePath, params, &res, opts...) - if err != nil { - return nil, err - } - if len(res) != 1 { - return nil, fmt.Errorf("expected response of length 1, got %d", len(res)) - } - return &res[0], nil + var res []model.GetPriceChangeResponse + _, err = qc.Call(ctx, http.MethodGet, GetPriceChangePath, params, &res, opts...) + if err != nil { + return nil, err + } + if len(res) != 1 { + return nil, fmt.Errorf("expected response of length 1, got %d", len(res)) + } + return &res[0], nil } func (qc *QuoteClient) BatchGetPriceChange(ctx context.Context, params *model.BatchGetPriceChangeParams, opts ...model.RequestOption) ([]model.GetPriceChangeResponse, error) { - var res []model.GetPriceChangeResponse - _, err := qc.Call(ctx, http.MethodGet, BatchGetRealTimeQuotePath, params, &res, opts...) - return res, err + var res []model.GetPriceChangeResponse + _, err := qc.Call(ctx, http.MethodGet, BatchGetRealTimeQuotePath, params, &res, opts...) + return res, err } func (qc *QuoteClient) GetRealTimeQuote(ctx context.Context, params *model.GetRealTimeQuoteParams, opts ...model.RequestOption) (response *model.GetRealTimeQuoteResponse, err error) { - var res []model.GetRealTimeQuoteResponse - _, err = qc.Call(ctx, http.MethodGet, GetRealTimeQuotePath, params, &res, opts...) - if err != nil { - return nil, err - } - if len(res) != 1 { - return nil, fmt.Errorf("expected response of length 1, got %d", len(res)) - } - return &res[0], nil + var res []model.GetRealTimeQuoteResponse + _, err = qc.Call(ctx, http.MethodGet, GetRealTimeQuotePath, params, &res, opts...) + if err != nil { + return nil, err + } + if len(res) != 1 { + return nil, fmt.Errorf("expected response of length 1, got %d", len(res)) + } + return &res[0], nil } func (qc *QuoteClient) BatchGetRealTimeQuote(ctx context.Context, params *model.BatchGetRealTimeQuoteParams, opts ...model.RequestOption) (model.BatchGetRealTimeQuoteResponse, error) { - var res model.BatchGetRealTimeQuoteResponse - _, err := qc.Call(ctx, http.MethodGet, BatchGetRealTimeQuotePath, params, &res, opts...) - return res, err + var res model.BatchGetRealTimeQuoteResponse + _, err := qc.Call(ctx, http.MethodGet, BatchGetRealTimeQuotePath, params, &res, opts...) + return res, err } diff --git a/market/ticker.go b/market/ticker.go index f7e4bf7..a71c6f4 100644 --- a/market/ticker.go +++ b/market/ticker.go @@ -7,7 +7,7 @@ import ( "fmt" "net/http" - "go.tradeforge.dev/fmp/client" + "go.tradeforge.dev/fmp/client/rest" "go.tradeforge.dev/fmp/model" ) @@ -25,7 +25,7 @@ const ( ) type TickerClient struct { - *client.Client + *rest.Client } func (tc *TickerClient) GetCompanyProfile(ctx context.Context, params *model.GetCompanyProfileParams, opts ...model.RequestOption) (_ *model.GetCompanyProfileResponse, err error) { diff --git a/market/websocket.go b/market/websocket.go index 7088c87..c92375a 100644 --- a/market/websocket.go +++ b/market/websocket.go @@ -2,124 +2,179 @@ package market import ( "context" + "encoding/json" "fmt" - "time" + "log/slog" + "sync" - "github.com/eapache/go-resiliency/retrier" "github.com/gorilla/websocket" - "go.tradeforge.dev/background/manager" "go.tradeforge.dev/fmp/model" ) const ( - defaultConstantBackoffRetries = 5 + QuoteEndpoint = "wss://websockets.financialmodelingprep.com" ) -func (wss *WebsocketClient) SubscribeToPriceFeed(ctx context.Context, symbols []string) (<-chan model.WebsocketQuote, error) { - conn, _, err := websocket.DefaultDialer.Dial(wss.config.APIURL, nil) - defer func() { - if err := conn.Close(); err != nil { - wss.logger.Error("closing websocket connection", "error", err) +func (wss *WebsocketClient) Connect(endpoint string) (err error) { + wss.connectOnce.Do(func() { + var conn *websocket.Conn + conn, _, err = websocket.DefaultDialer.Dial(endpoint, nil) + if err != nil { + return } - }() - if err != nil { - return nil, err - } + wss.connection = conn + wss.manager.Run(wss.maintainConnection) - if err := wss.authenticate(conn); err != nil { - return nil, fmt.Errorf("authenticating websocket connection: %w", err) - } - if err := wss.subscribeToCompanyFeed(conn, symbols); err != nil { - return nil, fmt.Errorf("subscribing to company feed: %w", err) - } - defer func() { - if err := wss.unsubscribeFromPriceFeed(conn, symbols); err != nil { - wss.logger.Error("unsubscribing from price feed", "error", err) + msg := model.WebsocketAuthenticationRequest{ + Event: model.WebsocketEventNameLogin, + Data: model.WebsocketAuthenticationRequestData{APIKey: wss.config.ApiKey}, } - }() - wss.logger.Debug("subscribed to price feed") - - priceFeed := make(chan model.WebsocketQuote) - - mgr := manager.New(ctx) - mgr.RunWithRetry(func(ctx context.Context) error { - defer func() { - wss.logger.Debug("closing price feed") - close(priceFeed) - }() - return wss.startReadingPriceFeed(ctx, conn, priceFeed) - }, retrier.New(retrier.ConstantBackoff(defaultConstantBackoffRetries, 5*time.Second), nil)) - - return priceFeed, nil + if connErr := conn.WriteJSON(msg); connErr != nil { + err = fmt.Errorf("writing authentication message: %w", connErr) + return + } + L: + for { + select { + case <-wss.ctx.Done(): + break L + case evt := <-wss.events: + if evt.Event != model.WebsocketEventNameLogin { + continue + } + if evt.Status == nil || *evt.Status >= 400 { + errMsg := fmt.Sprintf("unexpected error code: %d", evt.Status) + if evt.Message != nil { + errMsg = *evt.Message + } + err = fmt.Errorf("authentication failed: %s", errMsg) + } + break L + } + } + }) + if err != nil { + return fmt.Errorf("dialing websocket connection: %w", err) + } + return nil } -func (wss *WebsocketClient) startReadingPriceFeed(ctx context.Context, conn *websocket.Conn, priceFeed chan<- model.WebsocketQuote) error { +func (wss *WebsocketClient) maintainConnection(ctx context.Context) error { L: for { select { case <-ctx.Done(): return nil default: - var msg model.WebsocketEvent - if err := conn.ReadJSON(&msg); err != nil { + var rawMessage json.RawMessage + if err := wss.connection.ReadJSON(&rawMessage); err != nil { return fmt.Errorf("reading websocket message: %w", err) } + msg := model.WebsocketMesssage{} + if err := json.Unmarshal(rawMessage, &msg); err != nil { + return fmt.Errorf("unmarshaling websocket message: %w", err) + } switch msg.Event { - case model.WebsocketEventTypeHeartbeat: + case model.WebsocketEventNameHeartbeat: wss.logger.Debug("received heartbeat") continue - case model.WebsocketEventTypeLogin: - wss.logger.Debug("authenticated to price feed") + case model.WebsocketEventNameLogin: + wss.events <- msg + wss.logger.Debug("authenticated", slog.Any("message", msg)) continue - case model.WebsocketEventTypeSubscribe: - wss.logger.Debug("subscribed to price feed") + case model.WebsocketEventNameSubscribe: + wss.events <- msg + wss.logger.Debug("subscribed", slog.Any("message", msg)) continue - case model.WebsocketEventTypeUnsubscribe: - wss.logger.Debug("unsubscribed from price feed") + case model.WebsocketEventNameUnsubscribe: + wss.events <- msg + wss.logger.Debug("unsubscribed", slog.Any("message", msg)) break L default: - var quote model.WebsocketQuote - if err := conn.ReadJSON("e); err != nil { - return fmt.Errorf("reading websocket quote: %w", err) + wss.logger.Debug("received message", slog.Any("raw", rawMessage)) + if msg.Type == nil { + return fmt.Errorf("unknown message type: nil") } - if quote.LastPrice == 0 { - continue + if err := wss.processRawMessage(*msg.Type, rawMessage); err != nil { + return fmt.Errorf("processing message: %w", err) } - priceFeed <- quote } } } - return nil } -func (wss *WebsocketClient) authenticate(conn *websocket.Conn) error { - msg := model.WebsocketAuthenticationRequest{ - Event: model.WebsocketEventTypeLogin, - Data: model.WebsocketAuthenticationRequestData{APIKey: wss.config.APIKey}, +func (wss *WebsocketClient) Disconnect() error { + if wss.connection == nil { + return nil } - if err := conn.WriteJSON(msg); err != nil { - return fmt.Errorf("writing authentication message: %w", err) + if err := wss.connection.Close(); err != nil { + return fmt.Errorf("closing websocket connection: %w", err) } + wss.connection = nil + wss.connectOnce = sync.Once{} + return nil } -func (wss *WebsocketClient) subscribeToCompanyFeed(conn *websocket.Conn, symbols []string) error { +func (wss *WebsocketClient) Subscribe(symbols []string) error { + wss.subscribedQuotesLock.Lock() + defer wss.subscribedQuotesLock.Unlock() msg := model.WebsocketSubscriptionRequest{ - Event: model.WebsocketEventTypeSubscribe, + Event: model.WebsocketEventNameSubscribe, Data: model.WebsocketSubscriptionRequestData{Symbols: symbols}, } - if err := conn.WriteJSON(msg); err != nil { + if err := wss.connection.WriteJSON(msg); err != nil { return fmt.Errorf("writing subscription message: %w", err) } +L: + for { + select { + case <-wss.ctx.Done(): + break L + case evt := <-wss.events: + if evt.Event != model.WebsocketEventNameSubscribe { + continue + } + if evt.Status == nil || *evt.Status >= 400 { + errMsg := fmt.Sprintf("unexpected error code: %d", evt.Status) + if evt.Message != nil { + errMsg = *evt.Message + } + return fmt.Errorf("subscription failed: %s", errMsg) + } + break L + } + } + return nil +} + +func (wss *WebsocketClient) processRawMessage(typ model.WebsocketMessageType, msg json.RawMessage) error { + switch typ { + case model.WebsocketMessageTypeQuote: + if err := wss.processQuote(msg); err != nil { + return fmt.Errorf("processing quote: %w", err) + } + default: + wss.logger.Debug("received unknown message", slog.Any("message", msg)) + } return nil } -func (wss *WebsocketClient) unsubscribeFromPriceFeed(conn *websocket.Conn, symbols []string) error { +func (wss *WebsocketClient) processQuote(msg json.RawMessage) error { + quote := model.WebsocketQuote{} + if err := json.Unmarshal(msg, "e); err != nil { + return fmt.Errorf("unmarshaling websocket quote: %w", err) + } + wss.quotes <- quote + return nil +} + +func (wss *WebsocketClient) Unsubscribe(conn *websocket.Conn, symbols []string) error { msg := model.WebsocketSubscriptionRequest{ - Event: model.WebsocketEventTypeUnsubscribe, + Event: model.WebsocketEventNameUnsubscribe, Data: model.WebsocketSubscriptionRequestData{Symbols: symbols}, } if err := conn.WriteJSON(msg); err != nil { @@ -127,3 +182,17 @@ func (wss *WebsocketClient) unsubscribeFromPriceFeed(conn *websocket.Conn, symbo } return nil } + +func (wss *WebsocketClient) Quotes() <-chan model.WebsocketQuote { + return wss.quotes +} + +//func (wss *WebsocketClient) readQuote(ctx context.Context) error { +// var quote model.WebsocketQuote +// if err := wss.connection.ReadJSON("e); err != nil { +// return fmt.Errorf("reading websocket quote: %w", err) +// } +// if quote.LastPrice == 0 { +// return nil +// } +//} diff --git a/model/websocket.go b/model/websocket.go index 4a21d73..f0740ac 100644 --- a/model/websocket.go +++ b/model/websocket.go @@ -1,23 +1,53 @@ package model -type WebsocketEventType string +import ( + "log/slog" +) + +type WebsocketEventName string + +const ( + WebsocketEventNameHeartbeat WebsocketEventName = "heartbeat" + WebsocketEventNameLogin WebsocketEventName = "login" + WebsocketEventNameSubscribe WebsocketEventName = "subscribe" + WebsocketEventNameUnsubscribe WebsocketEventName = "unsubscribe" +) + +type WebsocketMessageType string const ( - WebsocketEventTypeHeartbeat WebsocketEventType = "heartbeat" - WebsocketEventTypeLogin WebsocketEventType = "login" - WebsocketEventTypeSubscribe WebsocketEventType = "subscribe" - WebsocketEventTypeUnsubscribe WebsocketEventType = "unsubscribe" + WebsocketMessageTypeQuote WebsocketMessageType = "Q" ) -type WebsocketEvent struct { - Event WebsocketEventType `json:"event"` - Message *string `json:"message"` - Status *int `json:"status"` - Timestamp *int64 `json:"timestamp"` +type WebsocketMesssage struct { + Event WebsocketEventName `json:"event"` + Type *WebsocketMessageType `json:"type"` + Message *string `json:"message"` + Status *int `json:"status"` + Timestamp *int64 `json:"timestamp"` +} + +func (m WebsocketMesssage) LogValue() slog.Value { + valueMap := map[string]interface{}{ + "event": m.Event, + } + if m.Type != nil { + valueMap["type"] = m.Type + } + if m.Message != nil { + valueMap["message"] = *m.Message + } + if m.Status != nil { + valueMap["status"] = *m.Status + } + if m.Timestamp != nil { + valueMap["timestamp"] = *m.Timestamp + } + return slog.AnyValue(valueMap) } type WebsocketAuthenticationRequest struct { - Event WebsocketEventType `json:"event"` + Event WebsocketEventName `json:"event"` Data WebsocketAuthenticationRequestData `json:"data"` } @@ -26,7 +56,7 @@ type WebsocketAuthenticationRequestData struct { } type WebsocketSubscriptionRequest struct { - Event WebsocketEventType `json:"event"` + Event WebsocketEventName `json:"event"` Data WebsocketSubscriptionRequestData `json:"data"` } diff --git a/util/config.go b/util/config.go new file mode 100644 index 0000000..991680f --- /dev/null +++ b/util/config.go @@ -0,0 +1,60 @@ +package util + +import ( + "fmt" + "os" + "sync" + + "github.com/caarlos0/env/v10" + "github.com/go-playground/validator/v10" + "github.com/joho/godotenv" +) + +var ( + once sync.Once + + validate = validator.New() +) + +const appPrefixEnvKey = "APP_PREFIX" + +func LoadConfig[T any](envFilePath string) (*T, error) { + loadEnvFile(envFilePath) + + appPrefix := os.Getenv(appPrefixEnvKey) + if appPrefix != "" { + appPrefix = appPrefix + "_" + } + cfg := new(T) + if err := env.ParseWithOptions(cfg, env.Options{Prefix: appPrefix}); err != nil { + return cfg, fmt.Errorf("applying env from file: %w", err) + } + + if err := validate.Struct(cfg); err != nil { + return nil, fmt.Errorf("validating config: %w", err) + } + + return cfg, nil +} + +func MustLoadConfig[T any](envFilePath string) T { + cfg, err := LoadConfig[T](envFilePath) + if err != nil { + panic(fmt.Errorf("loading config: %w", err)) + } + return *cfg +} + +func loadEnvFile(path string) { + once.Do(func() { + if path == "" { + path = ".env" + } + + if err := godotenv.Load(path); err != nil { + if !os.IsNotExist(err) { + panic(fmt.Errorf("loading env file: %w", err)) + } + } + }) +}