Skip to content
This repository has been archived by the owner on May 29, 2024. It is now read-only.

Commit

Permalink
[epociask/parallelized_heuristic_execution] Updated docs, fixed tests…
Browse files Browse the repository at this point in the history
… & lint errors
  • Loading branch information
Ethen Pociask committed Oct 12, 2023
1 parent 89ebfd6 commit fb6fc18
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 51 deletions.
12 changes: 8 additions & 4 deletions docs/architecture/engine.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ permalink: /architecture/risk-engine

## Overview

The Risk Engine is responsible for handling and executing active heuristics. It is the primary downstream consumer of ETL output. The Risk Engine will receive data from the ETL and execute the heuristics associated with the data. If an invalidation occurs, the Risk Engine will return an `InvalidationOutcome` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `InvalidationOutcome` and publish it to the Alerting system.
The Risk Engine is responsible for handling and executing active heuristics. It is the primary downstream consumer of ETL output. The Risk Engine will receive data from the ETL and execute the heuristics associated with the data. If an invalidation occurs, the Risk Engine will return an `InvalidationOutcome` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `Activation` and publish it to the Alerting system.

The Risk Engine will execute the heuristics associated with some ingested input data and return an `InvalidationOutcome` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `InvalidationOutcome` and publish it to the Alerting system.
The Risk Engine will execute the heuristics associated with some ingested input data and return an `Activation` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `Activation` and publish it to the Alerting subsystem.

The following diagram further exemplifies this key interaction:

Expand Down Expand Up @@ -45,7 +45,7 @@ The ETL publishes `Heuristic Input` to the Risk Engine using a relay channel. Th

## Heuristic Session

An heuristic session refers to the execution and representation of a single heuristic. An heuristic session is uniquely identified by a `SUUID` and is associated with a single `PUUID`. An heuristic session is created by the `EngineManager` when a user requests to run an active session. The `EngineManager` will create a new `HeuristicSession` and pass it to the `RiskEngine` to be executed. The `RiskEngine` will then execute the heuristic session and return an `InvalidationOutcome` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `InvalidationOutcome` and publish it to the Alerting system.
A heuristic session refers to the execution and representation of a single heuristic. A heuristic session is uniquely identified by a `SUUID` and is associated with a single `PUUID`. A heuristic session is created by the `EngineManager` when a user requests to run an active session. The `EngineManager` will create a new `HeuristicSession` and pass it to the `RiskEngine` to be executed. The `RiskEngine` will then execute the heuristic session and return an `InvalidationOutcome` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `InvalidationOutcome` and publish it to the Alerting system.

## Session UUID (SUUID)

Expand All @@ -71,7 +71,7 @@ The heuristic input is a struct that contains the following fields:

## Heuristic

An heuristic is a logical execution module that defines some set of invalidation criteria. The heuristic is responsible for processing the input data and determining if an invalidation has occurred. If an invalidation has occurred, the heuristic will return a `InvalidationOutcome` that contains relevant metadata necessary for the `EngineManager` to create an `Alert`.
A heuristic is a logical execution module that defines some set of invalidation criteria. The heuristic is responsible for processing the input data and determining if an invalidation has occurred. If an invalidation has occurred, the heuristic will return a `InvalidationOutcome` that contains relevant metadata necessary for the `EngineManager` to create an `Alert`.

### Hardcoded Base Heuristic

Expand All @@ -98,6 +98,10 @@ All heuristics have a boolean property `Addressing` which determines if the heur

For example, a `balance_enforcement` heuristic session will be addressable because it only executes invalidation logic for the native ETH balance of a single address.

### Parallelism
Heuristics are executed by different worker routines in parallel to ensure that a heuristic assessment operation doesn't block upstream processing or other heuristic operations. The number of worker routines that are spawned to execute a heuristic is defined by the `ENGINE_WORKER_COUNT` environment variable. The default value is currently `6`.


### Heuristic States

