Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: consolidate oracle and provider orchestrator #503

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a4b72cd
UpdateWithMarketMap -> UpdateMarketMap (did someone get mogged by the…
technicallyty Jun 6, 2024
88eeabe
some comments
technicallyty Jun 7, 2024
5ac1364
idk why this go.mod always wants to update
Jun 7, 2024
171ec92
move oracle logic into orchestrator, make aggregator safe for concurr…
Jun 10, 2024
c03f93b
fold orchestrator into oracle
Jun 10, 2024
adf3e19
go mod tidy
Jun 10, 2024
21a7d78
fix oracle construction in main, update comments, etc
Jun 10, 2024
f405cb2
providers -> priceProviders
technicallyty Jun 10, 2024
4c38aac
checkpoint. the last test in lifecycle_test is hanging. rogue gophers…
technicallyty Jun 11, 2024
c262e9b
lol fixed
technicallyty Jun 11, 2024
018140c
tests pass
technicallyty Jun 11, 2024
c1bb4e9
tests passing
technicallyty Jun 11, 2024
dce614b
address or remove todos
technicallyty Jun 11, 2024
8e45ac8
fix server_test
technicallyty Jun 11, 2024
39b7477
Merge branch 'main' into tyler/blo-1329-move-oracle-service-into-orch…
technicallyty Jun 11, 2024
79f7900
linter
technicallyty Jun 11, 2024
42a8f0e
Merge branch 'tyler/blo-1329-move-oracle-service-into-orchestrator' o…
technicallyty Jun 11, 2024
cbfb7ac
address comments
technicallyty Jun 11, 2024
671f41d
final readme update
technicallyty Jun 11, 2024
6ce4831
race condi changes
technicallyty Jun 11, 2024
5389f43
maybe this works
technicallyty Jun 11, 2024
a3653ca
revert mutex changes
technicallyty Jun 11, 2024
c8d871e
revert
technicallyty Jun 11, 2024
8e362b0
Merge branch 'main' into tyler/blo-1329-move-oracle-service-into-orch…
technicallyty Jun 12, 2024
a8c3afc
fix startup and stuff
technicallyty Jun 12, 2024
c28b9a6
Merge branch 'main' into tyler/blo-1329-move-oracle-service-into-orch…
technicallyty Jun 12, 2024
0de8401
remove start oracle assertion
technicallyty Jun 12, 2024
7bb0c3f
Merge branch 'tyler/blo-1329-move-oracle-service-into-orchestrator' o…
technicallyty Jun 12, 2024
77c4a36
remove test
technicallyty Jun 12, 2024
c8190c2
Merge branch 'main' into tyler/blo-1329-move-oracle-service-into-orch…
davidterpay Jun 12, 2024
12ba0b0
move interfaces to interface.go
technicallyty Jun 13, 2024
ff9d0f1
readd comment to setMainCtx
technicallyty Jun 13, 2024
bf436df
comment on setMainCtx
technicallyty Jun 13, 2024
8cb65ab
add read lock to GetLastSyncTime and nil check aggregator
technicallyty Jun 13, 2024
5370d09
remove WithMarketMapProvider option. unused.
technicallyty Jun 13, 2024
3ab9e7a
Update cmd/slinky/main.go
technicallyty Jun 13, 2024
f9c7bd4
add godocs to other options
technicallyty Jun 13, 2024
c7f20d3
fix tests, add a test for aggregator nil check
technicallyty Jun 13, 2024
8b65487
Merge branch 'main' into tyler/blo-1329-move-oracle-service-into-orch…
technicallyty Jun 13, 2024
2ef577c
Merge branch 'main' into tyler/blo-1329-move-oracle-service-into-orch…
Jun 14, 2024
ff444e9
Merge branch 'main' into tyler/blo-1329-move-oracle-service-into-orch…
davidterpay Jun 17, 2024
bc3c6ca
Merge branch 'main' into tyler/blo-1329-move-oracle-service-into-orch…
Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 20 additions & 36 deletions cmd/slinky/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

"github.com/skip-mev/slinky/cmd/build"
oraclemetrics "github.com/skip-mev/slinky/oracle/metrics"
"github.com/skip-mev/slinky/oracle/orchestrator"
"github.com/skip-mev/slinky/pkg/log"
oraclemath "github.com/skip-mev/slinky/pkg/math/oracle"
oraclefactory "github.com/skip-mev/slinky/providers/factories/oracle"
Expand Down Expand Up @@ -254,50 +253,35 @@
return fmt.Errorf("failed to create data aggregator: %w", err)
}

