-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Amazon Elasticache: enable the agent to automatically discover Elasticache instances and apply the necessary instrumentation using the redis or memcached exporters
- Loading branch information
Showing
11 changed files
with
776 additions
and
174 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
package elasticache | ||
|
||
import ( | ||
"fmt" | ||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/aws/aws-sdk-go/service/elasticache" | ||
"github.com/coroot/coroot-aws-agent/flags" | ||
"github.com/coroot/coroot-aws-agent/utils" | ||
"github.com/coroot/logger" | ||
"github.com/oliver006/redis_exporter/exporter" | ||
"github.com/prometheus/client_golang/prometheus" | ||
mcExporter "github.com/prometheus/memcached_exporter/pkg/exporter" | ||
"net" | ||
"strconv" | ||
"time" | ||
) | ||
|
||
var ( | ||
dInfo = utils.Desc("aws_elasticache_info", "Elasticache instance info", | ||
"region", "availability_zone", "endpoint", "ipv4", "port", | ||
"engine", "engine_version", "instance_type", "cluster_id", | ||
) | ||
dStatus = utils.Desc("aws_elasticache_status", "Status of the Elasticache instance", "status") | ||
) | ||
|
||
type Collector struct { | ||
sess *session.Session | ||
|
||
metricCollector prometheus.Collector | ||
cluster elasticache.CacheCluster | ||
node elasticache.CacheNode | ||
|
||
logger logger.Logger | ||
} | ||
|
||
func NewCollector(sess *session.Session, cluster *elasticache.CacheCluster, node *elasticache.CacheNode) (*Collector, error) { | ||
if node.Endpoint == nil || node.Endpoint.Address == nil { | ||
return nil, fmt.Errorf("endpoint is not defined") | ||
} | ||
c := &Collector{ | ||
sess: sess, | ||
cluster: *cluster, | ||
node: *node, | ||
logger: logger.NewKlog(aws.StringValue(cluster.CacheClusterId)), | ||
} | ||
|
||
c.startMetricCollector() | ||
return c, nil | ||
} | ||
|
||
func (c *Collector) update(cluster *elasticache.CacheCluster, n *elasticache.CacheNode) { | ||
if aws.Int64Value(c.node.Endpoint.Port) != aws.Int64Value(n.Endpoint.Port) || aws.StringValue(c.node.Endpoint.Address) != aws.StringValue(n.Endpoint.Address) { | ||
c.cluster = *cluster | ||
c.node = *n | ||
c.startMetricCollector() | ||
} | ||
c.cluster = *cluster | ||
c.node = *n | ||
} | ||
|
||
func (c *Collector) startMetricCollector() { | ||
switch aws.StringValue(c.cluster.Engine) { | ||
case "redis": | ||
url := fmt.Sprintf("redis://%s:%d", aws.StringValue(c.node.Endpoint.Address), aws.Int64Value(c.node.Endpoint.Port)) | ||
opts := exporter.Options{ | ||
Namespace: "redis", | ||
ConfigCommandName: "CONFIG", | ||
IsCluster: false, | ||
ConnectionTimeouts: *flags.ElasticacheConnectTimeout, | ||
RedisMetricsOnly: true, | ||
} | ||
if collector, err := exporter.NewRedisExporter(url, opts); err != nil { | ||
c.logger.Warning("failed to init redis collector:", err) | ||
} else { | ||
c.logger.Info("redis collector ->", url) | ||
c.metricCollector = collector | ||
} | ||
case "memcached": | ||
address := fmt.Sprintf("%s:%d", aws.StringValue(c.node.Endpoint.Address), aws.Int64Value(c.node.Endpoint.Port)) | ||
c.metricCollector = mcExporter.New( | ||
address, | ||
*flags.ElasticacheConnectTimeout, | ||
&promLogger{c.logger}, | ||
nil, | ||
) | ||
c.logger.Info("memcached collector ->", address) | ||
} | ||
} | ||
|
||
func (c *Collector) Close() {} | ||
|
||
func (c *Collector) Collect(ch chan<- prometheus.Metric) { | ||
ch <- utils.Gauge(dStatus, 1, aws.StringValue(c.node.CacheNodeStatus)) | ||
|
||
var ip string | ||
if a, err := net.ResolveIPAddr("", aws.StringValue(c.node.Endpoint.Address)); err != nil { | ||
c.logger.Warning(err) | ||
} else { | ||
ip = a.String() | ||
} | ||
|
||
cluster := aws.StringValue(c.cluster.ReplicationGroupId) | ||
if cluster == "" { | ||
cluster = aws.StringValue(c.cluster.CacheClusterId) | ||
} | ||
|
||
ch <- utils.Gauge(dInfo, 1, | ||
aws.StringValue(c.sess.Config.Region), | ||
aws.StringValue(c.node.CustomerAvailabilityZone), | ||
aws.StringValue(c.node.Endpoint.Address), | ||
ip, | ||
strconv.Itoa(int(aws.Int64Value(c.node.Endpoint.Port))), | ||
aws.StringValue(c.cluster.Engine), | ||
aws.StringValue(c.cluster.EngineVersion), | ||
aws.StringValue(c.cluster.CacheNodeType), | ||
cluster, | ||
) | ||
|
||
if c.metricCollector != nil { | ||
t := time.Now() | ||
c.metricCollector.Collect(ch) | ||
c.logger.Info("cache metrics collected in:", time.Since(t)) | ||
} | ||
} | ||
|
||
func (c *Collector) Describe(ch chan<- *prometheus.Desc) { | ||
ch <- dInfo | ||
ch <- dStatus | ||
} | ||
|
||
type promLogger struct { | ||
l logger.Logger | ||
} | ||
|
||
func (l *promLogger) Log(keyvals ...interface{}) error { | ||
l.l.Info(keyvals...) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package elasticache | ||
|
||
import ( | ||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/aws/session" | ||
"github.com/aws/aws-sdk-go/service/elasticache" | ||
"github.com/coroot/coroot-aws-agent/flags" | ||
"github.com/coroot/coroot-aws-agent/utils" | ||
"github.com/coroot/logger" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"time" | ||
) | ||
|
||
type Discoverer struct { | ||
reg prometheus.Registerer | ||
|
||
awsSession *session.Session | ||
|
||
instances map[string]*Collector | ||
|
||
logger logger.Logger | ||
} | ||
|
||
func NewDiscoverer(reg prometheus.Registerer, awsSession *session.Session) *Discoverer { | ||
d := &Discoverer{ | ||
reg: reg, | ||
awsSession: awsSession, | ||
instances: map[string]*Collector{}, | ||
logger: logger.NewKlog(""), | ||
} | ||
return d | ||
} | ||
|
||
func (d *Discoverer) Run() { | ||
api := elasticache.New(d.awsSession) | ||
|
||
if err := d.refresh(api); err != nil { | ||
d.logger.Warning(err) | ||
} | ||
|
||
ticker := time.Tick(*flags.DiscoveryInterval) | ||
|
||
for range ticker { | ||
if err := d.refresh(api); err != nil { | ||
d.logger.Warning(err) | ||
} | ||
} | ||
} | ||
|
||
func (d *Discoverer) refresh(api *elasticache.ElastiCache) error { | ||
t := time.Now() | ||
defer func() { | ||
d.logger.Info("elasticache clusters refreshed in:", time.Since(t)) | ||
}() | ||
|
||
var clusters []*elasticache.CacheCluster | ||
var err error | ||
|
||
input := &elasticache.DescribeCacheClustersInput{} | ||
input.ShowCacheNodeInfo = aws.Bool(true) | ||
|
||
for _, v := range []bool{false, true} { | ||
input.ShowCacheClustersNotInReplicationGroups = aws.Bool(v) | ||
output, err := api.DescribeCacheClusters(input) | ||
if err != nil { | ||
return err | ||
} | ||
clusters = append(clusters, output.CacheClusters...) | ||
} | ||
|
||
actualInstances := map[string]bool{} | ||
for _, cluster := range clusters { | ||
for _, node := range cluster.CacheNodes { | ||
id := aws.StringValue(cluster.CacheClusterId) + "/" + aws.StringValue(node.CacheNodeId) | ||
actualInstances[id] = true | ||
i, ok := d.instances[id] | ||
if !ok { | ||
d.logger.Info("new Elasticache instance found:", id) | ||
i, err = NewCollector(d.awsSession, cluster, node) | ||
if err != nil { | ||
d.logger.Warning("failed to init Elasticache collector:", err) | ||
continue | ||
} | ||
if err := d.wrappedReg(id).Register(i); err != nil { | ||
d.logger.Warning(err) | ||
continue | ||
} | ||
d.instances[id] = i | ||
} | ||
i.update(cluster, node) | ||
} | ||
} | ||
|
||
for id, i := range d.instances { | ||
if !actualInstances[id] { | ||
d.logger.Info("Elasticache instance no longer exists:", id) | ||
d.wrappedReg(id).Unregister(i) | ||
i.Close() | ||
delete(d.instances, id) | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (d *Discoverer) wrappedReg(instanceId string) prometheus.Registerer { | ||
id := utils.IdWithRegion(aws.StringValue(d.awsSession.Config.Region), instanceId) | ||
return prometheus.WrapRegistererWith(prometheus.Labels{"ec_instance_id": id}, d.reg) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package flags | ||
|
||
import "gopkg.in/alecthomas/kingpin.v2" | ||
|
||
var ( | ||
AwsRegion = kingpin.Flag("aws-region", "AWS region (env: AWS_REGION)").Envar("AWS_REGION").Required().String() | ||
DiscoveryInterval = kingpin.Flag("discovery-interval", "discovery interval").Default("60s").Duration() | ||
RdsDbUser = kingpin.Flag("rds-db-user", "RDS db user (env: RDS_DB_USER)").Envar("RDS_DB_USER").String() | ||
RdsDbPassword = kingpin.Flag("rds-db-password", "RDS db password (env: RDS_DB_PASSWORD)").Envar("RDS_DB_PASSWORD").String() | ||
RdsDbConnectTimeout = kingpin.Flag("rds-db-connect-timeout", "RDS db connect timeout").Default("1s").Duration() | ||
RdsDbQueryTimeout = kingpin.Flag("rds-db-query-timeout", "RDS db query timeout").Default("30s").Duration() | ||
RdsLogsScrapeInterval = kingpin.Flag("rds-logs-scrape-interval", "RDS logs scrape interval (0 to disable)").Default("30s").Duration() | ||
DbScrapeInterval = kingpin.Flag("db-scrape-interval", "How often to scrape DB system views").Default("30s").Duration() | ||
ElasticacheConnectTimeout = kingpin.Flag("ec-connect-timeout", "Elasticache connect timeout").Default("1s").Duration() | ||
ListenAddress = kingpin.Flag("listen-address", `Listen address (env: LISTEN_ADDRESS) - "<ip>:<port>" or ":<port>".`).Envar("LISTEN_ADDRESS").Default("0.0.0.0:80").String() | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.