State is used to represent the current state of a heuristic. The state of a heuristic is represented by a `HeuristicState` type. The following states are supported:
Expand Down
6 changes: 3 additions & 3 deletions internal/app/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ func InitializeETL(ctx context.Context, transit chan core.HeuristicInput) pipeli
}

// InitializeEngine ... Performs dependency injection to build engine struct
func InitializeEngine(ctx context.Context, transit chan core.Alert) engine.Manager {
func InitializeEngine(ctx context.Context, cfg *config.Config, transit chan core.Alert) engine.Manager {
store := engine.NewSessionStore()
am := engine.NewAddressingMap()
re := engine.NewHardCodedEngine(transit)
it := e_registry.NewHeuristicTable()

return engine.NewManager(ctx, re, am, store, it, transit)
return engine.NewManager(ctx, cfg.EngineConfig, re, am, store, it, transit)
}

// NewPessimismApp ... Performs dependency injection to build app struct
Expand All @@ -108,7 +108,7 @@ func NewPessimismApp(ctx context.Context, cfg *config.Config) (*Application, fun
return nil, nil, err
}

engine := InitializeEngine(ctx, alerting.Transit())
engine := InitializeEngine(ctx, cfg, alerting.Transit())
etl := InitializeETL(ctx, engine.Transit())

m := subsystem.NewManager(ctx, cfg.SystemConfig, etl, engine, alerting)
Expand Down
34 changes: 22 additions & 12 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/base-org/pessimism/internal/api/server"
"github.com/base-org/pessimism/internal/client"
"github.com/base-org/pessimism/internal/core"
"github.com/base-org/pessimism/internal/engine"
"github.com/base-org/pessimism/internal/logging"
"github.com/base-org/pessimism/internal/metrics"
"github.com/base-org/pessimism/internal/subsystem"
Expand All @@ -23,7 +24,10 @@ import (
)

// TrueEnvVal ... Represents the encoded string value for true (ie. 1)
const trueEnvVal = "1"
const (
trueEnvVal = "1"
maxEngineWorkerCount = 6
)

// Config ... Application level configuration defined by `FilePath` value
// TODO - Consider renaming to "environment config"
Expand All @@ -33,6 +37,7 @@ type Config struct {

AlertConfig *alert.Config
ClientConfig *client.Config
EngineConfig *engine.Config
MetricsConfig *metrics.Config
ServerConfig *server.Config
SystemConfig *subsystem.Config
Expand All @@ -55,10 +60,17 @@ func NewConfig(fileName core.FilePath) *Config {
RoutingParams: nil, // This is populated after the config is created (see IngestAlertConfig)
},

SystemConfig: &subsystem.Config{
MaxPipelineCount: getEnvInt("MAX_PIPELINE_COUNT"),
L1PollInterval: getEnvInt("L1_POLL_INTERVAL"),
L2PollInterval: getEnvInt("L2_POLL_INTERVAL"),
ClientConfig: &client.Config{
L1RpcEndpoint: getEnvStr("L1_RPC_ENDPOINT"),
L2RpcEndpoint: getEnvStr("L2_RPC_ENDPOINT"),
IndexerCfg: &indexer_client.Config{
BaseURL: getEnvStrWithDefault("INDEXER_URL", ""),
PaginationLimit: getEnvIntWithDefault("INDEXER_PAGINATION_LIMIT", 0),
},
},

EngineConfig: &engine.Config{
WorkerCount: getEnvIntWithDefault("ENGINE_WORKER_COUNT", maxEngineWorkerCount),
},

MetricsConfig: &metrics.Config{
Expand All @@ -75,13 +87,11 @@ func NewConfig(fileName core.FilePath) *Config {
ReadTimeout: getEnvInt("SERVER_READ_TIMEOUT"),
WriteTimeout: getEnvInt("SERVER_WRITE_TIMEOUT"),
},
ClientConfig: &client.Config{
L1RpcEndpoint: getEnvStr("L1_RPC_ENDPOINT"),
L2RpcEndpoint: getEnvStr("L2_RPC_ENDPOINT"),
IndexerCfg: &indexer_client.Config{
BaseURL: getEnvStrWithDefault("INDEXER_URL", ""),
PaginationLimit: getEnvIntWithDefault("INDEXER_PAGINATION_LIMIT", 0),
},

SystemConfig: &subsystem.Config{
MaxPipelineCount: getEnvInt("MAX_PIPELINE_COUNT"),
L1PollInterval: getEnvInt("L1_POLL_INTERVAL"),
L2PollInterval: getEnvInt("L2_POLL_INTERVAL"),
},
}

Expand Down
10 changes: 5 additions & 5 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,23 @@ type HeuristicInput struct {
Input TransitData
}

// EngineInputRelay ... Represents a inter-subsystem
// ExecInputRelay ... Represents a inter-subsystem
// relay used to bind final ETL pipeline outputs to risk engine inputs
type EngineInputRelay struct {
type ExecInputRelay struct {
pUUID PUUID
outChan chan HeuristicInput
}

// NewEngineRelay ... Initializer
func NewEngineRelay(pUUID PUUID, outChan chan HeuristicInput) *EngineInputRelay {
return &EngineInputRelay{
func NewEngineRelay(pUUID PUUID, outChan chan HeuristicInput) *ExecInputRelay {
return &ExecInputRelay{
pUUID: pUUID,
outChan: outChan,
}
}

// RelayTransitData ... Creates heuristic input from transit data to send to risk engine
func (eir *EngineInputRelay) RelayTransitData(td TransitData) error {
func (eir *ExecInputRelay) RelayTransitData(td TransitData) error {
hi := HeuristicInput{
PUUID: eir.pUUID,
Input: td,
Expand Down
23 changes: 8 additions & 15 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,26 @@ const (
Dynamic
)

// EngineInput ... Parameter wrapper for engine execution input
type EngineInput struct {
// ExecInput ... Parameter wrapper for engine execution input
type ExecInput struct {
ctx context.Context
hi core.HeuristicInput
h heuristic.Heuristic
}

// EngineOutput ... Parameter wrapper for engine execution output
type EngineOutput struct {
act *core.Activation
activated bool
}

// RiskEngine ... Execution engine interface
type RiskEngine interface {
Type() Type
Execute(context.Context, core.TransitData,
heuristic.Heuristic) (*core.Activation, bool)
AddWorkerIngress(chan EngineInput)
AddWorkerIngress(chan ExecInput)
EventLoop(context.Context)
}

// hardCodedEngine ... Hard coded execution engine
// IE: native hardcoded application code for heuristic implementation
type hardCodedEngine struct {
heuristicIn chan EngineInput
heuristicIn chan ExecInput
alertEgress chan core.Alert
}

Expand All @@ -58,17 +52,17 @@ func NewHardCodedEngine(egress chan core.Alert) RiskEngine {
}

// Type ... Returns the engine type
func (e *hardCodedEngine) Type() Type {
func (hce *hardCodedEngine) Type() Type {
return HardCoded
}

// AddWorkerIngress ... Adds a worker ingress channel
func (e *hardCodedEngine) AddWorkerIngress(ingress chan EngineInput) {
e.heuristicIn = ingress
func (hce *hardCodedEngine) AddWorkerIngress(ingress chan ExecInput) {
hce.heuristicIn = ingress
}

// Execute ... Executes the heuristic
func (e *hardCodedEngine) Execute(ctx context.Context, data core.TransitData,
func (hce *hardCodedEngine) Execute(ctx context.Context, data core.TransitData,
h heuristic.Heuristic) (*core.Activation, bool) {
logger := logging.WithContext(ctx)

Expand Down Expand Up @@ -123,7 +117,6 @@ func (hce *hardCodedEngine) EventLoop(ctx context.Context) {

hce.alertEgress <- alert
}

}
}
}
17 changes: 9 additions & 8 deletions internal/engine/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"go.uber.org/zap"
)

type Config struct {
WorkerCount int
}

// Manager ... Engine manager interface
type Manager interface {
GetInputType(ht core.HeuristicType) (core.RegisterType, error)
Expand Down Expand Up @@ -45,7 +49,7 @@ type engineManager struct {
alertEgress chan core.Alert

// Used to send execution requests to engine worker subscribers
workerEgress chan EngineInput
workerEgress chan ExecInput

metrics metrics.Metricer
engine RiskEngine
Expand All @@ -55,7 +59,7 @@ type engineManager struct {
}

// NewManager ... Initializer
func NewManager(ctx context.Context, engine RiskEngine, addr AddressingMap,
func NewManager(ctx context.Context, cfg *Config, engine RiskEngine, addr AddressingMap,
store SessionStore, it registry.HeuristicTable, alertEgress chan core.Alert) Manager {
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -64,19 +68,16 @@ func NewManager(ctx context.Context, engine RiskEngine, addr AddressingMap,
cancel: cancel,
alertEgress: alertEgress,
etlIngress: make(chan core.HeuristicInput),
workerEgress: make(chan EngineInput),
workerEgress: make(chan ExecInput),
engine: engine,
addresser: addr,
store: store,
heuristics: it,
metrics: metrics.WithContext(ctx),
}

// TODO - Make this configurable
count := 4

// Start engine worker pool for concurrent heuristic execution
for i := 0; i < count; i++ {
for i := 0; i < cfg.WorkerCount; i++ {
logging.WithContext(ctx).Debug("Starting engine worker routine", zap.Int("worker", i))

engine.AddWorkerIngress(em.workerEgress)
Expand Down Expand Up @@ -285,7 +286,7 @@ func (em *engineManager) executeNonAddressHeuristics(ctx context.Context, data c

// executeHeuristic ... Sends heuristic input to engine worker pool for execution
func (em *engineManager) executeHeuristic(ctx context.Context, data core.HeuristicInput, h heuristic.Heuristic) {
ei := EngineInput{
ei := ExecInput{
ctx: ctx,
hi: data,
h: h,
Expand Down
3 changes: 2 additions & 1 deletion internal/engine/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func Test_EventLoop(t *testing.T) {
ctx = context.WithValue(ctx, core.State, ss)

em := engine.NewManager(ctx,
engine.NewHardCodedEngine(make(chan core.Alert)),
&engine.Config{WorkerCount: 1},
engine.NewHardCodedEngine(alertChan),
engine.NewAddressingMap(),
engine.NewSessionStore(),
registry.NewHeuristicTable(),
Expand Down
2 changes: 1 addition & 1 deletion internal/etl/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Component interface {
Type() core.ComponentType

// AddRelay ... Adds an engine relay to component egress routing
AddRelay(relay *core.EngineInputRelay) error
AddRelay(relay *core.ExecInputRelay) error

// AddEgress ...
AddEgress(core.CUUID, chan core.TransitData) error
Expand Down
4 changes: 2 additions & 2 deletions internal/etl/component/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type egressHandler struct {
egresses map[core.ComponentPID]chan core.TransitData

relay *core.EngineInputRelay
relay *core.ExecInputRelay
}

// newEgress ... Initializer
Expand Down Expand Up @@ -87,7 +87,7 @@ func (eh *egressHandler) HasEngineRelay() bool {
}

// AddRelay ... Adds a relay assuming no existing ones
func (eh *egressHandler) AddRelay(relay *core.EngineInputRelay) error {
func (eh *egressHandler) AddRelay(relay *core.ExecInputRelay) error {
if eh.HasEngineRelay() {
return fmt.Errorf(engineEgressExistsErr)
}
Expand Down

0 comments on commit fb6fc18

Please sign in to comment.