Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a retry for 429 errors in common meraki apis #642

Merged
merged 2 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 50 additions & 17 deletions pkg/inputs/snmp/x/meraki/meraki.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type MerakiClient struct {
orgs []orgDesc
timeout time.Duration
cache *clientCache
maxRetry int
}

type orgDesc struct {
Expand All @@ -51,9 +52,12 @@ type networkDesc struct {
}

const (
ControllerKey = "meraki_controller_name"
MerakiApiKey = "KENTIK_MERAKI_API_KEY"
DeviceCacheDuration = time.Duration(24) * time.Hour
ControllerKey = "meraki_controller_name"
MerakiApiKey = "KENTIK_MERAKI_API_KEY"
DeviceCacheDuration = time.Duration(24) * time.Hour
MAX_TIMEOUT_RETRY = 10 // Don't retry a call more than this many times.
MAX_TIMEOUT_SEC = 5 // Sleep this many sec each 429.
DEFAULT_TIMEOUT_RETRY = 2
)

func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, metrics *kt.SnmpDeviceMetric, log logger.ContextL) (*MerakiClient, error) {
Expand All @@ -67,13 +71,19 @@ func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf
auth: httptransport.APIKeyAuth("X-Cisco-Meraki-API-Key", "header", kt.LookupEnvString(MerakiApiKey, conf.Ext.MerakiConfig.ApiKey)),
timeout: 30 * time.Second,
cache: newClientCache(log),
maxRetry: conf.Ext.MerakiConfig.MaxAPIRetry,
}

host := conf.Ext.MerakiConfig.Host
if host == "" {
host = apiclient.DefaultHost
}

// Figure out max retries.
if c.maxRetry == 0 || c.maxRetry > MAX_TIMEOUT_RETRY {
c.maxRetry = DEFAULT_TIMEOUT_RETRY
}

