This repository has been archived by the owner on May 29, 2024. It is now read-only.

Concurrent Heuristic Execution (#170)
Ethen authored Oct 17, 2023
1 parent 35b4fab commit 75565f5
Showing 11 changed files with 161 additions and 87 deletions.
12 changes: 8 additions & 4 deletions docs/architecture/engine.markdown
@@ -10,9 +10,9 @@ permalink: /architecture/risk-engine

## Overview

The Risk Engine is responsible for handling and executing active heuristics. It is the primary downstream consumer of ETL output. The Risk Engine will receive data from the ETL and execute the heuristics associated with the data. If an invalidation occurs, the Risk Engine will return an `InvalidationOutcome` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `InvalidationOutcome` and publish it to the Alerting system.
The Risk Engine is responsible for handling and executing active heuristics. It is the primary downstream consumer of ETL output. The Risk Engine will receive data from the ETL and execute the heuristics associated with the data. If an invalidation occurs, the Risk Engine will return an `Activation` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `Activation` and publish it to the Alerting system.

The Risk Engine will execute the heuristics associated with some ingested input data and return an `InvalidationOutcome` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `InvalidationOutcome` and publish it to the Alerting system.
The Risk Engine will execute the heuristics associated with some ingested input data and return an `Activation` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `Activation` and publish it to the Alerting subsystem.

The following diagram further exemplifies this key interaction:

@@ -45,7 +45,7 @@ The ETL publishes `Heuristic Input` to the Risk Engine using a relay channel. Th

## Heuristic Session

An heuristic session refers to the execution and representation of a single heuristic. An heuristic session is uniquely identified by a `SUUID` and is associated with a single `PUUID`. An heuristic session is created by the `EngineManager` when a user requests to run an active session. The `EngineManager` will create a new `HeuristicSession` and pass it to the `RiskEngine` to be executed. The `RiskEngine` will then execute the heuristic session and return an `InvalidationOutcome` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `InvalidationOutcome` and publish it to the Alerting system.
A heuristic session refers to the execution and representation of a single heuristic. A heuristic session is uniquely identified by a `SUUID` and is associated with a single `PUUID`. A heuristic session is created by the `EngineManager` when a user requests to run an active session. The `EngineManager` will create a new `HeuristicSession` and pass it to the `RiskEngine` to be executed. The `RiskEngine` will then execute the heuristic session and return an `InvalidationOutcome` to the `EngineManager`. The `EngineManager` will then create an `Alert` using the `InvalidationOutcome` and publish it to the Alerting system.

## Session UUID (SUUID)

@@ -71,7 +71,7 @@ The heuristic input is a struct that contains the following fields:

## Heuristic

An heuristic is a logical execution module that defines some set of invalidation criteria. The heuristic is responsible for processing the input data and determining if an invalidation has occurred. If an invalidation has occurred, the heuristic will return a `InvalidationOutcome` that contains relevant metadata necessary for the `EngineManager` to create an `Alert`.
A heuristic is a logical execution module that defines some set of invalidation criteria. The heuristic is responsible for processing the input data and determining if an invalidation has occurred. If an invalidation has occurred, the heuristic will return an `InvalidationOutcome` that contains relevant metadata necessary for the `EngineManager` to create an `Alert`.

### Hardcoded Base Heuristic

@@ -98,6 +98,10 @@ All heuristics have a boolean property `Addressing` which determines if the heur

For example, a `balance_enforcement` heuristic session will be addressable because it only executes invalidation logic for the native ETH balance of a single address.

### Parallelism
Heuristics are executed in parallel by a pool of worker routines so that a single heuristic assessment doesn't block upstream processing or other heuristic operations. The number of worker routines spawned by the engine is defined by the `ENGINE_WORKER_COUNT` environment variable, which currently defaults to `6`.
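
The paragraph above describes a worker pool. A minimal, self-contained sketch of that pattern follows; it is not the repository's implementation. The names `countActivations` and `assess` are hypothetical, and plain integers stand in for heuristic inputs.

```go
package main

import (
	"fmt"
	"sync"
)

// countActivations is a hypothetical illustration of the worker-pool idea:
// `workers` goroutines drain one shared ingress channel, so a slow
// assessment only occupies a single worker instead of blocking the producer
// indefinitely. It returns how many inputs the assess function activated on.
func countActivations(workers int, inputs []int, assess func(int) bool) int {
	ingress := make(chan int)

	var (
		wg          sync.WaitGroup
		mu          sync.Mutex
		activations int
	)

	// Spawn the worker routines.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for in := range ingress {
				if assess(in) {
					mu.Lock()
					activations++
					mu.Unlock()
				}
			}
		}()
	}

	// Feed every input to whichever worker is free, then signal completion.
	for _, in := range inputs {
		ingress <- in
	}
	close(ingress)
	wg.Wait()

	return activations
}

func main() {
	// Assumed stand-in heuristic: activate when the value exceeds 100.
	overLimit := func(v int) bool { return v > 100 }

	// 6 matches the documented default for ENGINE_WORKER_COUNT.
	fmt.Println(countActivations(6, []int{50, 150, 250, 10}, overLimit)) // prints 2
}
```

Because all workers read from one channel, raising or lowering the worker count changes throughput without changing how inputs are submitted.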


### Heuristic States

State is used to represent the current state of a heuristic. The state of a heuristic is represented by a `HeuristicState` type. The following states are supported:
23 changes: 12 additions & 11 deletions e2e/setup.go
@@ -14,6 +14,7 @@ import (
"github.com/base-org/pessimism/internal/client"
"github.com/base-org/pessimism/internal/config"
"github.com/base-org/pessimism/internal/core"
"github.com/base-org/pessimism/internal/engine"
"github.com/base-org/pessimism/internal/logging"
"github.com/base-org/pessimism/internal/metrics"
"github.com/base-org/pessimism/internal/state"
@@ -90,8 +91,6 @@ func CreateL2TestSuite(t *testing.T) *L2TestSuite {

pagerdutyServer := NewTestPagerDutyServer("127.0.0.1", 0)

appCfg.AlertConfig.RoutingCfgPath = ""

slackURL := fmt.Sprintf("http://127.0.0.1:%d", slackServer.Port)
pagerdutyURL := fmt.Sprintf("http://127.0.0.1:%d", pagerdutyServer.Port)

@@ -170,8 +169,6 @@ func CreateSysTestSuite(t *testing.T) *SysTestSuite {

pagerdutyServer := NewTestPagerDutyServer("127.0.0.1", 0)

appCfg.AlertConfig.RoutingCfgPath = ""

slackURL := fmt.Sprintf("http://127.0.0.1:%d", slackServer.Port)
pagerdutyURL := fmt.Sprintf("http://127.0.0.1:%d", pagerdutyServer.Port)

@@ -214,14 +211,17 @@ func DefaultTestConfig() *config.Config {
l1PollInterval := 900
l2PollInterval := 300
maxPipelines := 10
workerCount := 4

return &config.Config{
Environment: core.Development,
BootStrapPath: "",
SystemConfig: &subsystem.Config{
MaxPipelineCount: maxPipelines,
L2PollInterval: l2PollInterval,
L1PollInterval: l1PollInterval,
AlertConfig: &alert.Config{
PagerdutyAlertEventsURL: "",
RoutingCfgPath: "",
},
EngineConfig: &engine.Config{
WorkerCount: workerCount,
},
MetricsConfig: &metrics.Config{
Enabled: false,
@@ -232,9 +232,10 @@ func DefaultTestConfig() *config.Config {
Host: "localhost",
Port: 0,
},
AlertConfig: &alert.Config{
PagerdutyAlertEventsURL: "",
RoutingCfgPath: "",
SystemConfig: &subsystem.Config{
MaxPipelineCount: maxPipelines,
L2PollInterval: l2PollInterval,
L1PollInterval: l1PollInterval,
},
}
}
8 changes: 4 additions & 4 deletions internal/app/init.go
@@ -87,13 +87,13 @@ func InitializeETL(ctx context.Context, transit chan core.HeuristicInput) pipeli
}

// InitializeEngine ... Performs dependency injection to build engine struct
func InitializeEngine(ctx context.Context, transit chan core.Alert) engine.Manager {
func InitializeEngine(ctx context.Context, cfg *config.Config, transit chan core.Alert) engine.Manager {
store := engine.NewSessionStore()
am := engine.NewAddressingMap()
re := engine.NewHardCodedEngine()
re := engine.NewHardCodedEngine(transit)
it := e_registry.NewHeuristicTable()

return engine.NewManager(ctx, re, am, store, it, transit)
return engine.NewManager(ctx, cfg.EngineConfig, re, am, store, it, transit)
}

// NewPessimismApp ... Performs dependency injection to build app struct
@@ -108,7 +108,7 @@ func NewPessimismApp(ctx context.Context, cfg *config.Config) (*Application, fun
return nil, nil, err
}

engine := InitializeEngine(ctx, alerting.Transit())
engine := InitializeEngine(ctx, cfg, alerting.Transit())
etl := InitializeETL(ctx, engine.Transit())

m := subsystem.NewManager(ctx, cfg.SystemConfig, etl, engine, alerting)
34 changes: 22 additions & 12 deletions internal/config/config.go
@@ -11,6 +11,7 @@ import (
"github.com/base-org/pessimism/internal/api/server"
"github.com/base-org/pessimism/internal/client"
"github.com/base-org/pessimism/internal/core"
"github.com/base-org/pessimism/internal/engine"
"github.com/base-org/pessimism/internal/logging"
"github.com/base-org/pessimism/internal/metrics"
"github.com/base-org/pessimism/internal/subsystem"
@@ -23,7 +24,10 @@ import (
)

// TrueEnvVal ... Represents the encoded string value for true (ie. 1)
const trueEnvVal = "1"
const (
trueEnvVal = "1"
maxEngineWorkerCount = 6
)

// Config ... Application level configuration defined by `FilePath` value
// TODO - Consider renaming to "environment config"
@@ -33,6 +37,7 @@ type Config struct {

AlertConfig *alert.Config
ClientConfig *client.Config
EngineConfig *engine.Config
MetricsConfig *metrics.Config
ServerConfig *server.Config
SystemConfig *subsystem.Config
@@ -55,10 +60,17 @@ func NewConfig(fileName core.FilePath) *Config {
RoutingParams: nil, // This is populated after the config is created (see IngestAlertConfig)
},

SystemConfig: &subsystem.Config{
MaxPipelineCount: getEnvInt("MAX_PIPELINE_COUNT"),
L1PollInterval: getEnvInt("L1_POLL_INTERVAL"),
L2PollInterval: getEnvInt("L2_POLL_INTERVAL"),
ClientConfig: &client.Config{
L1RpcEndpoint: getEnvStr("L1_RPC_ENDPOINT"),
L2RpcEndpoint: getEnvStr("L2_RPC_ENDPOINT"),
IndexerCfg: &indexer_client.Config{
BaseURL: getEnvStrWithDefault("INDEXER_URL", ""),
PaginationLimit: getEnvIntWithDefault("INDEXER_PAGINATION_LIMIT", 0),
},
},

EngineConfig: &engine.Config{
WorkerCount: getEnvIntWithDefault("ENGINE_WORKER_COUNT", maxEngineWorkerCount),
},

MetricsConfig: &metrics.Config{
@@ -75,13 +87,11 @@ func NewConfig(fileName core.FilePath) *Config {
ReadTimeout: getEnvInt("SERVER_READ_TIMEOUT"),
WriteTimeout: getEnvInt("SERVER_WRITE_TIMEOUT"),
},
ClientConfig: &client.Config{
L1RpcEndpoint: getEnvStr("L1_RPC_ENDPOINT"),
L2RpcEndpoint: getEnvStr("L2_RPC_ENDPOINT"),
IndexerCfg: &indexer_client.Config{
BaseURL: getEnvStrWithDefault("INDEXER_URL", ""),
PaginationLimit: getEnvIntWithDefault("INDEXER_PAGINATION_LIMIT", 0),
},

SystemConfig: &subsystem.Config{
MaxPipelineCount: getEnvInt("MAX_PIPELINE_COUNT"),
L1PollInterval: getEnvInt("L1_POLL_INTERVAL"),
L2PollInterval: getEnvInt("L2_POLL_INTERVAL"),
},
}
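
The hunk above reads `ENGINE_WORKER_COUNT` through `getEnvIntWithDefault`, whose body is not part of this diff. The sketch below shows one way such a fallback lookup could behave. `intOrDefault` and `defaultEngineWorkerCount` are hypothetical names, and the choice to fall back on unparsable values is an assumption rather than the repository's confirmed behavior. Note that in the diff the constant is named `maxEngineWorkerCount` but is passed as the default value, not enforced as an upper bound.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// defaultEngineWorkerCount mirrors the value 6 used as the fallback in the diff.
const defaultEngineWorkerCount = 6

// intOrDefault is a hypothetical helper. It resolves key through lookup and
// returns def when the key is unset or its value is not a valid integer
// (the latter is an assumption made for this sketch).
func intOrDefault(lookup func(string) (string, bool), key string, def int) int {
	raw, ok := lookup(key)
	if !ok {
		return def
	}

	n, err := strconv.Atoi(raw)
	if err != nil {
		return def
	}

	return n
}

func main() {
	// os.LookupEnv has the signature func(string) (string, bool), so it can
	// be passed directly as the lookup function.
	workers := intOrDefault(os.LookupEnv, "ENGINE_WORKER_COUNT", defaultEngineWorkerCount)
	fmt.Println("engine workers:", workers)
}
```

Taking the lookup function as a parameter keeps the helper testable without mutating the process environment.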

10 changes: 5 additions & 5 deletions internal/core/core.go
@@ -77,23 +77,23 @@ type HeuristicInput struct {
Input TransitData
}

// EngineInputRelay ... Represents a inter-subsystem
// ExecInputRelay ... Represents an inter-subsystem
// relay used to bind final ETL pipeline outputs to risk engine inputs
type EngineInputRelay struct {
type ExecInputRelay struct {
pUUID PUUID
outChan chan HeuristicInput
}

// NewEngineRelay ... Initializer
func NewEngineRelay(pUUID PUUID, outChan chan HeuristicInput) *EngineInputRelay {
return &EngineInputRelay{
func NewEngineRelay(pUUID PUUID, outChan chan HeuristicInput) *ExecInputRelay {
return &ExecInputRelay{
pUUID: pUUID,
outChan: outChan,
}
}

// RelayTransitData ... Creates heuristic input from transit data to send to risk engine
func (eir *EngineInputRelay) RelayTransitData(td TransitData) error {
func (eir *ExecInputRelay) RelayTransitData(td TransitData) error {
hi := HeuristicInput{
PUUID: eir.pUUID,
Input: td,
70 changes: 64 additions & 6 deletions internal/engine/engine.go
@@ -2,6 +2,7 @@ package engine

import (
"context"
"time"

"github.com/base-org/pessimism/internal/core"
"github.com/base-org/pessimism/internal/engine/heuristic"
@@ -20,36 +21,53 @@ const (
Dynamic
)

// ExecInput ... Parameter wrapper for engine execution input
type ExecInput struct {
ctx context.Context
hi core.HeuristicInput
h heuristic.Heuristic
}

// RiskEngine ... Execution engine interface
type RiskEngine interface {
Type() Type
Execute(context.Context, core.TransitData,
heuristic.Heuristic) (*core.Activation, bool)
AddWorkerIngress(chan ExecInput)
EventLoop(context.Context)
}

// hardCodedEngine ... Hard coded execution engine
// IE: native hardcoded application code for heuristic implementation
type hardCodedEngine struct {
// TODO: Add any engine specific fields here
heuristicIn chan ExecInput
alertEgress chan core.Alert
}

// NewHardCodedEngine ... Initializer
func NewHardCodedEngine() RiskEngine {
return &hardCodedEngine{}
func NewHardCodedEngine(egress chan core.Alert) RiskEngine {
return &hardCodedEngine{
alertEgress: egress,
}
}

// Type ... Returns the engine type
func (e *hardCodedEngine) Type() Type {
func (hce *hardCodedEngine) Type() Type {
return HardCoded
}

// AddWorkerIngress ... Adds a worker ingress channel
func (hce *hardCodedEngine) AddWorkerIngress(ingress chan ExecInput) {
hce.heuristicIn = ingress
}

// Execute ... Executes the heuristic
func (e *hardCodedEngine) Execute(ctx context.Context, data core.TransitData,
func (hce *hardCodedEngine) Execute(ctx context.Context, data core.TransitData,
h heuristic.Heuristic) (*core.Activation, bool) {
logger := logging.WithContext(ctx)

logger.Debug("Performing heuristic activation",
zap.String("suuid", h.SUUID().String()))
zap.String(logging.SUUIDKey, h.SUUID().String()))
outcome, activated, err := h.Assess(data)
if err != nil {
logger.Error("Failed to perform activation option for heuristic", zap.Error(err))
@@ -62,3 +80,43 @@ func (e *hardCodedEngine) Execute(ctx context.Context, data core.TransitData,

return outcome, activated
}

// EventLoop ... Event loop for the risk engine
func (hce *hardCodedEngine) EventLoop(ctx context.Context) {
	logger := logging.WithContext(ctx)

	for {
		select {
		case <-ctx.Done(): // Context cancelled
			logger.Info("Risk engine event loop cancelled")
			return

		case execInput := <-hce.heuristicIn: // Heuristic input received
			logger.Debug("Heuristic input received",
				zap.String(logging.SUUIDKey, execInput.h.SUUID().String()))

			// (1) Execute heuristic
			start := time.Now()
			outcome, activated := hce.Execute(ctx, execInput.hi.Input, execInput.h)
			metrics.WithContext(ctx).RecordHeuristicRun(execInput.h)
			metrics.WithContext(ctx).RecordInvExecutionTime(execInput.h, float64(time.Since(start).Nanoseconds()))

			// (2) Send alert if activated
			if activated {
				alert := core.Alert{
					Timestamp: outcome.TimeStamp,
					SUUID:     execInput.h.SUUID(),
					Content:   outcome.Message,
					PUUID:     execInput.hi.PUUID,
					Ptype:     execInput.hi.PUUID.PipelineType(),
				}

				logger.Warn("Heuristic alert",
					zap.String(logging.SUUIDKey, execInput.h.SUUID().String()),
					zap.String("message", outcome.Message))

				hce.alertEgress <- alert
			}
		}
	}
}
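
The event loop above combines a cancellable `select` with an egress send. The following standalone sketch reproduces that shape with simplified stand-in types; `execInput`, `alert`, `assess`, and `collect` are all hypothetical and are not the repository's definitions. As one possible variation, the send on the egress channel is itself wrapped in a `select` on `ctx.Done()`, so a worker cannot stay blocked on a full egress channel after cancellation. The commit's loop sends unconditionally, which is also reasonable if the alerting consumer is always draining.

```go
package main

import (
	"context"
	"fmt"
)

// execInput and alert are simplified, hypothetical stand-ins for the
// engine's ExecInput and core.Alert types.
type execInput struct {
	id    string
	value int
}

type alert struct {
	id      string
	message string
}

// assess is an assumed stand-in for a heuristic's assessment logic:
// it activates when the value exceeds 100.
func assess(in execInput) (string, bool) {
	if in.value > 100 {
		return fmt.Sprintf("value %d exceeds 100", in.value), true
	}
	return "", false
}

// eventLoop reads inputs until the context is cancelled or ingress is closed,
// and forwards an alert for every activation.
func eventLoop(ctx context.Context, ingress <-chan execInput, egress chan<- alert) {
	for {
		select {
		case <-ctx.Done():
			return

		case in, ok := <-ingress:
			if !ok {
				return // ingress closed, nothing more to process
			}

			msg, activated := assess(in)
			if !activated {
				continue
			}

			// Cancellation-aware send: give up if the context ends first.
			select {
			case egress <- alert{id: in.id, message: msg}:
			case <-ctx.Done():
				return
			}
		}
	}
}

// collect runs one event loop over inputs and returns the alerts it produced.
func collect(inputs []execInput) []alert {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ingress := make(chan execInput)
	egress := make(chan alert, len(inputs)) // buffered so sends never block here
	done := make(chan struct{})

	go func() {
		defer close(done)
		eventLoop(ctx, ingress, egress)
	}()

	for _, in := range inputs {
		ingress <- in
	}
	close(ingress)
	<-done
	close(egress)

	var out []alert
	for a := range egress {
		out = append(out, a)
	}
	return out
}

func main() {
	alerts := collect([]execInput{{id: "a", value: 5}, {id: "b", value: 500}})
	for _, a := range alerts {
		fmt.Println(a.id, a.message) // prints "b value 500 exceeds 100"
	}
}
```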
2 changes: 1 addition & 1 deletion internal/engine/engine_test.go
@@ -23,7 +23,7 @@ func createTestSuite(t *testing.T) *testSuite {

return &testSuite{
ctrl: ctrl,
re: engine.NewHardCodedEngine(),
re: engine.NewHardCodedEngine(make(chan core.Alert)),
mockHeuristic: mocks.NewMockHeuristic(ctrl),
}
}