// Define the orchestrator and oracle options. These determine how the orchestrator and oracle are created & executed.
orchestratorOpts := []orchestrator.Option{
orchestrator.WithLogger(logger),
orchestrator.WithMarketMap(marketCfg),
orchestrator.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory), // Replace with custom API query handler factory.
orchestrator.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory), // Replace with custom websocket query handler factory.
orchestrator.WithMarketMapperFactory(oraclefactory.MarketMapProviderFactory),
orchestrator.WithAggregator(aggregator),
}
if updateMarketCfgPath != "" {
orchestratorOpts = append(orchestratorOpts, orchestrator.WithWriteTo(updateMarketCfgPath))
}
// Define the oracle options. These determine how the oracle is created & executed.
oracleOpts := []oracle.Option{
oracle.WithLogger(logger),
oracle.WithUpdateInterval(cfg.UpdateInterval),
oracle.WithMarketMap(marketCfg),
oracle.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory), // Replace with custom API query handler factory.
oracle.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory), // Replace with custom websocket query handler factory.
oracle.WithMarketMapperFactory(oraclefactory.MarketMapProviderFactory),

Check warning on line 262 in cmd/slinky/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/slinky/main.go#L259-L262

Added lines #L259 - L262 were not covered by tests
oracle.WithMetrics(metrics),
oracle.WithMaxCacheAge(cfg.MaxPriceAge),
oracle.WithPriceAggregator(aggregator),
}

// Create the orchestrator and start the orchestrator.
orch, err := orchestrator.NewProviderOrchestrator(
cfg,
orchestratorOpts...,
)
if err != nil {
return fmt.Errorf("failed to create provider orchestrator: %w", err)
}

if err := orch.Start(ctx); err != nil {
return fmt.Errorf("failed to start provider orchestrator: %w", err)
if updateMarketCfgPath != "" {
oracleOpts = append(oracleOpts, oracle.WithWriteTo(updateMarketCfgPath))

Check warning on line 266 in cmd/slinky/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/slinky/main.go#L265-L266

Added lines #L265 - L266 were not covered by tests
}
defer orch.Stop()

// Create the oracle and start the oracle server.
oracleOpts = append(
oracleOpts,
oracle.WithProviders(orch.GetPriceProviders()),
oracle.WithMarketMapGetter(orch.GetMarketMap),
// Create the oracle and start the oracle.
orc, err := oracle.New(
cfg,
aggregator,
oracleOpts...,

Check warning on line 273 in cmd/slinky/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/slinky/main.go#L270-L273

Added lines #L270 - L273 were not covered by tests
)
orc, err := oracle.New(oracleOpts...)
if err != nil {
return fmt.Errorf("failed to create oracle: %w", err)
}
go func() {
if err := orc.Start(ctx); err != nil {
logger.Fatal("failed to start oracle", zap.Error(err))

Check warning on line 280 in cmd/slinky/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/slinky/main.go#L278-L280

Added lines #L278 - L280 were not covered by tests
}
}()
defer orc.Stop()

Check warning on line 283 in cmd/slinky/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/slinky/main.go#L283

Added line #L283 was not covered by tests

srv := oracleserver.NewOracleServer(orc, logger)

// cancel oracle on interrupt or terminate
Expand Down Expand Up @@ -337,7 +321,7 @@
}()
}

// start oracle + server, and wait for either to finish
// start server (blocks).
if err := srv.StartServer(ctx, cfg.Host, cfg.Port); err != nil {
logger.Error("stopping server", zap.Error(err))
}
Expand Down
26 changes: 26 additions & 0 deletions oracle/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Oracle

Responsibilities of the Oracle:

* Spinning up the necessary services (e.g. the sidecar, the price fetchers, etc.).
* Managing the lifecycle of the providers.
* Determining the set of markets that need to be fetched and updating the providers accordingly.

## Configuration

At a high level the oracle is configured with a `oracle.json` file that contains all providers that need to be instantiated. To read more about the configuration of `oracle.json`, please refer to the [oracle configuration documentation](config/README.md).

Each provider is instantiated using the `PriceAPIQueryHandlerFactory`, `PriceWebSocketQueryHandlerFactory`, and `MarketMapFactory` factory functions. Think of these as the constructors for the providers.

* `PriceAPIQueryHandlerFactory` - This is used to create the API query handler for the provider - which is then passed into a base provider.
* `PriceWebSocketQueryHandlerFactory` - This is used to create the WebSocket query handler for the provider - which is then passed into a base provider.
* `MarketMapFactory` - This is used to create the market map provider.

## Lifecycle

The oracle can be initialized with an option of `WithMarketMap` which allows each provider to be instantiated with a predetermined set of markets. If this option is not provided, the oracle will fetch the markets from the market map provider. **Both options can be set.**

The oracle will then start each provider in a separate goroutine. Additionally, if the oracle has a market map provider, it will start a goroutine that will periodically fetch the markets from the market map provider and update the providers accordingly.

