From 145aabb5978fa360c901f107f2e89ca14dd64241 Mon Sep 17 00:00:00 2001 From: EHSchmitt4395 <58869136+EHSchmitt4395@users.noreply.github.com> Date: Mon, 2 Dec 2024 08:27:10 -1000 Subject: [PATCH] influxdb2otel receiver (#1760) * initial commit for influxdb2otel receiver * changelog * Update docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Update docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Update docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * convert logic * Update CHANGELOG.md Co-authored-by: William Dumont * Update docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md Co-authored-by: William Dumont * Update docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md Co-authored-by: William Dumont * Update internal/component/all/all.go Co-authored-by: William Dumont * fixes from review * unrelated build failures - kick * caught issue along with compression change * caught issue along with compression change * Update docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Update docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * compression test fix and suggestions * compression added in docs * changelog fix * changelog fix * changelog fix * changelog fix * format doc * docs --------- Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> Co-authored-by: William Dumont --- CHANGELOG.md | 2 + .../sources/reference/compatibility/_index.md | 1 + .../otelcol/otelcol.receiver.influxdb.md | 167 +++++++++++++ go.mod | 9 + go.sum | 29 +++ internal/component/all/all.go | 1 + .../otelcol/receiver/influxdb/influxdb.go | 84 +++++++ .../receiver/influxdb/influxdb_test.go | 231 ++++++++++++++++++ .../converter_influxdbreceiver.go | 81 ++++++ .../otelcolconvert/testdata/influxdb.alloy | 11 + .../otelcolconvert/testdata/influxdb.yaml | 13 + 11 files changed, 629 insertions(+) create mode 100644 docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md create mode 100644 internal/component/otelcol/receiver/influxdb/influxdb.go create mode 100644 internal/component/otelcol/receiver/influxdb/influxdb_test.go create mode 100644 internal/converter/internal/otelcolconvert/converter_influxdbreceiver.go create mode 100644 internal/converter/internal/otelcolconvert/testdata/influxdb.alloy create mode 100644 internal/converter/internal/otelcolconvert/testdata/influxdb.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index e7cb641961..f6ecd058e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,8 @@ Main (unreleased) - (_Experimental_) Add a `database_observability.mysql` component to collect mysql performance data. +- Add `otelcol.receiver.influxdb` to convert influx metric into OTEL. (@EHSchmitt4395) + ### Enhancements - Add second metrics sample to the support bundle to provide delta information (@dehaansa) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index 432f2b8dd6..a11bd3ea1c 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -362,6 +362,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol - [otelcol.processor.transform](../components/otelcol/otelcol.processor.transform) - [otelcol.receiver.datadog](../components/otelcol/otelcol.receiver.datadog) - [otelcol.receiver.file_stats](../components/otelcol/otelcol.receiver.file_stats) +- [otelcol.receiver.influxdb](../components/otelcol/otelcol.receiver.influxdb) - [otelcol.receiver.jaeger](../components/otelcol/otelcol.receiver.jaeger) - [otelcol.receiver.kafka](../components/otelcol/otelcol.receiver.kafka) - [otelcol.receiver.loki](../components/otelcol/otelcol.receiver.loki) diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md b/docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md new file mode 100644 index 0000000000..465eb67e20 --- /dev/null +++ b/docs/sources/reference/components/otelcol/otelcol.receiver.influxdb.md @@ -0,0 +1,167 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.receiver.influxdb/ +description: Learn about otelcol.receiver.influxdb +title: otelcol.receiver.influxdb +--- + +# otelcol.receiver.influxdb + +`otelcol.receiver.influxdb` receives InfluxDB metrics, converts them into OpenTelemetry (OTEL) format, and forwards them to other `otelcol.*` components over the network. + +You can specify multiple `otelcol.receiver.influxdb` components by giving them different labels. + +## Usage + +```alloy +otelcol.receiver.influxdb "influxdb_metrics" { + endpoint = "localhost:8086" // InfluxDB metrics ingestion endpoint + + output { + metrics = [...] + } +} +``` + +## Arguments + +`otelcol.receiver.influxdb` supports the following arguments: + +| Name | Type | Description | Default | Required | +| ------------------------ | -------------- | --------------------------------------------------------------- | ---------------------------------------------------------- | -------- | +| `endpoint` | `string` | `host:port` to listen for traffic on. | `"localhost:8086"` | no | +| `max_request_body_size` | `string` | Maximum request body size the server will allow. | `20MiB` | no | +| `include_metadata` | `boolean` | Propagate incoming connection metadata to downstream consumers. | | no | +| `compression_algorithms` | `list(string)` | A list of compression algorithms the server can accept. | `["", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"]` | no | + +By default, `otelcol.receiver.influxdb` listens for HTTP connections on `localhost`. +To expose the HTTP server to other machines on your network, configure `endpoint` with the IP address to listen on, or `0.0.0.0:8086` to listen on all network interfaces. + +## Blocks + +The following blocks are supported inside the definition of `otelcol.receiver.influxdb`: + +| Hierarchy | Block | Description | Required | +| ------------- | ----------------- | ----------------------------------------------------- | -------- | +| tls | [tls][] | Configures TLS for the HTTP server. | no | +| cors | [cors][] | Configures CORS for the HTTP server. | no | +| debug_metrics | [debug_metrics][] | Configures the metrics that this component generates. | no | +| output | [output][] | Configures where to send received metrics. | yes | + +[tls]: #tls-block +[cors]: #cors-block +[debug_metrics]: #debug_metrics-block +[output]: #output-block + +### tls block + +The `tls` block configures TLS settings used for a server. If the `tls` block +isn't provided, TLS won't be used for connections to the server. + +{{< docs/shared lookup="reference/components/otelcol-tls-server-block.md" source="alloy" version="" >}} + +### cors block + +The `cors` block configures CORS settings for an HTTP server. + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +| ----------------- | -------------- | ---------------------------------------- | ---------------------- | -------- | +| `allowed_origins` | `list(string)` | Allowed values for the `Origin` header. | | no | +| `allowed_headers` | `list(string)` | Accepted headers from CORS requests. | `["X-Requested-With"]` | no | +| `max_age` | `number` | Configures the `Access-Control-Max-Age`. | | no | + +The `allowed_headers` argument specifies which headers are acceptable from a +CORS request. The following headers are always implicitly allowed: + +* `Accept` +* `Accept-Language` +* `Content-Type` +* `Content-Language` + +If `allowed_headers` includes `"*"`, all headers are permitted. + +### debug_metrics block + +{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}} + +### output block + +{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="" >}} + +## Exported fields + +`otelcol.receiver.influxdb` doesn't export any fields. + +## Component health + +`otelcol.receiver.influxdb` is only reported as unhealthy if given an invalid configuration. + +## Debug information + +`otelcol.receiver.influxdb` doesn't expose any component-specific debug information. + +## Example + +This example forwards received telemetry through a batch processor before finally sending it to an OTLP-capable endpoint: + +```alloy +otelcol.receiver.influxdb "influxdb_metrics" { + output { + metrics = [otelcol.processor.batch.default.input] + } +} + +otelcol.processor.batch "default" { + output { + metrics = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = sys.env("OTLP_ENDPOINT") + } +} +``` + +This example forwards received telemetry to Prometheus Remote Write (Mimir): + +```alloy +otelcol.receiver.influxdb "influxdb_metrics" { + output { + metrics = [otelcol.exporter.prometheus.influx_output.input] // Forward metrics to Prometheus exporter + } +} + +otelcol.exporter.prometheus "influx_output" { + forward_to = [prometheus.remote_write.mimir.receiver] // Forward metrics to Prometheus remote write (Mimir) +} + +prometheus.remote_write "mimir" { + endpoint { + url = "https://prometheus-xxx.grafana.net/api/prom/push" + + basic_auth { + username = "xxxxx" + password = "xxxx==" + } + } +} +``` + + + +## Compatible components + +`otelcol.receiver.influxdb` can accept arguments from the following components: + +- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters) + + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/go.mod b/go.mod index 56ae805887..acd1199831 100644 --- a/go.mod +++ b/go.mod @@ -831,10 +831,19 @@ require ( github.com/DataDog/datadog-agent/comp/core/log/def v0.57.1 // indirect github.com/antchfx/xmlquery v1.4.2 // indirect github.com/antchfx/xpath v1.3.2 // indirect + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/ebitengine/purego v0.8.0 // indirect github.com/elastic/lunes v0.1.0 // indirect + github.com/influxdata/influxdb-client-go/v2 v2.14.0 + github.com/influxdata/influxdb-observability/common v0.5.12 // indirect + github.com/influxdata/influxdb-observability/influx2otel v0.5.12 // indirect + github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect + github.com/influxdata/line-protocol/v2 v2.2.1 // indirect github.com/moby/sys/userns v0.1.0 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.112.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.112.0 github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect go.opentelemetry.io/collector/connector/connectorprofiles v0.112.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.112.0 // indirect diff --git a/go.sum b/go.sum index fec097834b..850c4de59d 100644 --- a/go.sum +++ b/go.sum @@ -956,6 +956,7 @@ github.com/PuerkitoBio/rehttp v1.4.0 h1:rIN7A2s+O9fmHUM1vUcInvlHj9Ysql4hE+Y0wcl/ github.com/PuerkitoBio/rehttp v1.4.0/go.mod h1:LUwKPoDbDIA2RL5wYZCNsQ90cx4OJ4AWBmq6KzWZL1s= github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/SAP/go-hdb v0.12.0/go.mod h1:etBT+FAi1t5k3K3tf5vQTnosgYmhDkRi8jEnQqCnxF0= github.com/SermoDigital/jose v0.0.0-20180104203859-803625baeddc/go.mod h1:ARgCUhI1MHQH+ONky/PAtmVHQrP5JlGY0F3poXOp/fA= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= @@ -1025,6 +1026,8 @@ github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3/go.mod h1:KASm+qXFKs/xjSoWn30NrWBBvdTTQq+UjkhjEJHfSFA= github.com/aristanetworks/goarista v0.0.0-20190325233358-a123909ec740/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= @@ -1154,6 +1157,7 @@ github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAK github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/bmatcuk/doublestar v1.3.4 h1:gPypJ5xD31uhX6Tf54sDPUOBXTqKH4c9aPY66CyQrS0= github.com/bmatcuk/doublestar v1.3.4/go.mod h1:wiQtGV+rzVYxB7WIlirSN++5HPtPlXEo9MEoZQC/PmE= github.com/bmatcuk/doublestar/v4 v4.7.1 h1:fdDeAqgT47acgwd9bd9HxJRDmc9UAmPpc+2m0CXv75Q= @@ -1475,6 +1479,8 @@ github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVB github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.11.0/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/frankban/quicktest v1.11.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/frankban/quicktest v1.13.0/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -2118,7 +2124,24 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf github.com/infinityworks/go-common v0.0.0-20170820165359-7f20a140fd37 h1:Lm6kyC3JBiJQvJrus66He0E4viqDc/m5BdiFNSkIFfU= github.com/infinityworks/go-common v0.0.0-20170820165359-7f20a140fd37/go.mod h1:+OaHNKQvQ9oOCr+DgkF95PkiDx20fLHpzMp8SmRPQTg= github.com/influxdata/go-syslog/v2 v2.0.1/go.mod h1:hjvie1UTaD5E1fTnDmxaCw8RRDrT4Ve+XHr5O2dKSCo= +github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= +github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= +github.com/influxdata/influxdb-observability/common v0.5.12 h1:4YwZ+vsodz6VfoiX+ZqVotmnyCa9vCCPksSBK/WLjBs= +github.com/influxdata/influxdb-observability/common v0.5.12/go.mod h1:u+CABnGO/F1IK51pDlZQroh4+igJNo695XrbLGDBhVc= +github.com/influxdata/influxdb-observability/influx2otel v0.5.12 h1:u0lNE3+63rILk4mtmCYsNyczH/1wEXnM+1aBzBe5akk= +github.com/influxdata/influxdb-observability/influx2otel v0.5.12/go.mod h1:bM407XIJYnrJYJ9Q3q2ytDSOyFhiYmGm0Sz1Qf48RPk= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs= +github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/influxdata/line-protocol-corpus v0.0.0-20210519164801-ca6fa5da0184/go.mod h1:03nmhxzZ7Xk2pdG+lmMd7mHDfeVOYFyhOgwO61qWU98= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937 h1:MHJNQ+p99hFATQm6ORoLmpUCF7ovjwEFshs/NHzAbig= +github.com/influxdata/line-protocol-corpus v0.0.0-20210922080147-aa28ccfb8937/go.mod h1:BKR9c0uHSmRgM/se9JhFHtTT7JTO67X23MtKMHtZcpo= +github.com/influxdata/line-protocol/v2 v2.0.0-20210312151457-c52fdecb625a/go.mod h1:6+9Xt5Sq1rWx+glMgxhcg2c0DUaehK+5TDcPZ76GypY= +github.com/influxdata/line-protocol/v2 v2.1.0/go.mod h1:QKw43hdUBg3GTk2iC3iyCxksNj7PX9aUSeYOYE/ceHY= +github.com/influxdata/line-protocol/v2 v2.2.1 h1:EAPkqJ9Km4uAxtMRgUubJyqAr6zgWM0dznKMLRauQRE= +github.com/influxdata/line-protocol/v2 v2.2.1/go.mod h1:DmB3Cnh+3oxmG6LOBIxce4oaL4CPj3OmMPgvauXh+tM= github.com/influxdata/tail v1.0.1-0.20200707181643-03a791b270e4/go.mod h1:VeiWgI3qaGdJWust2fP27a6J+koITo/1c/UhxeOxgaM= github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b h1:i44CesU68ZBRvtCjBi3QSosCIKrjmMbYlQMFAwVLds4= github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y= @@ -2246,6 +2269,7 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= @@ -2523,6 +2547,8 @@ github.com/npillmayer/nestext v0.1.3/go.mod h1:h2lrijH8jpicr25dFY+oAJLyzlya6jhnu github.com/nsqio/go-nsq v1.0.7/go.mod h1:XP5zaUs3pqf+Q71EqUJs3HYfBIqfK6G83WQMdNN+Ito= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= github.com/ohler55/ojg v1.20.1 h1:Io65sHjMjYPI7yuhUr8VdNmIQdYU6asKeFhOs8xgBnY= github.com/ohler55/ojg v1.20.1/go.mod h1:uHcD1ErbErC27Zhb5Df2jUjbseLLcmOCo6oxSr3jZxo= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= @@ -2680,6 +2706,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filestatsrece github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filestatsreceiver v0.112.0/go.mod h1:qQMX7V36/wkJnWJB6r6MbSELMIy/njHfGBC4SowMFOM= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.112.0 h1:ZSSL9GUuCz70W5uWz3NyjxyhKDyEuv/Vd+93sPVK4Mc= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver v0.112.0/go.mod h1:xlH6PiPu8MhR4R84Os2ubLKEkHJfNJh9AIaUiuGGan4= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.112.0 h1:rY67JRHSGWtDZLF3LybWV06NMLCTByob7uiJ/AwVdds= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.112.0/go.mod h1:YTuWqQp/2+5Td9oAoZttTyPR/y2oREzl8rDFDorBijo= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.112.0 h1:CcZK9C7nmSAwt/+Vh0QENiVwPaCnHXma+IBep9SpjtE= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver v0.112.0/go.mod h1:FcEmKx4Zd14M0Y9pEZeGZOEIBLqhErtGk75C/ZZsquA= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.112.0 h1:s9SFpmaztDQASACSYd3+n3gC8X3zI+/XKwlBXH8CuvE= @@ -3054,6 +3082,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/spf13/viper v1.6.2/go.mod h1:t3iDnF5Jlj76alVNuyFBk5oUMCvsrkbvZK0WQdfDi5k= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stormcat24/protodep v0.1.8 h1:FOycjjkjZiastf21aRoCjtoVdhsoBE8mZ0RvY6AHqFE= github.com/stormcat24/protodep v0.1.8/go.mod h1:6OoSZD5GGomKfmH1LvfJxNIRvYhewFXH5+eNv8h4wOM= github.com/streadway/amqp v0.0.0-20180528204448-e5adc2ada8b8/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY= diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 4b4655a0c6..65d48a019e 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -96,6 +96,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/processor/tail_sampling" // Import otelcol.processor.tail_sampling _ "github.com/grafana/alloy/internal/component/otelcol/processor/transform" // Import otelcol.processor.transform _ "github.com/grafana/alloy/internal/component/otelcol/receiver/datadog" // Import otelcol.receiver.datadog + _ "github.com/grafana/alloy/internal/component/otelcol/receiver/influxdb" // Import otelcol.receiver.influxdb _ "github.com/grafana/alloy/internal/component/otelcol/receiver/file_stats" // Import otelcol.receiver.file_stats _ "github.com/grafana/alloy/internal/component/otelcol/receiver/jaeger" // Import otelcol.receiver.jaeger _ "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka" // Import otelcol.receiver.kafka diff --git a/internal/component/otelcol/receiver/influxdb/influxdb.go b/internal/component/otelcol/receiver/influxdb/influxdb.go new file mode 100644 index 0000000000..9f6f21472e --- /dev/null +++ b/internal/component/otelcol/receiver/influxdb/influxdb.go @@ -0,0 +1,84 @@ +// influxdb.go +package influxdb + +import ( + "fmt" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/receiver" + "github.com/grafana/alloy/internal/featuregate" + influxdbreceiver "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver" + otelcomponent "go.opentelemetry.io/collector/component" + otelextension "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/pipeline" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.receiver.influxdb", + Stability: featuregate.StabilityGenerallyAvailable, + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := influxdbreceiver.NewFactory() + return receiver.New(opts, fact, args.(Arguments)) + }, + }) +} + +// Arguments configures the otelcol.receiver.influxdb component. +type Arguments struct { + HTTPServer otelcol.HTTPServerArguments `alloy:",squash"` + + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` + + Output *otelcol.ConsumerArguments `alloy:"output,block"` +} + +var _ receiver.Arguments = Arguments{} + +func (args *Arguments) SetToDefault() { + *args = Arguments{ + HTTPServer: otelcol.HTTPServerArguments{ + Endpoint: "localhost:8086", + CompressionAlgorithms: append([]string(nil), otelcol.DefaultCompressionAlgorithms...), + }, + } + args.DebugMetrics.SetToDefault() +} +// Validate ensures that the Arguments configuration is valid. +func (args *Arguments) Validate() error { + if args.HTTPServer.Endpoint == "" { + return fmt.Errorf("HTTP server endpoint cannot be empty") + } + return nil +} + +// Convert implements receiver.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + return &influxdbreceiver.Config{ + ServerConfig: *args.HTTPServer.Convert(), + }, nil +} + +// Extensions implements receiver.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements receiver.Arguments. +func (args Arguments) Exporters() map[pipeline.Signal]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// NextConsumers implements receiver.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} + +// DebugMetricsConfig implements receiver.Arguments. +func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/component/otelcol/receiver/influxdb/influxdb_test.go b/internal/component/otelcol/receiver/influxdb/influxdb_test.go new file mode 100644 index 0000000000..86fc7d83d0 --- /dev/null +++ b/internal/component/otelcol/receiver/influxdb/influxdb_test.go @@ -0,0 +1,231 @@ +// influxdb_test.go +package influxdb_test + +import ( + "context" + "fmt" + "testing" + "time" + "github.com/grafana/alloy/internal/component/otelcol/receiver/influxdb" + "github.com/grafana/alloy/internal/component/otelcol/internal/fakeconsumer" + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/syntax" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + influxdb1 "github.com/influxdata/influxdb1-client/v2" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" + influxdbreceiver "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver" +) + +type mockConsumer struct { + lastMetricsConsumed pmetric.Metrics +} + +func (m *mockConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func (m *mockConsumer) ConsumeMetrics(_ context.Context, md pmetric.Metrics) error { + m.lastMetricsConsumed = pmetric.NewMetrics() + md.CopyTo(m.lastMetricsConsumed) + return nil +} +// Helper function to create a free address for testing +func getFreeAddr(t *testing.T) string { + t.Helper() + port := 8086 // Use a suitable port for testing + return fmt.Sprintf("localhost:%d", port) +} +// provided channel. +func makeMetricsOutput(ch chan pmetric.Metrics) *otelcol.ConsumerArguments { + metricConsumer := fakeconsumer.Consumer{ + ConsumeMetricsFunc: func(ctx context.Context, t pmetric.Metrics) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- t: + return nil + } + }, + } + + return &otelcol.ConsumerArguments{ + Metrics: []otelcol.Consumer{&metricConsumer}, + } +} +// TestInfluxdbUnmarshal tests the unmarshaling of the Alloy configuration into influxdb.Arguments +func TestInfluxdbUnmarshal(t *testing.T) { + metricCh := make(chan pmetric.Metrics) + + influxdbConfig := ` + endpoint = "localhost:8086" + compression_algorithms = ["gzip", "zstd"] + + debug_metrics { + disable_high_cardinality_metrics = false + } + output { + } + ` + var args influxdb.Arguments + err := syntax.Unmarshal([]byte(influxdbConfig), &args) + require.NoError(t, err, "Unmarshaling should not produce an error") + + // Set up the metrics output for testing + args.Output = makeMetricsOutput(metricCh) + + // Validate HTTPServer block + assert.Equal(t, "localhost:8086", args.HTTPServer.Endpoint, "HTTPServer.Endpoint should match") + assert.ElementsMatch(t, []string{"gzip", "zstd"}, args.HTTPServer.CompressionAlgorithms, "HTTPServer.CompressionAlgorithms should match") + + // Validate debug_metrics block + assert.Equal(t, false, args.DebugMetrics.DisableHighCardinalityMetrics, "DebugMetrics.DisableHighCardinalityMetrics should be false") + + // Validate output block + require.NotNil(t, args.Output, "Output block should not be nil") + require.Len(t, args.Output.Metrics, 1, "There should be exactly one metrics output") +} + + +// TestWriteLineProtocol_Alloy tests the InfluxDB receiver's ability to process metrics +func TestWriteLineProtocol_Alloy(t *testing.T) { + addr := getFreeAddr(t) + config := &influxdbreceiver.Config{ + ServerConfig: confighttp.ServerConfig{ + Endpoint: addr, + }, + } + nextConsumer := new(mockConsumer) + + receiver, outerErr := influxdbreceiver.NewFactory().CreateMetrics(context.Background(), receivertest.NewNopSettings(), config, nextConsumer) + require.NoError(t, outerErr) + require.NotNil(t, receiver) + + require.NoError(t, receiver.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, receiver.Shutdown(context.Background())) }) + + // Send test data using InfluxDB client v1 + t.Run("influxdb-client-v1", func(t *testing.T) { + nextConsumer.lastMetricsConsumed = pmetric.NewMetrics() + + client, err := influxdb1.NewHTTPClient(influxdb1.HTTPConfig{ + Addr: "http://" + addr, + Timeout: time.Second, + }) + require.NoError(t, err) + + batchPoints, err := influxdb1.NewBatchPoints(influxdb1.BatchPointsConfig{Precision: "µs"}) + require.NoError(t, err) + point, err := influxdb1.NewPoint("cpu_temp", map[string]string{"foo": "bar"}, map[string]any{"gauge": 87.332}) + require.NoError(t, err) + batchPoints.AddPoint(point) + err = client.Write(batchPoints) + require.NoError(t, err) + + metrics := nextConsumer.lastMetricsConsumed + assert.NotNil(t, metrics) + assert.Equal(t, 1, metrics.MetricCount()) + metric := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + assert.Equal(t, "cpu_temp", metric.Name()) + assert.InEpsilon(t, 87.332, metric.Gauge().DataPoints().At(0).DoubleValue(), 0.001) + }) + + // Send test data using InfluxDB client v2 + t.Run("influxdb-client-v2", func(t *testing.T) { + nextConsumer.lastMetricsConsumed = pmetric.NewMetrics() + + o := influxdb2.DefaultOptions() + o.SetPrecision(time.Microsecond) + client := influxdb2.NewClientWithOptions("http://"+addr, "", o) + t.Cleanup(client.Close) + + err := client.WriteAPIBlocking("my-org", "my-bucket").WriteRecord(context.Background(), "cpu_temp,foo=bar gauge=87.332") + require.NoError(t, err) + + metrics := nextConsumer.lastMetricsConsumed + assert.NotNil(t, metrics) + assert.Equal(t, 1, metrics.MetricCount()) + metric := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + assert.Equal(t, "cpu_temp", metric.Name()) + assert.InEpsilon(t, 87.332, metric.Gauge().DataPoints().At(0).DoubleValue(), 0.001) + }) +} +func TestReceiverStart(t *testing.T) { + addr := getFreeAddr(t) + metricCh := make(chan pmetric.Metrics) + config := influxdb.Arguments{ + HTTPServer: otelcol.HTTPServerArguments{ + Endpoint: addr, + CompressionAlgorithms: []string{"gzip", "zstd"}, + }, + Output: makeMetricsOutput(metricCh), + } + + convertedConfig, err := config.Convert() + require.NoError(t, err, "Failed to convert configuration") + + receiver, err := influxdbreceiver.NewFactory().CreateMetrics( + context.Background(), + receivertest.NewNopSettings(), + convertedConfig, + new(mockConsumer), + ) + require.NoError(t, err, "Failed to create receiver") + + require.NoError(t, receiver.Start(context.Background(), componenttest.NewNopHost())) + defer func() { require.NoError(t, receiver.Shutdown(context.Background())) }() + + require.NoError(t, nil, "Receiver failed to start") +} +func TestReceiverProcessesMetrics(t *testing.T) { + addr := getFreeAddr(t) + nextConsumer := &mockConsumer{} + + config := influxdb.Arguments{ + HTTPServer: otelcol.HTTPServerArguments{ + Endpoint: addr, + CompressionAlgorithms: []string{"gzip"}, + }, + Output: nil, // Output will not be used since we are directly testing the consumer + } + + convertedConfig, err := config.Convert() + require.NoError(t, err, "Failed to convert configuration") + + receiver, err := influxdbreceiver.NewFactory().CreateMetrics( + context.Background(), + receivertest.NewNopSettings(), + convertedConfig, + nextConsumer, + ) + require.NoError(t, err, "Failed to create receiver") + + require.NoError(t, receiver.Start(context.Background(), componenttest.NewNopHost())) + defer func() { require.NoError(t, receiver.Shutdown(context.Background())) }() + + t.Log("Receiver started successfully") + + // Simulate sending data to the receiver + o := influxdb2.DefaultOptions().SetUseGZip(true) + o.SetPrecision(time.Microsecond) + client := influxdb2.NewClientWithOptions("http://"+addr, "", o) + defer client.Close() + + t.Log("Sending test payload") + err = client.WriteAPIBlocking("org", "bucket").WriteRecord(context.Background(), "cpu_temp,foo=bar gauge=87.332") + require.NoError(t, err, "Failed to send metrics") + + // Validate the output + t.Log("Waiting for metrics to be consumed") + require.NotNil(t, nextConsumer.lastMetricsConsumed, "No metrics consumed") + require.Equal(t, 1, nextConsumer.lastMetricsConsumed.MetricCount(), "Unexpected metric count") + + metric := nextConsumer.lastMetricsConsumed.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0) + assert.Equal(t, "cpu_temp", metric.Name()) + assert.InEpsilon(t, 87.332, metric.Gauge().DataPoints().At(0).DoubleValue(), 0.001) +} diff --git a/internal/converter/internal/otelcolconvert/converter_influxdbreceiver.go b/internal/converter/internal/otelcolconvert/converter_influxdbreceiver.go new file mode 100644 index 0000000000..70b6225e4d --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_influxdbreceiver.go @@ -0,0 +1,81 @@ +// converter_influxdbreceiver.go +package otelcolconvert + +import ( + "fmt" + + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/receiver/influxdb" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/pipeline" +) + +// Assume State and other dependencies are defined elsewhere + +func init() { + // Register the influxdb receiver converter + converters = append(converters, influxdbReceiverConverter{}) +} + +// influxdbReceiverConverter implements the logic to convert influxdb configurations +type influxdbReceiverConverter struct{} + +// Factory returns the factory for the influxdb receiver +func (influxdbReceiverConverter) Factory() component.Factory { + return influxdbreceiver.NewFactory() +} + +// InputComponentName returns an empty string since no specific input component name is required +func (influxdbReceiverConverter) InputComponentName() string { + return "" +} + +// ConvertAndAppend converts the influxdb receiver configuration and appends it to the state +func (influxdbReceiverConverter) ConvertAndAppend( + state *State, + id componentstatus.InstanceID, + cfg component.Config, +) diag.Diagnostics { + var diags diag.Diagnostics + + // Generate a label for the converted component + label := state.AlloyComponentLabel() + + // Convert the config into Arguments format + args := toInfluxdbReceiver(state, id, cfg.(*influxdbreceiver.Config)) + + // // Create a block with the converted arguments + block := common.NewBlockWithOverride([]string{"otelcol", "receiver", "influxdb"}, label, args) + // Append the block to the state directly + state.Body().AppendBlock(block) + + // Add a diagnostic log entry for the conversion + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + return diags +} + +// toInfluxdbReceiver converts the influxdbreceiver.Config to influxdb.Arguments +func toInfluxdbReceiver( + state *State, + id componentstatus.InstanceID, + cfg *influxdbreceiver.Config, +) *influxdb.Arguments { + metricsConsumers := ToTokenizedConsumers(state.Next(id, pipeline.SignalMetrics)) + + args := &influxdb.Arguments{ + HTTPServer: *toHTTPServerArguments(&cfg.ServerConfig), + DebugMetrics: common.DefaultValue[influxdb.Arguments]().DebugMetrics, + Output: &otelcol.ConsumerArguments{ + Metrics: metricsConsumers, + }, + } + return args +} diff --git a/internal/converter/internal/otelcolconvert/testdata/influxdb.alloy b/internal/converter/internal/otelcolconvert/testdata/influxdb.alloy new file mode 100644 index 0000000000..e8265fe1b7 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/influxdb.alloy @@ -0,0 +1,11 @@ +otelcol.receiver.influxdb "default" { + output { + metrics = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = "database:4317" + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/influxdb.yaml b/internal/converter/internal/otelcolconvert/testdata/influxdb.yaml new file mode 100644 index 0000000000..5449a33d21 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/influxdb.yaml @@ -0,0 +1,13 @@ +receivers: + influxdb: + +exporters: + otlp: + endpoint: database:4317 + +service: + pipelines: + metrics: + receivers: [influxdb] + processors: [] + exporters: [otlp]