Commit 660baef

fix: auto discovery for consul

kongfei605 committed Sep 4, 2024
1 parent 8bf4f72 commit 660baef
Showing 2 changed files with 74 additions and 126 deletions.
175 changes: 57 additions & 118 deletions inputs/prometheus/consul.go
@@ -7,7 +7,6 @@ import (
 	"log"
 	"net/url"
 	"strings"
-	"sync"
 	"text/template"
 	"time"
 
@@ -51,156 +50,98 @@ type ConsulQuery struct {
 	lastQueryFailed bool
 }
 
-func (ins *Instance) InitConsulClient() error {
+func (ins *Instance) InitConsulClient(ctx context.Context) error {
 	consulAPIConfig := api.DefaultConfig()
 	if ins.ConsulConfig.Agent != "" {
 		consulAPIConfig.Address = ins.ConsulConfig.Agent
 	}
+	consul, err := api.NewClient(consulAPIConfig)
+	if err != nil {
+		return fmt.Errorf("cannot connect to the Consul agent: %w", err)
+	}
 
+	i := 0
 	// Parse the template for metrics URL, drop queries with template parse errors
-	for i := range ins.ConsulConfig.Queries {
-		serviceURLTemplate, err := template.New("URL").Parse(ins.ConsulConfig.Queries[i].ServiceURL)
+	for _, q := range ins.ConsulConfig.Queries {
+		serviceURLTemplate, err := template.New("URL").Parse(q.ServiceURL)
 		if err != nil {
-			return fmt.Errorf("failed to parse the Consul query URL template (%s): %s", ins.ConsulConfig.Queries[i].ServiceURL, err)
+			log.Printf("failed to parse the Consul query URL template (%s): %s", q.ServiceURL, err)
+			continue
 		}
-		ins.ConsulConfig.Queries[i].serviceURLTemplate = serviceURLTemplate
+		q.serviceURLTemplate = serviceURLTemplate
 
 		// Allow to use join function in tags
 		templateFunctions := template.FuncMap{"join": strings.Join}
 		// Parse the tag value templates
-		ins.ConsulConfig.Queries[i].serviceExtraTagsTemplate = make(map[string]*template.Template)
-		for tagName, tagTemplateString := range ins.ConsulConfig.Queries[i].ServiceExtraTags {
+		q.serviceExtraTagsTemplate = make(map[string]*template.Template)
+		for tagName, tagTemplateString := range q.ServiceExtraTags {
 			tagTemplate, err := template.New(tagName).Funcs(templateFunctions).Parse(tagTemplateString)
 			if err != nil {
-				return fmt.Errorf("failed to parse the Consul query Extra Tag template (%s): %s", tagTemplateString, err)
+				log.Printf("failed to parse the Consul query Extra Tag template (%s): %s", tagTemplateString, err)
+				continue
 			}
-			ins.ConsulConfig.Queries[i].serviceExtraTagsTemplate[tagName] = tagTemplate
+			q.serviceExtraTagsTemplate[tagName] = tagTemplate
 		}
+		ins.ConsulConfig.Queries[i] = q
+		i++
 	}
 
-	consul, err := api.NewClient(consulAPIConfig)
-	if err != nil {
-		return fmt.Errorf("failed to connect the Consul agent(%s): %v", consulAPIConfig.Address, err)
-	}
+	// Prevent memory leak by erasing truncated values
+	for j := i; j < len(ins.ConsulConfig.Queries); j++ {
+		ins.ConsulConfig.Queries[j] = nil
+	}
+	ins.ConsulConfig.Queries = ins.ConsulConfig.Queries[:i]
 
-	ins.ConsulConfig.Catalog = consul.Catalog()
+	catalog := consul.Catalog()
+
+	ins.wg.Add(1)
+	go func() {
+		// Store last error status and change log level depending on repeated occurrence
+		var refreshFailed = false
+		defer ins.wg.Done()
+		err := ins.refreshConsulServices(catalog)
+		if err != nil {
+			refreshFailed = true
+			log.Printf("Unable to refresh Consul services: %v", err)
+		}
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(time.Duration(ins.ConsulConfig.QueryInterval)):
+				err := ins.refreshConsulServices(catalog)
+				if err != nil {
+					message := fmt.Sprintf("Unable to refresh Consul services: %v", err)
+					if refreshFailed {
+						log.Println("E!", message)
+					} else {
+						log.Println("W!", message)
+					}
+					refreshFailed = true
+				} else if refreshFailed {
+					refreshFailed = false
+					log.Println("Successfully refreshed Consul services after previous errors")
+				}
+			}
+		}
+	}()
 
 	return nil
 }
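Note: the rewritten loop above uses the standard in-place slice filter idiom: keep a write index, copy survivors forward, then nil out and truncate the tail so the backing array stops pinning dropped *ConsulQuery values. A minimal, runnable sketch of the idiom (the query type and its valid field are illustrative, not categraf's):

package main

import "fmt"

type query struct {
	name  string
	valid bool
}

// compact keeps only valid queries. It writes survivors forward with
// index i, then erases the truncated tail so the backing array does
// not pin dropped pointers (the "memory leak" the diff comment notes).
func compact(qs []*query) []*query {
	i := 0
	for _, q := range qs {
		if !q.valid { // in the real code: template parse error, log and skip
			continue
		}
		qs[i] = q
		i++
	}
	for j := i; j < len(qs); j++ {
		qs[j] = nil
	}
	return qs[:i]
}

func main() {
	qs := []*query{{"a", true}, {"b", false}, {"c", true}}
	fmt.Println(len(compact(qs))) // 2
}

Truncating with qs[:i] alone would leave the tail reachable through the backing array, which is exactly the leak the erase loop prevents.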

-func (ins *Instance) UrlsFromConsul(ctx context.Context) ([]ScrapeUrl, error) {
-	if !ins.ConsulConfig.Enabled {
-		return []ScrapeUrl{}, nil
-	}
-
-	if ins.DebugMod {
-		log.Println("D! get urls from consul:", ins.ConsulConfig.Agent)
-	}
-
-	urlset := map[string]struct{}{}
-	var returls []ScrapeUrl
-
-	for _, q := range ins.ConsulConfig.Queries {
-		queryOptions := api.QueryOptions{}
-		if q.ServiceDc != "" {
-			queryOptions.Datacenter = q.ServiceDc
-		}
-
-		// Request services from Consul
-		consulServices, _, err := ins.ConsulConfig.Catalog.Service(q.ServiceName, q.ServiceTag, &queryOptions)
-		if err != nil {
-			return nil, err
-		}
-
-		if len(consulServices) == 0 {
-			if ins.DebugMod {
-				log.Println("D! query consul did not find any instances, service:", q.ServiceName, " tag:", q.ServiceTag)
-			}
-			continue
-		}
-
-		if ins.DebugMod {
-			log.Println("D! query consul found", len(consulServices), "instances, service:", q.ServiceName, " tag:", q.ServiceTag)
-		}
-
-		for _, consulService := range consulServices {
-			su, err := ins.getConsulServiceURL(q, consulService)
-			if err != nil {
-				return nil, fmt.Errorf("unable to get scrape URLs from Consul for Service (%s, %s): %s", q.ServiceName, q.ServiceTag, err)
-			}
-
-			if _, has := urlset[su.URL.String()]; has {
-				continue
-			}
-
-			urlset[su.URL.String()] = struct{}{}
-			returls = append(returls, *su)
-		}
-	}
-
-	if ins.firstRun {
-		var wg sync.WaitGroup
-		consulAPIConfig := api.DefaultConfig()
-		if ins.ConsulConfig.Agent != "" {
-			consulAPIConfig.Address = ins.ConsulConfig.Agent
-		}
-
-		consul, err := api.NewClient(consulAPIConfig)
-		if err != nil {
-			return []ScrapeUrl{}, fmt.Errorf("cannot connect to the Consul agent: %w", err)
-		}
-		catalog := consul.Catalog()
-
-		wg.Add(1)
-		go func() {
-			// Store last error status and change log level depending on repeated occurrence
-			var refreshFailed = false
-			defer wg.Done()
-			err := ins.refreshConsulServices(catalog)
-			if err != nil {
-				refreshFailed = true
-				log.Printf("Unable to refresh Consul services: %v\n", err)
-			}
-		refreshLoop:
-			for {
-				select {
-				case <-ctx.Done():
-					return
-				case <-time.After(time.Duration(ins.ConsulConfig.QueryInterval)):
-					err := ins.refreshConsulServices(catalog)
-					if err != nil {
-						message := fmt.Sprintf("Unable to refresh Consul services: %v", err)
-						if refreshFailed {
-							log.Println("E!", message)
-						} else {
-							log.Println("W!", message)
-						}
-						refreshFailed = true
-					} else {
-						if refreshFailed {
-							refreshFailed = false
-							log.Println("Successfully refreshed Consul services after previous errors")
-						}
-						break refreshLoop
-					}
-				}
-			}
-		}()
-		ins.firstRun = false
-		wg.Wait()
-	}
-
-	return returls, nil
+func (ins *Instance) UrlsFromConsul() ([]*ScrapeUrl, error) {
+	ins.lock.Lock()
+	defer ins.lock.Unlock()
+
+	urls := make([]*ScrapeUrl, 0, len(ins.consulServices))
+	for _, u := range ins.consulServices {
+		urls = append(urls, u)
+	}
+
+	return urls, nil
 }
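Note: UrlsFromConsul no longer queries Consul on every gather; it just snapshots the map maintained by the background goroutine while holding ins.lock, so Gather never iterates a map that refreshConsulServices is replacing concurrently. A hedged sketch of that copy-under-lock pattern (registry and its fields are stand-ins, not the project's types):

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.Mutex
	items map[string]*string // stand-in for map[string]*ScrapeUrl
}

// Snapshot copies the current values under the lock and returns the
// copy, so callers can range over it without further synchronization.
func (r *registry) Snapshot() []*string {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]*string, 0, len(r.items))
	for _, v := range r.items {
		out = append(out, v)
	}
	return out
}

func main() {
	a, b := "http://10.0.0.1:9100/metrics", "http://10.0.0.2:9100/metrics"
	r := &registry{items: map[string]*string{a: &a, b: &b}}
	fmt.Println(len(r.Snapshot())) // 2
}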

 func (ins *Instance) refreshConsulServices(c *api.Catalog) error {
-	consulServiceURLs := make(map[string]ScrapeUrl)
+	consulServiceURLs := make(map[string]*ScrapeUrl)
 
 	if ins.DebugMod {
 		log.Println("Refreshing Consul services")
@@ -244,14 +185,12 @@ func (ins *Instance) refreshConsulServices(c *api.Catalog) error {
 			}
 			q.lastQueryFailed = false
 			log.Printf("Adding scrape URL from Consul for Service (%s, %s): %s\n", q.ServiceName, q.ServiceTag, uaa.URL.String())
-			consulServiceURLs[uaa.URL.String()] = *uaa
+			consulServiceURLs[uaa.URL.String()] = uaa
 		}
 	}
 
 	ins.lock.Lock()
-	for _, u := range consulServiceURLs {
-		ins.URLs = append(ins.URLs, u.URL.String())
-	}
+	ins.consulServices = consulServiceURLs
 	ins.lock.Unlock()
 
 	return nil
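Note: taken together, the consul.go changes replace query-on-gather with a single long-lived poller: refresh once eagerly, then once per QueryInterval until the context is cancelled, logging the first failure as a warning ("W!") and repeats as errors ("E!"). A standalone sketch of that loop shape (runRefreshLoop is an assumed name, not categraf's API):

package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// runRefreshLoop mirrors the loop the commit adds: one eager refresh,
// then one per interval until ctx is done. Repeated failures escalate
// from warning to error, and the first success after failures is noted.
func runRefreshLoop(ctx context.Context, interval time.Duration, refresh func() error) {
	failed := false
	if err := refresh(); err != nil {
		failed = true
		log.Printf("W! unable to refresh: %v", err)
	}
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
			if err := refresh(); err != nil {
				if failed {
					log.Printf("E! unable to refresh: %v", err)
				} else {
					log.Printf("W! unable to refresh: %v", err)
				}
				failed = true
			} else if failed {
				failed = false
				log.Println("successfully refreshed after previous errors")
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	calls := 0
	runRefreshLoop(ctx, 10*time.Millisecond, func() error {
		calls++
		if calls == 1 {
			return errors.New("consul unavailable")
		}
		return nil
	})
}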
25 changes: 17 additions & 8 deletions inputs/prometheus/prometheus.go
@@ -45,9 +45,11 @@ type Instance struct {
 	ignoreLabelKeysFilter filter.Filter
 	cancel                context.CancelFunc
 	lock                  sync.Mutex
-	firstRun              bool
 	tls.ClientConfig
 	client *http.Client
+
+	wg             sync.WaitGroup
+	consulServices map[string]*ScrapeUrl
 }
 
 func (ins *Instance) Empty() bool {
@@ -67,8 +69,11 @@ func (ins *Instance) Init() error {
 		return types.ErrInstancesEmpty
 	}
 
+	var ctx context.Context
+	ctx, ins.cancel = context.WithCancel(context.Background())
+
 	if ins.ConsulConfig.Enabled && len(ins.ConsulConfig.Queries) > 0 {
-		if err := ins.InitConsulClient(); err != nil {
+		if err := ins.InitConsulClient(ctx); err != nil {
 			return err
 		}
 	}
@@ -80,7 +85,6 @@ func (ins *Instance) Init() error {
 	if ins.Timeout <= 0 {
 		ins.Timeout = config.Duration(time.Second * 3)
 	}
-	ins.firstRun = true
 
 	client, err := ins.createHTTPClient()
 	if err != nil {
@@ -156,6 +160,12 @@ func (p *Prometheus) GetInstances() []inputs.Instance {
 	return ret
 }
 
+func (p *Prometheus) Drop() {
+	for _, ins := range p.Instances {
+		ins.Drop()
+	}
+}
+
 func (ins *Instance) Gather(slist *types.SampleList) {
 	var ctx context.Context
 
Check failure on line 170 in inputs/prometheus/prometheus.go (GitHub Actions / Go Build Check; GitHub Actions / Code Analysis): ctx declared and not used
 	urlwg := new(sync.WaitGroup)
 
@@ -170,12 +180,10 @@ func (ins *Instance) Gather(slist *types.SampleList) {
 
 		urlwg.Add(1)
 
-		go ins.gatherUrl(urlwg, slist, ScrapeUrl{URL: u, Tags: map[string]string{}})
+		go ins.gatherUrl(urlwg, slist, &ScrapeUrl{URL: u, Tags: map[string]string{}})
 	}
 
-	ctx, ins.cancel = context.WithCancel(context.Background())
-	defer ins.cancel()
-	urls, err := ins.UrlsFromConsul(ctx)
+	urls, err := ins.UrlsFromConsul()
 	if err != nil {
 		log.Println("E! failed to query urls from consul:", err)
 		return
@@ -187,7 +195,7 @@ func (ins *Instance) Gather(slist *types.SampleList) {
 	}
 }
 
-func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *types.SampleList, uri ScrapeUrl) {
+func (ins *Instance) gatherUrl(urlwg *sync.WaitGroup, slist *types.SampleList, uri *ScrapeUrl) {
 	defer urlwg.Done()
 
 	u := uri.URL
@@ -272,4 +280,5 @@ func (ins *Instance) setHeaders(req *http.Request) {
 
 func (ins *Instance) Drop() {
 	ins.cancel()
+	ins.wg.Wait()
 }
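Note: the Drop change completes the goroutine lifecycle: Init creates the context and stores its CancelFunc, InitConsulClient registers the refresh goroutine on ins.wg, and Drop cancels first and then waits, so nothing runs after an instance is dropped. A compact demo of that ordering (worker is a hypothetical stand-in for Instance):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for the plugin's Instance.
type worker struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func (w *worker) start(ctx context.Context) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return // Drop's cancel() unblocks this case
			case <-time.After(10 * time.Millisecond):
				fmt.Println("refresh tick")
			}
		}
	}()
}

// Drop mirrors Instance.Drop: cancel first so the goroutine's select
// returns, then wait so it has fully exited before Drop returns.
func (w *worker) Drop() {
	w.cancel()
	w.wg.Wait()
}

func main() {
	w := &worker{}
	var ctx context.Context
	ctx, w.cancel = context.WithCancel(context.Background())
	w.start(ctx)
	time.Sleep(25 * time.Millisecond)
	w.Drop()
	fmt.Println("stopped cleanly")
}

The CI failure above is the one loose end: Gather still declares var ctx context.Context but no longer uses it, so the likely follow-up is simply deleting that declaration.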
