Skip to content

Commit

Permalink
Adding a retry for 429 errors in common meraki apis (#642)
Browse files Browse the repository at this point in the history
  • Loading branch information
i3149 authored Dec 14, 2023
1 parent 0779c49 commit e5c428c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 21 deletions.
67 changes: 50 additions & 17 deletions pkg/inputs/snmp/x/meraki/meraki.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type MerakiClient struct {
orgs []orgDesc
timeout time.Duration
cache *clientCache
maxRetry int
}

type orgDesc struct {
Expand All @@ -51,9 +52,12 @@ type networkDesc struct {
}

const (
ControllerKey = "meraki_controller_name"
MerakiApiKey = "KENTIK_MERAKI_API_KEY"
DeviceCacheDuration = time.Duration(24) * time.Hour
ControllerKey = "meraki_controller_name"
MerakiApiKey = "KENTIK_MERAKI_API_KEY"
DeviceCacheDuration = time.Duration(24) * time.Hour
MAX_TIMEOUT_RETRY = 10 // Don't retry a call more than this many times.
MAX_TIMEOUT_SEC = 5 // Sleep this many sec each 429.
DEFAULT_TIMEOUT_RETRY = 2
)

func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, metrics *kt.SnmpDeviceMetric, log logger.ContextL) (*MerakiClient, error) {
Expand All @@ -67,13 +71,19 @@ func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf
auth: httptransport.APIKeyAuth("X-Cisco-Meraki-API-Key", "header", kt.LookupEnvString(MerakiApiKey, conf.Ext.MerakiConfig.ApiKey)),
timeout: 30 * time.Second,
cache: newClientCache(log),
maxRetry: conf.Ext.MerakiConfig.MaxAPIRetry,
}

host := conf.Ext.MerakiConfig.Host
if host == "" {
host = apiclient.DefaultHost
}

// Figure out max retries.
if c.maxRetry == 0 || c.maxRetry > MAX_TIMEOUT_RETRY {
c.maxRetry = DEFAULT_TIMEOUT_RETRY
}

// Figure out global or local timeout here.
if conf.TimeoutMS > 0 {
c.timeout = time.Duration(conf.TimeoutMS) * time.Millisecond
Expand Down Expand Up @@ -785,8 +795,8 @@ type deviceUplink struct {

func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) {

var getUplinkStatus func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink) error
getUplinkStatus = func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink) error {
var getUplinkStatus func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink, timeouts int) error
getUplinkStatus = func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink, timeouts int) error {
params := organizations.NewGetOrganizationUplinksStatusesParamsWithTimeout(c.timeout)
params.SetOrganizationID(org.ID)
if nextToken != "" {
Expand All @@ -795,6 +805,13 @@ func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) {

prod, err := c.client.Organizations.GetOrganizationUplinksStatuses(params, c.auth)
if err != nil {
if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry {
sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
c.log.Warnf("Uplink Status: %s 429, sleeping %v", org.Name, sleepDur)
time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
timeouts++
return getUplinkStatus(nextToken, org, uplinks, timeouts)
}
return err
}

Expand Down Expand Up @@ -822,15 +839,15 @@ func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) {
// Recursion!
nextLink := getNextLink(prod.Link)
if nextLink != "" {
return getUplinkStatus(nextLink, org, uplinks)
return getUplinkStatus(nextLink, org, uplinks, timeouts)
} else {
return nil
}
}

uplinks := map[string]deviceUplink{}
for _, org := range c.orgs {
err := getUplinkStatus("", org, &uplinks)
err := getUplinkStatus("", org, &uplinks, 0)
if err != nil {
if strings.Contains(err.Error(), "(status 400)") { // There are no valid uplinks to worry about here.
return nil, nil
Expand Down Expand Up @@ -908,8 +925,9 @@ func (c *MerakiClient) getUplinkUsage(dur time.Duration, uplinkMap map[string]de
prod, err := c.client.Appliance.GetOrganizationApplianceUplinksUsageByNetwork(params, c.auth)
if err != nil {
if strings.Contains(err.Error(), "status 429") {
c.log.Infof("Uplink Usage: %s 429, sleeping", org.Name)
time.Sleep(3 * time.Second) // For right now guess on this, need to add 429 to spec.
sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
c.log.Infof("Uplink Usage: %s 429, sleeping %v", org.Name, sleepDur)
time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
return getUsage(params, org)
} else {
c.log.Warnf("Cannot get Uplink Usage: %s %v", org.Name, err)
Expand Down Expand Up @@ -1064,8 +1082,8 @@ type vpnStatus struct {

func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) {

var getVpnStatus func(nextToken string, org orgDesc, vpns *[]*vpnStatus) error
getVpnStatus = func(nextToken string, org orgDesc, vpns *[]*vpnStatus) error {
var getVpnStatus func(nextToken string, org orgDesc, vpns *[]*vpnStatus, timeouts int) error
getVpnStatus = func(nextToken string, org orgDesc, vpns *[]*vpnStatus, timeouts int) error {
params := appliance.NewGetOrganizationApplianceVpnStatusesParamsWithTimeout(c.timeout)
params.SetOrganizationID(org.ID)
if nextToken != "" {
Expand All @@ -1074,6 +1092,13 @@ func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) {

prod, err := c.client.Appliance.GetOrganizationApplianceVpnStatuses(params, c.auth)
if err != nil {
if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry {
sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
c.log.Warnf("Vpn Status: %s 429, sleeping %v", org.Name, sleepDur)
time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
timeouts++
return getVpnStatus(nextToken, org, vpns, timeouts)
}
return err
}

Expand Down Expand Up @@ -1103,15 +1128,15 @@ func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) {
// Recursion!
nextLink := getNextLink(prod.Link)
if nextLink != "" {
return getVpnStatus(nextLink, org, vpns)
return getVpnStatus(nextLink, org, vpns, timeouts)
} else {
return nil
}
}

vpns := make([]*vpnStatus, 0)
for _, org := range c.orgs {
err := getVpnStatus("", org, &vpns)
err := getVpnStatus("", org, &vpns, 0)
if err != nil {
if strings.Contains(err.Error(), "(status 400)") { // There are no valid vpns to worry about here.
return nil, nil
Expand Down Expand Up @@ -1236,16 +1261,24 @@ func (c *MerakiClient) getDeviceStatus(dur time.Duration) ([]*kt.JCHF, error) {
productTypes[pt] = true
}

var getDeviceStatus func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper) error
getDeviceStatus = func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper) error {
var getDeviceStatus func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper, timeouts int) error
getDeviceStatus = func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper, timeouts int) error {
params := organizations.NewGetOrganizationDevicesStatusesParamsWithTimeout(c.timeout)
params.SetOrganizationID(org.ID)
if nextToken != "" {
params.SetStartingAfter(&nextToken)
}

prod, err := c.client.Organizations.GetOrganizationDevicesStatuses(params, c.auth)
err = fmt.Errorf("(status 429)")
if err != nil {
if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry {
sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second
c.log.Warnf("Device Status: %s 429, sleeping %v", org.Name, sleepDur)
time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec.
timeouts++
return getDeviceStatus(nextToken, org, devices, timeouts)
}
return err
}

Expand Down Expand Up @@ -1274,15 +1307,15 @@ func (c *MerakiClient) getDeviceStatus(dur time.Duration) ([]*kt.JCHF, error) {
// Recursion!
nextLink := getNextLink(prod.Link)
if nextLink != "" {
return getDeviceStatus(nextLink, org, devices)
return getDeviceStatus(nextLink, org, devices, timeouts)
} else {
return nil
}
}

devices := make([]*deviceStatusWrapper, 0)
for _, org := range c.orgs {
err := getDeviceStatus("", org, &devices)
err := getDeviceStatus("", org, &devices, 0)
if err != nil {
if strings.Contains(err.Error(), "(status 400)") { // There are no valid devices to worry about here.
return nil, nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/kt/snmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,11 @@ type MerakiConfig struct {
MonitorOrgChanges bool `yaml:"monitor_org_changes"`
MonitorNetworkClients bool `yaml:"monitor_clients"`
MonitorVpnStatus bool `yaml:"monitor_vpn_status"`
Orgs []string `yaml:"organizations"` // Only monitor orgs in this list, if set.
Networks []string `yaml:"networks"` // Only monitor networks in this list, if set.
Prefs map[string]bool `yaml:"preferences"` // Additional fine tuning of what data is returned.
ProductTypes []string `yaml:"product_types"` // Only monitor these product types, if set.
Orgs []string `yaml:"organizations"` // Only monitor orgs in this list, if set.
Networks []string `yaml:"networks"` // Only monitor networks in this list, if set.
Prefs map[string]bool `yaml:"preferences"` // Additional fine tuning of what data is returned.
ProductTypes []string `yaml:"product_types"` // Only monitor these product types, if set.
MaxAPIRetry int `yaml:"max_http_retry"` // retry up to this many times on 429 errors. Default 2.
}

// Contain various extensions to snmp which can be used to get data.
Expand Down

0 comments on commit e5c428c

Please sign in to comment.