// Figure out global or local timeout here.
if conf.TimeoutMS > 0 {
c.timeout = time.Duration(conf.TimeoutMS) * time.Millisecond
Expand Down Expand Up @@ -785,8 +795,8 @@ type deviceUplink struct {

func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) {

var getUplinkStatus func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink) error
getUplinkStatus = func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink) error {
var getUplinkStatus func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink, timeouts int) error
getUplinkStatus = func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink, timeouts int) error {
params := organizations.NewGetOrganizationUplinksStatusesParamsWithTimeout(c.timeout)
params.SetOrganizationID(org.ID)
if nextToken != "" {
Expand All @@ -795,6 +805,13 @@ func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) {

prod, err := c.client.Organizations.GetOrganizationUplinksStatuses(params, c.auth)
if err != nil {
if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry {
sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
c.log.Warnf("Uplink Status: %s 429, sleeping %v", org.Name, sleepDur)
time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
timeouts++
return getUplinkStatus(nextToken, org, uplinks, timeouts)
}
return err
}

Expand Down Expand Up @@ -822,15 +839,15 @@ func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) {
// Recursion!
nextLink := getNextLink(prod.Link)
if nextLink != "" {
return getUplinkStatus(nextLink, org, uplinks)
return getUplinkStatus(nextLink, org, uplinks, timeouts)
} else {
return nil
}
}

uplinks := map[string]deviceUplink{}
for _, org := range c.orgs {
err := getUplinkStatus("", org, &uplinks)
err := getUplinkStatus("", org, &uplinks, 0)
if err != nil {
if strings.Contains(err.Error(), "(status 400)") { // There are no valid uplinks to worry about here.
return nil, nil
Expand Down Expand Up @@ -908,8 +925,9 @@ func (c *MerakiClient) getUplinkUsage(dur time.Duration, uplinkMap map[string]de
prod, err := c.client.Appliance.GetOrganizationApplianceUplinksUsageByNetwork(params, c.auth)
if err != nil {
if strings.Contains(err.Error(), "status 429") {
c.log.Infof("Uplink Usage: %s 429, sleeping", org.Name)
time.Sleep(3 * time.Second) // For right now guess on this, need to add 429 to spec.
sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
c.log.Infof("Uplink Usage: %s 429, sleeping %v", org.Name, sleepDur)
time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
return getUsage(params, org)
} else {
c.log.Warnf("Cannot get Uplink Usage: %s %v", org.Name, err)
Expand Down Expand Up @@ -1064,8 +1082,8 @@ type vpnStatus struct {

func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) {

var getVpnStatus func(nextToken string, org orgDesc, vpns *[]*vpnStatus) error
getVpnStatus = func(nextToken string, org orgDesc, vpns *[]*vpnStatus) error {
var getVpnStatus func(nextToken string, org orgDesc, vpns *[]*vpnStatus, timeouts int) error
getVpnStatus = func(nextToken string, org orgDesc, vpns *[]*vpnStatus, timeouts int) error {
params := appliance.NewGetOrganizationApplianceVpnStatusesParamsWithTimeout(c.timeout)
params.SetOrganizationID(org.ID)
if nextToken != "" {
Expand All @@ -1074,6 +1092,13 @@ func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) {

prod, err := c.client.Appliance.GetOrganizationApplianceVpnStatuses(params, c.auth)
if err != nil {
if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry {
sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
c.log.Warnf("Vpn Status: %s 429, sleeping %v", org.Name, sleepDur)
time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
timeouts++
return getVpnStatus(nextToken, org, vpns, timeouts)
}
return err
}

Expand Down Expand Up @@ -1103,15 +1128,15 @@ func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) {
// Recursion!
nextLink := getNextLink(prod.Link)
if nextLink != "" {
return getVpnStatus(nextLink, org, vpns)
return getVpnStatus(nextLink, org, vpns, timeouts)
} else {
return nil
}
}

vpns := make([]*vpnStatus, 0)
for _, org := range c.orgs {
err := getVpnStatus("", org, &vpns)
err := getVpnStatus("", org, &vpns, 0)
if err != nil {
if strings.Contains(err.Error(), "(status 400)") { // There are no valid vpns to worry about here.
return nil, nil
Expand Down Expand Up @@ -1236,16 +1261,24 @@ func (c *MerakiClient) getDeviceStatus(dur time.Duration) ([]*kt.JCHF, error) {
productTypes[pt] = true
}

var getDeviceStatus func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper) error
getDeviceStatus = func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper) error {
var getDeviceStatus func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper, timeouts int) error
getDeviceStatus = func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper, timeouts int) error {
params := organizations.NewGetOrganizationDevicesStatusesParamsWithTimeout(c.timeout)
params.SetOrganizationID(org.ID)
if nextToken != "" {
params.SetStartingAfter(&nextToken)
}

prod, err := c.client.Organizations.GetOrganizationDevicesStatuses(params, c.auth)
err = fmt.Errorf("(status 429)")
if err != nil {
if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry {
sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
c.log.Warnf("Device Status: %s 429, sleeping %v", org.Name, sleepDur)
time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
timeouts++
return getDeviceStatus(nextToken, org, devices, timeouts)
}
return err
}

Expand Down Expand Up @@ -1274,15 +1307,15 @@ func (c *MerakiClient) getDeviceStatus(dur time.Duration) ([]*kt.JCHF, error) {
// Recursion!
nextLink := getNextLink(prod.Link)
if nextLink != "" {
return getDeviceStatus(nextLink, org, devices)
return getDeviceStatus(nextLink, org, devices, timeouts)
} else {
return nil
}
}

devices := make([]*deviceStatusWrapper, 0)
for _, org := range c.orgs {
err := getDeviceStatus("", org, &devices)
err := getDeviceStatus("", org, &devices, 0)
if err != nil {
if strings.Contains(err.Error(), "(status 400)") { // There are no valid devices to worry about here.
return nil, nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/kt/snmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,11 @@ type MerakiConfig struct {
MonitorOrgChanges bool `yaml:"monitor_org_changes"`
MonitorNetworkClients bool `yaml:"monitor_clients"`
MonitorVpnStatus bool `yaml:"monitor_vpn_status"`
Orgs []string `yaml:"organizations"` // Only monitor orgs in this list, if set.
Networks []string `yaml:"networks"` // Only monitor networks in this list, if set.
Prefs map[string]bool `yaml:"preferences"` // Additional fine tuning of what data is returned.
ProductTypes []string `yaml:"product_types"` // Only monitor these product types, if set.
Orgs []string `yaml:"organizations"` // Only monitor orgs in this list, if set.
Networks []string `yaml:"networks"` // Only monitor networks in this list, if set.
Prefs map[string]bool `yaml:"preferences"` // Additional fine tuning of what data is returned.
ProductTypes []string `yaml:"product_types"` // Only monitor these product types, if set.
MaxAPIRetry int `yaml:"max_http_retry"` // retry up to this many times on 429 errors. Default 2.
}

// Contain various extensions to snmp which can be used to get data.
Expand Down
Loading