All providers are running concurrently and will do so until the main context is canceled (what is passed into `Start`). If the oracle is canceled, it will cancel all providers and wait for them to finish before returning.

25 changes: 22 additions & 3 deletions oracle/orchestrator/helpers_test.go → oracle/helpers_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package orchestrator_test
package oracle_test

import (
"testing"
Expand All @@ -8,8 +8,8 @@ import (

"github.com/stretchr/testify/require"

"github.com/skip-mev/slinky/oracle"
"github.com/skip-mev/slinky/oracle/config"
"github.com/skip-mev/slinky/oracle/orchestrator"
oracletypes "github.com/skip-mev/slinky/oracle/types"
slinkytypes "github.com/skip-mev/slinky/pkg/types"
"github.com/skip-mev/slinky/providers/apis/binance"
Expand Down Expand Up @@ -196,14 +196,33 @@ var (
},
}
)
var _ oracle.PriceAggregator = &noOpPriceAggregator{}

type noOpPriceAggregator struct{}

func (n noOpPriceAggregator) SetProviderPrices(_ string, _ oracletypes.Prices) {
}

func (n noOpPriceAggregator) UpdateMarketMap(_ mmtypes.MarketMap) {
}

func (n noOpPriceAggregator) AggregatePrices() {
}

func (n noOpPriceAggregator) GetPrices() oracletypes.Prices {
return oracletypes.Prices{}
}

func (n noOpPriceAggregator) Reset() {
}

technicallyty marked this conversation as resolved.
Show resolved Hide resolved
func checkProviderState(
t *testing.T,
expectedTickers []oracletypes.ProviderTicker,
expectedName string,
expectedType providertypes.ProviderType,
isRunning bool,
state orchestrator.ProviderState,
state oracle.ProviderState,
) {
t.Helper()

Expand Down
16 changes: 8 additions & 8 deletions oracle/orchestrator/init.go → oracle/init.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package orchestrator
package oracle

import (
"context"
Expand All @@ -14,7 +14,7 @@ import (
)

// Init initializes the all providers that are configured via the oracle config.
func (o *ProviderOrchestrator) Init(ctx context.Context) error {
func (o *OracleImpl) Init(ctx context.Context) error {
o.mut.Lock()
defer o.mut.Unlock()

Expand Down Expand Up @@ -45,7 +45,7 @@ func (o *ProviderOrchestrator) Init(ctx context.Context) error {
}

// createPriceProvider creates a new price provider for the given provider configuration.
func (o *ProviderOrchestrator) createPriceProvider(ctx context.Context, cfg config.ProviderConfig) error {
func (o *OracleImpl) createPriceProvider(ctx context.Context, cfg config.ProviderConfig) error {
// Create the provider market map. This creates the tickers the provider is configured to
// support.
tickers, err := types.ProviderTickersFromMarketMap(cfg.Name, o.marketMap)
Expand Down Expand Up @@ -99,8 +99,8 @@ func (o *ProviderOrchestrator) createPriceProvider(ctx context.Context, cfg conf
Cfg: cfg,
}

// Add the provider to the orchestrator.
o.providers[provider.Name()] = state
// Add the provider to the oracle.
o.priceProviders[provider.Name()] = state

o.logger.Info(
"created price provider state",
Expand All @@ -111,7 +111,7 @@ func (o *ProviderOrchestrator) createPriceProvider(ctx context.Context, cfg conf
}

// createAPIQueryHandler creates a new API query handler for the given provider configuration.
func (o *ProviderOrchestrator) createAPIQueryHandler(
func (o *OracleImpl) createAPIQueryHandler(
ctx context.Context,
cfg config.ProviderConfig,
) (types.PriceAPIQueryHandler, error) {
Expand All @@ -123,7 +123,7 @@ func (o *ProviderOrchestrator) createAPIQueryHandler(
}

// createWebSocketQueryHandler creates a new web socket query handler for the given provider configuration.
func (o *ProviderOrchestrator) createWebSocketQueryHandler(
func (o *OracleImpl) createWebSocketQueryHandler(
ctx context.Context,
cfg config.ProviderConfig,
) (types.PriceWebSocketQueryHandler, error) {
Expand All @@ -135,7 +135,7 @@ func (o *ProviderOrchestrator) createWebSocketQueryHandler(
}

// createMarketMapProvider creates a new market map provider for the given provider configuration.
func (o *ProviderOrchestrator) createMarketMapProvider(cfg config.ProviderConfig) error {
func (o *OracleImpl) createMarketMapProvider(cfg config.ProviderConfig) error {
if o.marketMapperFactory == nil {
return fmt.Errorf("cannot create market map provider; market map factory is not set")
}
Expand Down
Loading
Loading