Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add refresh_interval parameter #383

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/okta/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

# v 0.9.0

* Add `refresh_interval` parameter
* Add deduplication method to improve the count of MFA attempts

## v0.8.1


Expand Down
5 changes: 5 additions & 0 deletions plugins/okta/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ Only `init` accepts settings:
* `api_token`: your API Token to access Okta API
* `cache_expiration`: TTL in seconds for keys in cache for MFA events (default: 600)
* `cache_usermaxsize`: Max size by user for the cache (default: 200)
* `refresh_interval`: Delay in seconds between two calls to the Okta API (default: 10)

> **Warning**
Don't set a too low value for `refresh_interval` too avoid `Too many requests` errors.

# Configurations

Expand All @@ -107,6 +111,7 @@ Only `init` accepts settings:
api_token: xxxxxxxxxxx
cache_expiration: 84600 #24h
cache_usermaxsize: 200
refresh_interval: 10 #in seconds
open_params: ''

load_plugins: [okta]
Expand Down
53 changes: 38 additions & 15 deletions plugins/okta/pkg/okta/okta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"net/http"
"strings"
"time"
Expand Down Expand Up @@ -113,6 +113,7 @@ type Plugin struct {
Organization string `json:"organization" jsonschema:"title=Organization,description=Your Okta organization"`
CacheExpiration uint64 `json:"cache_expiration" jsonschema:"title=Cache Expiration,description=TTL in seconds for keys in cache for MFA events (default: 600)"`
CacheUserMaxSize uint64 `json:"cache_usermaxsize" jsonschema:"title=Cache User Max Size,description=Max size by user for the cache (default: 200)"`
RefreshInterval uint64 `json:"refresh_interval" jsonschema:"title=Refresh Interval,description=Delay in seconds between two calls to the Okta API (default: 10)"`
lastLogEvent LogEvent
lastEventNum uint64
cache gcache.Cache
Expand All @@ -121,10 +122,11 @@ type Plugin struct {
// PluginInstance represents a opened stream based on our Plugin
type PluginInstance struct {
source.BaseInstance
client *http.Client
request *http.Request
cancel context.CancelFunc
lastReqTime time.Time
client *http.Client
request *http.Request
cancel context.CancelFunc
lastReqTime time.Time
refreshInterval uint64
}

const oktaBaseURL string = "okta.com/api/v1/logs"
Expand All @@ -136,7 +138,7 @@ func (oktaPlugin *Plugin) Info() *plugins.Info {
Name: "okta",
Description: "Okta Log Events",
Contact: "github.com/falcosecurity/plugins/",
Version: "0.8.1",
Version: "0.9.0",
EventSource: "okta",
}
}
Expand All @@ -161,6 +163,7 @@ func (oktaPlugin *Plugin) InitSchema() *sdk.SchemaInfo {
func (oktaPlugin *Plugin) Init(config string) error {
oktaPlugin.CacheExpiration = 84600
oktaPlugin.CacheUserMaxSize = 200
oktaPlugin.RefreshInterval = 10
err := json.Unmarshal([]byte(config), &oktaPlugin)
if err != nil {
return err
Expand Down Expand Up @@ -227,7 +230,7 @@ func (oktaPlugin *Plugin) Extract(req sdk.ExtractRequest, evt sdk.EventReader) e
data := oktaPlugin.lastLogEvent

if evt.EventNum() != oktaPlugin.lastEventNum {
rawData, err := ioutil.ReadAll(evt.Reader())
rawData, err := io.ReadAll(evt.Reader())
if err != nil {
return err
}
Expand All @@ -250,6 +253,7 @@ func (oktaPlugin *Plugin) Extract(req sdk.ExtractRequest, evt sdk.EventReader) e
}
}
valueList = append(valueList, evt.Timestamp())
valueList = removeDuplicateUint64(valueList)
if uint64(len(valueList)) > oktaPlugin.CacheUserMaxSize {
valueList = valueList[1:]
}
Expand Down Expand Up @@ -416,7 +420,7 @@ func (oktaPlugin *Plugin) Open(params string) (source.Instance, error) {
return nil, err
}

since := time.Now().UTC().Add(-60 * time.Second)
since := time.Now().UTC().Add(time.Duration(-30) * time.Second)
values := req.URL.Query()
values.Add("since", since.Format(time.RFC3339))
req.URL.RawQuery = values.Encode()
Expand All @@ -426,16 +430,18 @@ func (oktaPlugin *Plugin) Open(params string) (source.Instance, error) {
req.Header.Add("Authorization", "SSWS "+oktaPlugin.APIToken)

return &PluginInstance{
client: &http.Client{},
request: req,
cancel: cancel,
client: &http.Client{},
request: req,
cancel: cancel,
refreshInterval: oktaPlugin.RefreshInterval,
lastReqTime: since,
}, nil
}

// String represents the raw value of on event
// todo: optimize this to cache by event number
func (oktaPlugin *Plugin) String(evt sdk.EventReader) (string, error) {
evtBytes, err := ioutil.ReadAll(evt.Reader())
evtBytes, err := io.ReadAll(evt.Reader())
if err != nil {
return "", err
}
Expand All @@ -446,9 +452,10 @@ func (oktaPlugin *Plugin) String(evt sdk.EventReader) (string, error) {

// NextBatch is called by Falco plugin framework to get a batch of events from the instance
func (oktaInstance *PluginInstance) NextBatch(pState sdk.PluginState, evts sdk.EventWriters) (int, error) {
now := time.Now()
if now.Before(oktaInstance.lastReqTime.Add(5 * time.Second)) {
time.Sleep(oktaInstance.lastReqTime.Add(5 * time.Second).Sub(now))
now := time.Now().UTC()
if duration := now.Sub(oktaInstance.lastReqTime); duration < (time.Duration(oktaInstance.refreshInterval) * time.Second) {
time.Sleep(time.Duration(oktaInstance.refreshInterval)*time.Second - duration)
now = time.Now()
}

var logEvents []LogEvent
Expand All @@ -463,6 +470,10 @@ func (oktaInstance *PluginInstance) NextBatch(pState sdk.PluginState, evts sdk.E
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return 0, sdk.ErrTimeout
}

err = json.NewDecoder(resp.Body).Decode(&logEvents)
if err != nil {
return 0, err
Expand Down Expand Up @@ -492,3 +503,15 @@ func (oktaInstance *PluginInstance) NextBatch(pState sdk.PluginState, evts sdk.E
func (oktaInstance *PluginInstance) Close() {
oktaInstance.cancel()
}

func removeDuplicateUint64(intSlice []uint64) []uint64 {
allKeys := make(map[uint64]bool)
list := []uint64{}
for _, item := range intSlice {
if _, value := allKeys[item]; !value {
allKeys[item] = true
list = append(list, item)
}
}
return list
}
Loading