From 6eddc58835f14109881cc49a257c6e658a167e6c Mon Sep 17 00:00:00 2001 From: Ian Pye Date: Tue, 12 Dec 2023 15:25:59 -0800 Subject: [PATCH 1/2] Adding a retry for 429 errors in common meraki apis --- pkg/inputs/snmp/x/meraki/meraki.go | 43 +++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/pkg/inputs/snmp/x/meraki/meraki.go b/pkg/inputs/snmp/x/meraki/meraki.go index da75c3af..24b4d7c7 100644 --- a/pkg/inputs/snmp/x/meraki/meraki.go +++ b/pkg/inputs/snmp/x/meraki/meraki.go @@ -54,6 +54,7 @@ const ( ControllerKey = "meraki_controller_name" MerakiApiKey = "KENTIK_MERAKI_API_KEY" DeviceCacheDuration = time.Duration(24) * time.Hour + MAX_TIMEOUT_RETRY = 5 // Don't retry a call more than this many times. ) func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, metrics *kt.SnmpDeviceMetric, log logger.ContextL) (*MerakiClient, error) { @@ -785,8 +786,8 @@ type deviceUplink struct { func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) { - var getUplinkStatus func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink) error - getUplinkStatus = func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink) error { + var getUplinkStatus func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink, timeouts int) error + getUplinkStatus = func(nextToken string, org orgDesc, uplinks *map[string]deviceUplink, timeouts int) error { params := organizations.NewGetOrganizationUplinksStatusesParamsWithTimeout(c.timeout) params.SetOrganizationID(org.ID) if nextToken != "" { @@ -795,6 +796,12 @@ func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) { prod, err := c.client.Organizations.GetOrganizationUplinksStatuses(params, c.auth) if err != nil { + if strings.Contains(err.Error(), "(status 429)") && timeouts < MAX_TIMEOUT_RETRY { + c.log.Warnf("Uplink Status: %s 429, sleeping", org.Name) + time.Sleep(3 * time.Second) // For right now 
guess on this, need to add 429 to spec. + timeouts++ + return getUplinkStatus(nextToken, org, uplinks, timeouts) + } return err } @@ -822,7 +829,7 @@ func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) { // Recursion! nextLink := getNextLink(prod.Link) if nextLink != "" { - return getUplinkStatus(nextLink, org, uplinks) + return getUplinkStatus(nextLink, org, uplinks, timeouts) } else { return nil } @@ -830,7 +837,7 @@ func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) { uplinks := map[string]deviceUplink{} for _, org := range c.orgs { - err := getUplinkStatus("", org, &uplinks) + err := getUplinkStatus("", org, &uplinks, 0) if err != nil { if strings.Contains(err.Error(), "(status 400)") { // There are no valid uplinks to worry about here. return nil, nil @@ -1064,8 +1071,8 @@ type vpnStatus struct { func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) { - var getVpnStatus func(nextToken string, org orgDesc, vpns *[]*vpnStatus) error - getVpnStatus = func(nextToken string, org orgDesc, vpns *[]*vpnStatus) error { + var getVpnStatus func(nextToken string, org orgDesc, vpns *[]*vpnStatus, timeouts int) error + getVpnStatus = func(nextToken string, org orgDesc, vpns *[]*vpnStatus, timeouts int) error { params := appliance.NewGetOrganizationApplianceVpnStatusesParamsWithTimeout(c.timeout) params.SetOrganizationID(org.ID) if nextToken != "" { @@ -1074,6 +1081,12 @@ func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) { prod, err := c.client.Appliance.GetOrganizationApplianceVpnStatuses(params, c.auth) if err != nil { + if strings.Contains(err.Error(), "(status 429)") && timeouts < MAX_TIMEOUT_RETRY { + c.log.Warnf("Vpn Status: %s 429, sleeping", org.Name) + time.Sleep(3 * time.Second) // For right now guess on this, need to add 429 to spec. 
+ timeouts++ + return getVpnStatus(nextToken, org, vpns, timeouts) + } return err } @@ -1103,7 +1116,7 @@ func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) { // Recursion! nextLink := getNextLink(prod.Link) if nextLink != "" { - return getVpnStatus(nextLink, org, vpns) + return getVpnStatus(nextLink, org, vpns, timeouts) } else { return nil } @@ -1111,7 +1124,7 @@ func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) { vpns := make([]*vpnStatus, 0) for _, org := range c.orgs { - err := getVpnStatus("", org, &vpns) + err := getVpnStatus("", org, &vpns, 0) if err != nil { if strings.Contains(err.Error(), "(status 400)") { // There are no valid vpns to worry about here. return nil, nil @@ -1236,8 +1249,8 @@ func (c *MerakiClient) getDeviceStatus(dur time.Duration) ([]*kt.JCHF, error) { productTypes[pt] = true } - var getDeviceStatus func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper) error - getDeviceStatus = func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper) error { + var getDeviceStatus func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper, timeouts int) error + getDeviceStatus = func(nextToken string, org orgDesc, devices *[]*deviceStatusWrapper, timeouts int) error { params := organizations.NewGetOrganizationDevicesStatusesParamsWithTimeout(c.timeout) params.SetOrganizationID(org.ID) if nextToken != "" { @@ -1246,6 +1259,12 @@ func (c *MerakiClient) getDeviceStatus(dur time.Duration) ([]*kt.JCHF, error) { prod, err := c.client.Organizations.GetOrganizationDevicesStatuses(params, c.auth) if err != nil { + if strings.Contains(err.Error(), "(status 429)") && timeouts < MAX_TIMEOUT_RETRY { + c.log.Warnf("Device Status: %s 429, sleeping", org.Name) + time.Sleep(3 * time.Second) // For right now guess on this, need to add 429 to spec. 
+ timeouts++ + return getDeviceStatus(nextToken, org, devices, timeouts) + } return err } @@ -1274,7 +1293,7 @@ func (c *MerakiClient) getDeviceStatus(dur time.Duration) ([]*kt.JCHF, error) { // Recursion! nextLink := getNextLink(prod.Link) if nextLink != "" { - return getDeviceStatus(nextLink, org, devices) + return getDeviceStatus(nextLink, org, devices, timeouts) } else { return nil } @@ -1282,7 +1301,7 @@ func (c *MerakiClient) getDeviceStatus(dur time.Duration) ([]*kt.JCHF, error) { devices := make([]*deviceStatusWrapper, 0) for _, org := range c.orgs { - err := getDeviceStatus("", org, &devices) + err := getDeviceStatus("", org, &devices, 0) if err != nil { if strings.Contains(err.Error(), "(status 400)") { // There are no valid devices to worry about here. return nil, nil From 3a77d54d425662c9223f9406c91fd123a081f5f3 Mon Sep 17 00:00:00 2001 From: Ian Pye Date: Wed, 13 Dec 2023 11:20:23 -0800 Subject: [PATCH 2/2] new retry logic --- pkg/inputs/snmp/x/meraki/meraki.go | 44 ++++++++++++++++++++---------- pkg/kt/snmp.go | 9 +++--- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/pkg/inputs/snmp/x/meraki/meraki.go b/pkg/inputs/snmp/x/meraki/meraki.go index 24b4d7c7..f6c302a0 100644 --- a/pkg/inputs/snmp/x/meraki/meraki.go +++ b/pkg/inputs/snmp/x/meraki/meraki.go @@ -34,6 +34,7 @@ type MerakiClient struct { orgs []orgDesc timeout time.Duration cache *clientCache + maxRetry int } type orgDesc struct { @@ -51,10 +52,12 @@ type networkDesc struct { } const ( - ControllerKey = "meraki_controller_name" - MerakiApiKey = "KENTIK_MERAKI_API_KEY" - DeviceCacheDuration = time.Duration(24) * time.Hour - MAX_TIMEOUT_RETRY = 5 // Don't retry a call more than this many times. + ControllerKey = "meraki_controller_name" + MerakiApiKey = "KENTIK_MERAKI_API_KEY" + DeviceCacheDuration = time.Duration(24) * time.Hour + MAX_TIMEOUT_RETRY = 10 // Don't retry a call more than this many times. + MAX_TIMEOUT_SEC = 5 // Sleep this many sec each 429. 
+ DEFAULT_TIMEOUT_RETRY = 2 ) func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf *kt.SnmpDeviceConfig, metrics *kt.SnmpDeviceMetric, log logger.ContextL) (*MerakiClient, error) { @@ -68,6 +71,7 @@ func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf auth: httptransport.APIKeyAuth("X-Cisco-Meraki-API-Key", "header", kt.LookupEnvString(MerakiApiKey, conf.Ext.MerakiConfig.ApiKey)), timeout: 30 * time.Second, cache: newClientCache(log), + maxRetry: conf.Ext.MerakiConfig.MaxAPIRetry, } host := conf.Ext.MerakiConfig.Host @@ -75,6 +79,11 @@ func NewMerakiClient(jchfChan chan []*kt.JCHF, gconf *kt.SnmpGlobalConfig, conf host = apiclient.DefaultHost } + // Figure out max retries. + if c.maxRetry == 0 || c.maxRetry > MAX_TIMEOUT_RETRY { + c.maxRetry = DEFAULT_TIMEOUT_RETRY + } + // Figure out global or local timeout here. if conf.TimeoutMS > 0 { c.timeout = time.Duration(conf.TimeoutMS) * time.Millisecond @@ -796,9 +805,10 @@ func (c *MerakiClient) getUplinks(dur time.Duration) ([]*kt.JCHF, error) { prod, err := c.client.Organizations.GetOrganizationUplinksStatuses(params, c.auth) if err != nil { - if strings.Contains(err.Error(), "(status 429)") && timeouts < MAX_TIMEOUT_RETRY { - c.log.Warnf("Uplink Status: %s 429, sleeping", org.Name) - time.Sleep(3 * time.Second) // For right now guess on this, need to add 429 to spec. + if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry { + sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second + c.log.Warnf("Uplink Status: %s 429, sleeping %v", org.Name, sleepDur) + time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec. 
timeouts++ return getUplinkStatus(nextToken, org, uplinks, timeouts) } @@ -915,8 +925,9 @@ func (c *MerakiClient) getUplinkUsage(dur time.Duration, uplinkMap map[string]de prod, err := c.client.Appliance.GetOrganizationApplianceUplinksUsageByNetwork(params, c.auth) if err != nil { if strings.Contains(err.Error(), "status 429") { - c.log.Infof("Uplink Usage: %s 429, sleeping", org.Name) - time.Sleep(3 * time.Second) // For right now guess on this, need to add 429 to spec. + sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second + c.log.Infof("Uplink Usage: %s 429, sleeping %v", org.Name, sleepDur) + time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec. return getUsage(params, org) } else { c.log.Warnf("Cannot get Uplink Usage: %s %v", org.Name, err) @@ -1081,9 +1092,10 @@ func (c *MerakiClient) getVpnStatus(dur time.Duration) ([]*kt.JCHF, error) { prod, err := c.client.Appliance.GetOrganizationApplianceVpnStatuses(params, c.auth) if err != nil { - if strings.Contains(err.Error(), "(status 429)") && timeouts < MAX_TIMEOUT_RETRY { - c.log.Warnf("Vpn Status: %s 429, sleeping", org.Name) - time.Sleep(3 * time.Second) // For right now guess on this, need to add 429 to spec. + if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry { + sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second + c.log.Warnf("Vpn Status: %s 429, sleeping %v", org.Name, sleepDur) + time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec. 
timeouts++ return getVpnStatus(nextToken, org, vpns, timeouts) } @@ -1258,10 +1270,12 @@ func (c *MerakiClient) getDeviceStatus(dur time.Duration) ([]*kt.JCHF, error) { } prod, err := c.client.Organizations.GetOrganizationDevicesStatuses(params, c.auth) + // On a "(status 429)" error, sleep MAX_TIMEOUT_SEC seconds and retry the same page while timeouts < c.maxRetry; any other error is returned as-is. if err != nil { - if strings.Contains(err.Error(), "(status 429)") && timeouts < MAX_TIMEOUT_RETRY { - c.log.Warnf("Device Status: %s 429, sleeping", org.Name) - time.Sleep(3 * time.Second) // For right now guess on this, need to add 429 to spec. + if strings.Contains(err.Error(), "(status 429)") && timeouts < c.maxRetry { + sleepDur := time.Duration(MAX_TIMEOUT_SEC) * time.Second + c.log.Warnf("Device Status: %s 429, sleeping %v", org.Name, sleepDur) + time.Sleep(sleepDur) // For right now guess on this, need to add 429 to spec. timeouts++ return getDeviceStatus(nextToken, org, devices, timeouts) } diff --git a/pkg/kt/snmp.go b/pkg/kt/snmp.go index 7e838b31..f108d4bf 100644 --- a/pkg/kt/snmp.go +++ b/pkg/kt/snmp.go @@ -166,10 +166,11 @@ type MerakiConfig struct { MonitorOrgChanges bool `yaml:"monitor_org_changes"` MonitorNetworkClients bool `yaml:"monitor_clients"` MonitorVpnStatus bool `yaml:"monitor_vpn_status"` - Orgs []string `yaml:"organizations"` // Only monitor orgs in this list, if set. - Networks []string `yaml:"networks"` // Only monitor networks in this list, if set. - Prefs map[string]bool `yaml:"preferences"` // Additional fine tuning of what data is returned. - ProductTypes []string `yaml:"product_types"` // Only monitor these product types, if set. + Orgs []string `yaml:"organizations"` // Only monitor orgs in this list, if set. + Networks []string `yaml:"networks"` // Only monitor networks in this list, if set. + Prefs map[string]bool `yaml:"preferences"` // Additional fine tuning of what data is returned. + ProductTypes []string `yaml:"product_types"` // Only monitor these product types, if set. 
+ MaxAPIRetry int `yaml:"max_http_retry"` // retry up to this many times on 429 errors. Default 2. } // Contain various extensions to snmp which can be used to get data.