feat: Siren notification api custom template integration (#205)
* feat : integrate with siren custom templating support
Mryashbhardwaj authored Apr 3, 2024
1 parent 291c4e3 commit 659f137
Showing 10 changed files with 135 additions and 96 deletions.
9 changes: 4 additions & 5 deletions config/config.go
@@ -14,11 +14,10 @@ type Optimus struct {
Project Project `mapstructure:"project"`
Namespaces []*Namespace `mapstructure:"namespaces"`

Server Serve `mapstructure:"serve"`
Log LogConfig `mapstructure:"log"`
Telemetry TelemetryConfig `mapstructure:"telemetry"`
EventManager EventManagerConfig `mapstructure:"event_manager"`
Dashboard string `mapstructure:"dashboard"`
Server Serve `mapstructure:"serve"`
Log LogConfig `mapstructure:"log"`
Telemetry TelemetryConfig `mapstructure:"telemetry"`
Alerting AlertingConfig `mapstructure:"alerting"`

namespaceNameToNamespace map[string]*Namespace
}
25 changes: 15 additions & 10 deletions config/config_server.go
@@ -1,16 +1,15 @@
package config

type ServerConfig struct {
Version Version `mapstructure:"version"`
Log LogConfig `mapstructure:"log"`
Serve Serve `mapstructure:"serve"`
Telemetry TelemetryConfig `mapstructure:"telemetry"`
EventManager EventManagerConfig `mapstructure:"event_manager"`
Dashboard string `mapstructure:"dashboard"`
ResourceManagers []ResourceManager `mapstructure:"resource_managers"`
Plugin PluginConfig `mapstructure:"plugin"`
Replay ReplayConfig `mapstructure:"replay"`
Publisher *Publisher `mapstructure:"publisher"`
Version Version `mapstructure:"version"`
Log LogConfig `mapstructure:"log"`
Serve Serve `mapstructure:"serve"`
Telemetry TelemetryConfig `mapstructure:"telemetry"`
Alerting AlertingConfig `mapstructure:"alerting"`
ResourceManagers []ResourceManager `mapstructure:"resource_managers"`
Plugin PluginConfig `mapstructure:"plugin"`
Replay ReplayConfig `mapstructure:"replay"`
Publisher *Publisher `mapstructure:"publisher"`
}

type Serve struct {
@@ -34,6 +33,12 @@ type TelemetryConfig struct {
MetricServerAddr string `mapstructure:"telegraf_addr"`
}

type AlertingConfig struct {
EventManager EventManagerConfig `mapstructure:"alert_manager"`
Dashboard string `mapstructure:"dashboard"`
DataConsole string `mapstructure:"data_console"`
}

type EventManagerConfig struct {
Host string `mapstructure:"host"`
Endpoint string `mapstructure:"endpoint"`
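Note: with this change the event_manager and dashboard keys of the server config move under a single alerting block, and a new data_console link is added for use in alert templates. A minimal YAML sketch of the new block (key names follow the mapstructure tags above; host and URL values are placeholders, not part of this commit):

    alerting:
      alert_manager:
        host: http://siren.example.io        # placeholder Siren host
        endpoint: /v1beta1/alerts            # placeholder notification endpoint
      dashboard: http://grafana.example.io/d/optimus-job   # placeholder dashboard URL
      data_console: http://console.example.io              # placeholder data console URL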
11 changes: 6 additions & 5 deletions core/scheduler/job_run.go
@@ -70,11 +70,12 @@ type OperatorRun struct {
}

type AlertAttrs struct {
Owner string
JobURN string
Title string
Status EventStatus
JobEvent *Event
Owner string
JobURN string
Title string
SchedulerHost string
Status EventStatus
JobEvent *Event
}

type WebhookAttrs struct {
19 changes: 14 additions & 5 deletions core/scheduler/service/events_service.go
@@ -51,13 +51,22 @@ func (e *EventsService) Relay(ctx context.Context, event *scheduler.Event) error
e.l.Error("error getting detail for job [%s]: %s", event.JobName, err)
return err
}
tenantWithDetails, err := e.tenantService.GetDetails(ctx, event.Tenant)
if err != nil {
return err
}
schedulerHost, err := tenantWithDetails.GetConfig(tenant.ProjectSchedulerHost)
if err != nil {
return err
}
if event.Type == scheduler.JobFailureEvent || event.Type == scheduler.SLAMissEvent {
e.alertManager.Relay(&scheduler.AlertAttrs{
Owner: jobDetails.JobMetadata.Owner,
JobURN: jobDetails.Job.URN(),
Title: "Optimus Job Alert",
Status: scheduler.StatusFiring,
JobEvent: event,
Owner: jobDetails.JobMetadata.Owner,
JobURN: jobDetails.Job.URN(),
Title: "Optimus Job Alert",
SchedulerHost: schedulerHost,
Status: scheduler.StatusFiring,
JobEvent: event,
})
}
return nil
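Note: Relay now fetches the tenant details and reads the scheduler host from project configuration via GetConfig(tenant.ProjectSchedulerHost), so the alert can link back to the scheduler UI. A rough sketch of where that value would come from in a project spec, assuming the backing config key is scheduler_host (an assumption inferred from the constant name; the exact key is defined in the tenant package) and all values are placeholders:

    project:
      name: sample-project                          # placeholder project name
      config:
        storage_path: gs://sample-bucket            # placeholder
        scheduler_host: http://airflow.example.io   # placeholder; used to build the airflow_logs link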
17 changes: 11 additions & 6 deletions core/scheduler/service/events_service_test.go
@@ -135,14 +135,19 @@ func TestNotificationService(t *testing.T) {

alertManager := new(mockAlertManager)
alertManager.On("Relay", &scheduler.AlertAttrs{
Owner: "jobOwnerName",
JobURN: job.URN(),
Title: "Optimus Job Alert",
Status: scheduler.StatusFiring,
JobEvent: event,
Owner: "jobOwnerName",
JobURN: job.URN(),
Title: "Optimus Job Alert",
SchedulerHost: "localhost",
Status: scheduler.StatusFiring,
JobEvent: event,
})
tenantService := new(mockTenantService)
tenantWithDetails, _ := tenant.NewTenantDetails(project, namespace, []*tenant.PlainTextSecret{})
tenantService.On("GetDetails", ctx, tnnt).Return(tenantWithDetails, nil)
defer tenantService.AssertExpectations(t)

notifyService := service.NewEventsService(logger, jobRepo, nil, nil, nil, nil, alertManager)
notifyService := service.NewEventsService(logger, jobRepo, tenantService, nil, nil, nil, alertManager)

err := notifyService.Relay(ctx, event)
assert.Nil(t, err)
48 changes: 36 additions & 12 deletions core/scheduler/service/job_run_service.go
@@ -400,22 +400,30 @@ func (*JobRunService) getMonitoringValues(event *scheduler.Event) map[string]any
return output
}

func (s *JobRunService) updateJobRunSLA(ctx context.Context, event *scheduler.Event) error {
if len(event.SLAObjectList) < 1 {
return nil
}
var scheduleTimesList []time.Time
for _, SLAObject := range event.SLAObjectList {
scheduleTimesList = append(scheduleTimesList, SLAObject.JobScheduledAt)
func (s *JobRunService) filterSLAObjects(ctx context.Context, event *scheduler.Event) ([]*scheduler.SLAObject, []time.Time) {
scheduleTimesList := make([]time.Time, len(event.SLAObjectList))
unfilteredSLAObj := make([]*scheduler.SLAObject, len(event.SLAObjectList))
var slaBreachedJobRunScheduleTimes []time.Time

for i, SLAObject := range event.SLAObjectList {
scheduleTimesList[i] = SLAObject.JobScheduledAt
unfilteredSLAObj[i] = &scheduler.SLAObject{JobName: SLAObject.JobName, JobScheduledAt: SLAObject.JobScheduledAt}
}
jobRuns, err := s.repo.GetByScheduledTimes(ctx, event.Tenant, event.JobName, scheduleTimesList)
if err != nil {
s.l.Error("error getting job runs by schedule time", err)
return err
s.l.Error("error getting job runs by schedule time, skipping the filter", err)
return unfilteredSLAObj, slaBreachedJobRunScheduleTimes
}
if len(jobRuns) == 0 {
s.l.Error("no job runs found for given schedule time, skipping the filter (perhaps the sla is due to schedule delay, in such cases the job wont be persisted in optimus DB)", err)
event.Status = scheduler.StateNotScheduled
event.JobScheduledAt = event.SLAObjectList[0].JobScheduledAt // pick the first reported sla
return unfilteredSLAObj, slaBreachedJobRunScheduleTimes
}

var slaBreachedJobRunScheduleTimes []time.Time
var filteredSLAObject []*scheduler.SLAObject
var latestScheduleTime time.Time
var latestJobRun *scheduler.JobRun
for _, jobRun := range jobRuns {
if !jobRun.HasSLABreached() {
s.l.Error("received sla miss callback for job run that has not breached SLA, jobName: %s, scheduled_at: %s, start_time: %s, end_time: %s, SLA definition: %s",
@@ -426,12 +434,28 @@ func (s *JobRunService) updateJobRunSLA(ctx context.Context, event *scheduler.Ev
JobName: jobRun.JobName,
JobScheduledAt: jobRun.ScheduledAt,
})
if jobRun.ScheduledAt.Unix() > latestScheduleTime.Unix() {
latestScheduleTime = jobRun.ScheduledAt
latestJobRun = jobRun
}
slaBreachedJobRunScheduleTimes = append(slaBreachedJobRunScheduleTimes, jobRun.ScheduledAt)
}
if latestJobRun != nil {
event.Status = latestJobRun.State
event.JobScheduledAt = latestJobRun.ScheduledAt
}

event.SLAObjectList = filteredSLAObject
return filteredSLAObject, slaBreachedJobRunScheduleTimes
}

func (s *JobRunService) updateJobRunSLA(ctx context.Context, event *scheduler.Event) error {
if len(event.SLAObjectList) < 1 {
return nil
}
var slaBreachedJobRunScheduleTimes []time.Time
event.SLAObjectList, slaBreachedJobRunScheduleTimes = s.filterSLAObjects(ctx, event)

err = s.repo.UpdateSLA(ctx, event.JobName, event.Tenant.ProjectName(), slaBreachedJobRunScheduleTimes)
err := s.repo.UpdateSLA(ctx, event.JobName, event.Tenant.ProjectName(), slaBreachedJobRunScheduleTimes)
if err != nil {
s.l.Error("error updating job run sla status", err)
return err
3 changes: 3 additions & 0 deletions core/scheduler/status.go
@@ -22,6 +22,7 @@ const (
StateSuccess State = "success"
StateFailed State = "failed"

StateNotScheduled State = "waiting_to_schedule"
StateWaitUpstream State = "wait_upstream"
StateInProgress State = "in_progress"

@@ -52,6 +53,8 @@ func StateFromString(state string) (State, error) {
return StateWaitUpstream, nil
case string(StateInProgress):
return StateInProgress, nil
case string(StateNotScheduled):
return StateNotScheduled, nil
default:
return "", errors.InvalidArgument(EntityJobRun, "invalid state for run "+state)
}
77 changes: 33 additions & 44 deletions ext/notify/alertmanager/alertManager.go
@@ -22,6 +22,8 @@ const (
eventBatchInterval = time.Second * 10
httpTimeout = time.Second * 10
radarTimeFormat = "2006/01/02 15:04:05"
failureAlertTemplate = "optimus-job-failure"
slaAlertTemplate = "optimus-job-sla-miss"
)

var (
@@ -49,25 +51,16 @@ type AlertManager struct {
wg sync.WaitGroup
workerErrChan chan error

host string
endpoint string
dashboard string
host string
endpoint string
dashboard string
dataConsole string

eventBatchInterval time.Duration
}

type alertData struct {
EventType scheduler.JobEventType `json:"num_alerts_firing"`
Status scheduler.EventStatus `json:"status"`

Severity string `json:"severity"`
Title string `json:"alert_name"`
Summary string `json:"summary"`
Dashboard string `json:"dashboard"`
}

type AlertPayload struct {
Data alertData `json:"data"`
Data map[string]string `json:"data"`
Template string `json:"template"`
Labels map[string]string `json:"labels"`
}
@@ -83,26 +76,9 @@ func (a *AlertManager) Relay(alert *scheduler.AlertAttrs) {
}()
}

func RelayEvent(e *scheduler.AlertAttrs, host, endpoint, dashboardURL string) error {
var notificationMsg string
switch e.JobEvent.Type {
case scheduler.JobFailureEvent:
notificationMsg = fmt.Sprintf("*[Job]* `%s` :alert:\n"+
"*Project*\t\t:\t%s\t\t\t*Namespace*\t:\t%s\n"+
"*Owner*\t\t:\t<%s>\t\t*Job*\t\t\t:\t`%s`\n"+
"*Task ID*\t\t:\t%s\t\t\t*Scheduled At*:\t`%s`\n",
e.JobEvent.Status, e.JobEvent.Tenant.ProjectName(), e.JobEvent.Tenant.NamespaceName(),
e.Owner, e.JobEvent.JobName, e.JobEvent.OperatorName, e.JobEvent.JobScheduledAt.Format(time.RFC822))
case scheduler.SLAMissEvent:
notificationMsg = fmt.Sprintf("[Job] SLA MISS :alert:\n"+
"*Project*\t\t:\t%s\t\t\t*Namespace*\t:\t%s\n"+
"*Owner*\t\t:\t<%s>\t\t*Job*\t\t\t:\t`%s`\nPending Tasks:\n",
e.JobEvent.Tenant.ProjectName(), e.JobEvent.Tenant.NamespaceName(),
e.Owner, e.JobEvent.JobName)
for _, object := range e.JobEvent.SLAObjectList {
notificationMsg += fmt.Sprintf("Task: %s\n", object.JobName)
}
}
func RelayEvent(e *scheduler.AlertAttrs, host, endpoint, dashboardURL, dataConsole string) error {
var template string
var templateContext map[string]string

dashURL, _ := url.Parse(dashboardURL)
q := dashURL.Query()
@@ -111,16 +87,28 @@ func RelayEvent(e *scheduler.AlertAttrs, host, endpoint, dashboardURL string) er
q.Set("var-job", e.JobEvent.JobName.String())
q.Set("var-schedule_time", e.JobEvent.JobScheduledAt.Format(radarTimeFormat))
dashURL.RawQuery = q.Encode()
templateContext = map[string]string{
"project": e.JobEvent.Tenant.ProjectName().String(),
"namespace": e.JobEvent.Tenant.NamespaceName().String(),
"job_name": e.JobEvent.JobName.String(),
"owner": e.Owner,
"scheduled_at": e.JobEvent.JobScheduledAt.Format(radarTimeFormat),
"console_link": fmt.Sprintf("%s/%s/%s", dataConsole, "optimus", e.JobEvent.JobName),
"dashboard": dashURL.String(),
"airflow_logs": fmt.Sprintf("%s/dags/%s/grid", e.SchedulerHost, e.JobEvent.JobName),
}
switch e.JobEvent.Type {
case scheduler.JobFailureEvent:
template = failureAlertTemplate
templateContext["task_id"] = e.JobEvent.OperatorName
case scheduler.SLAMissEvent:
template = slaAlertTemplate
templateContext["state"] = e.JobEvent.Status.String()
}

payload := AlertPayload{
Data: alertData{
EventType: e.JobEvent.Type,
Title: e.Title,
Status: e.Status,
Severity: "CRITICAL",
Summary: notificationMsg,
Dashboard: dashURL.String(),
},
Data: templateContext,
Template: template,
Labels: map[string]string{
"job_urn": e.JobURN,
"event_type": e.JobEvent.Type.String(),
@@ -161,7 +149,7 @@
for {
select {
case e := <-a.alertChan:
err := RelayEvent(e, a.host, a.endpoint, a.dashboard) // nolint:contextcheck
err := RelayEvent(e, a.host, a.endpoint, a.dashboard, a.dataConsole) // nolint:contextcheck
if err != nil {
a.workerErrChan <- fmt.Errorf("alert worker: %w", err)
eventWorkerSendErrCounter.Inc()
Expand All @@ -183,7 +171,7 @@ func (a *AlertManager) Close() error { // nolint: unparam
return nil
}

func New(ctx context.Context, errHandler func(error), host, endpoint, dashboard string) *AlertManager {
func New(ctx context.Context, errHandler func(error), host, endpoint, dashboard, dataConsole string) *AlertManager {
if host == "" {
return &AlertManager{}
}
Expand All @@ -195,6 +183,7 @@ func New(ctx context.Context, errHandler func(error), host, endpoint, dashboard
host: host,
endpoint: endpoint,
dashboard: dashboard,
dataConsole: dataConsole,
eventBatchInterval: eventBatchInterval,
}

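Note: with templating support, the free-form Slack-style message previously built in RelayEvent is replaced by a Siren template name plus a flat map of template variables, and Siren renders the notification from the registered template (optimus-job-failure or optimus-job-sla-miss). A rough sketch of the resulting request body for a failure event, with every value an illustrative placeholder:

    {
      "data": {
        "project": "sample-project",
        "namespace": "sample-namespace",
        "job_name": "sample-job",
        "owner": "owner@example.io",
        "scheduled_at": "2024/04/03 10:00:00",
        "console_link": "http://console.example.io/optimus/sample-job",
        "dashboard": "http://grafana.example.io/d/optimus-job?var-job=sample-job",
        "airflow_logs": "http://airflow.example.io/dags/sample-job/grid",
        "task_id": "transform-task"
      },
      "template": "optimus-job-failure",
      "labels": {
        "job_urn": "<job URN>",
        "event_type": "failure"
      }
    }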
15 changes: 9 additions & 6 deletions ext/notify/alertmanager/alertManager_test.go
@@ -35,11 +35,14 @@ func TestAlertManager(t *testing.T) {
assert.Nil(t, json.NewDecoder(r.Body).Decode(&payload))

// Check if the payload is properly formed
assert.NotEmpty(t, payload.Data.Dashboard)
assert.NotEmpty(t, payload.Data.Status)
assert.Equal(t, payload.Data.EventType, scheduler.SLAMissEvent)
assert.NotEmpty(t, payload.Data.Summary)
assert.NotEmpty(t, payload.Data.Severity)
assert.NotEmpty(t, payload.Data["project"])
assert.NotEmpty(t, payload.Data["namespace"])
assert.NotEmpty(t, payload.Data["job_name"])
assert.NotEmpty(t, payload.Data["owner"])
assert.NotEmpty(t, payload.Data["scheduled_at"])
assert.NotEmpty(t, payload.Data["console_link"])
assert.NotEmpty(t, payload.Data["dashboard"])
assert.NotEmpty(t, payload.Data["airflow_logs"])

w.WriteHeader(http.StatusOK)
})
@@ -67,7 +70,7 @@
},
},
},
}, mockServer.URL, alertManagerEndPoint, "dashboard_url")
}, mockServer.URL, alertManagerEndPoint, "dashboard_url", "data_console_url")
assert.Nil(t, err)

assert.Equal(t, reqRecorder.Code, http.StatusOK)
7 changes: 4 additions & 3 deletions server/optimus.go
@@ -320,9 +320,10 @@ func (s *OptimusServer) setupHandlers() error {
func(err error) {
s.logger.Error("alert-manager error accumulator : " + err.Error())
},
s.conf.EventManager.Host,
s.conf.EventManager.Endpoint,
s.conf.Dashboard,
s.conf.Alerting.EventManager.Host,
s.conf.Alerting.EventManager.Endpoint,
s.conf.Alerting.Dashboard,
s.conf.Alerting.DataConsole,
)

newEngine := compiler.NewEngine()
