Skip to content

Commit

Permalink
Merge develop branch into main (#123)
Browse files Browse the repository at this point in the history
* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update cc-metric-collector.init

* Allow selection of timestamp precision in HttpSink

* Add comment about precision requirement for cc-metric-store

* Fix for API changes in [email protected]

* Update requirements to latest version

* Read sensors through redfish

* Update golang toolchain to 1.21

* Remove stray error check

* Update main config in configuration.md

* Update Release action to use golang 1.22 stable release, no golang RPMs anymore

* Update runonce action to use golang 1.22 stable release, no golang RPMs anymore

* Update README.md

Use right JSON type in configuration

* Update sink's README

* Test whether ipmitool or ipmi-sensors can be executed without errors

* Little fixes to the prometheus sink (#115)

* Add uint64 to float64 cast option

* Add prometheus sink to the list of available sinks

* Add aggregated counters by gpu for nvlink errors

---------

Co-authored-by: Michael Schwarz <[email protected]>

* Ccmessage migration (#119)

* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update cc-metric-collector.init

* Allow selection of timestamp precision in HttpSink

* Add comment about precision requirement for cc-metric-store

* Fix for API changes in [email protected]

* Update requirements to latest version

* Read sensors through redfish

* Update golang toolchain to 1.21

* Remove stray error check

* Update main config in configuration.md

* Update Release action to use golang 1.22 stable release, no golang RPMs anymore

* Update runonce action to use golang 1.22 stable release, no golang RPMs anymore

* Switch to CCMessage for all files.

---------

Co-authored-by: Holger Obermaier <[email protected]>
Co-authored-by: Holger Obermaier <[email protected]>

* Switch to ccmessage also for latest additions in nvidiaMetric

* New Message processor (#118)

* Add cpu_used (all-cpu_idle) to CpustatCollector

* Update cc-metric-collector.init

* Allow selection of timestamp precision in HttpSink

* Add comment about precision requirement for cc-metric-store

* Fix for API changes in [email protected]

* Update requirements to latest version

* Read sensors through redfish

* Update golang toolchain to 1.21

* Remove stray error check

* Update main config in configuration.md

* Update Release action to use golang 1.22 stable release, no golang RPMs anymore

* Update runonce action to use golang 1.22 stable release, no golang RPMs anymore

* New message processor to check whether a message should be dropped or manipulate it in flight

* Create a copy of message before manipulation

---------

Co-authored-by: Holger Obermaier <[email protected]>
Co-authored-by: Holger Obermaier <[email protected]>

* Update collector's Makefile and go.mod/sum files

* Use message processor in router, all sinks and all receivers

* Add support for credential file (NKEY) to NATS sink and receiver

* Fix JSON keys in message processor configuration

* Update docs for message processor, router and the default router config file

* Add link to expr syntax and fix regex matching docs

* Update sample collectors

* Minor style change in collector manager

* Some helpers for ccTopology

* LIKWID collector: write log owner change only once

* Fix for metrics without units and reduce debugging messages for messageProcessor

* Use shorted hostname for hostname added by router

* Define default port for NATS

* CPUstat collector: only add unit for applicable metrics

* Add precision option to all sinks using Influx's encoder

* Add message processor to all sink documentation

* Add units to documentation of cpustat collector

---------

Co-authored-by: Holger Obermaier <[email protected]>
Co-authored-by: Holger Obermaier <[email protected]>
Co-authored-by: oscarminus <[email protected]>
Co-authored-by: Michael Schwarz <[email protected]>
  • Loading branch information
5 people authored Dec 19, 2024
1 parent 21646e1 commit 7840de7
Show file tree
Hide file tree
Showing 74 changed files with 1,905 additions and 1,020 deletions.
1 change: 0 additions & 1 deletion .github/workflows/runonce.yml
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ jobs:
submodules: recursive
fetch-depth: 0
# Use official golang package

# See: https://github.com/marketplace/actions/setup-go-environment
- name: Setup Golang
uses: actions/setup-go@v4
Expand Down
10 changes: 5 additions & 5 deletions cc-metric-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

mr "github.com/ClusterCockpit/cc-metric-collector/internal/metricRouter"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
)

Expand Down Expand Up @@ -54,7 +54,7 @@ type RuntimeConfig struct {
ReceiveManager receivers.ReceiveManager
MultiChanTicker mct.MultiChanTicker

Channels []chan lp.CCMetric
Channels []chan lp.CCMessage
Sync sync.WaitGroup
}

Expand Down Expand Up @@ -242,7 +242,7 @@ func mainFunc() int {
}

// Connect metric router to sink manager
RouterToSinksChannel := make(chan lp.CCMetric, 200)
RouterToSinksChannel := make(chan lp.CCMessage, 200)
rcfg.SinkManager.AddInput(RouterToSinksChannel)
rcfg.MetricRouter.AddOutput(RouterToSinksChannel)

Expand All @@ -254,7 +254,7 @@ func mainFunc() int {
}

// Connect collector manager to metric router
CollectToRouterChannel := make(chan lp.CCMetric, 200)
CollectToRouterChannel := make(chan lp.CCMessage, 200)
rcfg.CollectManager.AddOutput(CollectToRouterChannel)
rcfg.MetricRouter.AddCollectorInput(CollectToRouterChannel)

Expand All @@ -267,7 +267,7 @@ func mainFunc() int {
}

// Connect receive manager to metric router
ReceiveToRouterChannel := make(chan lp.CCMetric, 200)
ReceiveToRouterChannel := make(chan lp.CCMessage, 200)
rcfg.ReceiveManager.AddOutput(ReceiveToRouterChannel)
rcfg.MetricRouter.AddReceiverInput(ReceiveToRouterChannel)
use_recv = true
Expand Down
4 changes: 2 additions & 2 deletions collectors/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# LIKWID version
LIKWID_VERSION := 5.2.2
LIKWID_VERSION := 5.4.1
LIKWID_INSTALLED_FOLDER := $(shell dirname $$(which likwid-topology 2>/dev/null) 2>/dev/null)

LIKWID_FOLDER := $(CURDIR)/likwid
Expand All @@ -23,7 +23,7 @@ likwid:
mkdir --parents --verbose "$${BUILD_FOLDER}"
wget --output-document=- http://ftp.rrze.uni-erlangen.de/mirrors/likwid/likwid-$(LIKWID_VERSION).tar.gz |
tar --directory="$${BUILD_FOLDER}" --extract --gz
install -D --verbose --preserve-timestamps --mode=0644 --target-directory="$(LIKWID_FOLDER)" "$${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes"/likwid*.h "$${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes"/bstrlib.h
install -D --verbose --preserve-timestamps --mode=0644 --target-directory="$(LIKWID_FOLDER)" "$${BUILD_FOLDER}/likwid-$(LIKWID_VERSION)/src/includes"/likwid*.h
rm --recursive "$${BUILD_FOLDER}"
fi

Expand Down
6 changes: 3 additions & 3 deletions collectors/beegfsmetaMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"time"

cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
)

const DEFAULT_BEEGFS_CMD = "beegfs-ctl"
Expand Down Expand Up @@ -110,7 +110,7 @@ func (m *BeegfsMetaCollector) Init(config json.RawMessage) error {
return nil
}

func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetric) {
func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMessage) {
if !m.init {
return
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func (m *BeegfsMetaCollector) Read(interval time.Duration, output chan lp.CCMetr

for key, data := range m.matches {
value, _ := strconv.ParseFloat(data, 32)
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
y, err := lp.NewMessage(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
}
Expand Down
6 changes: 3 additions & 3 deletions collectors/beegfsstorageMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"time"

cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
)

// Struct for the collector-specific JSON config
Expand Down Expand Up @@ -103,7 +103,7 @@ func (m *BeegfsStorageCollector) Init(config json.RawMessage) error {
return nil
}

func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCMetric) {
func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCMessage) {
if !m.init {
return
}
Expand Down Expand Up @@ -208,7 +208,7 @@ func (m *BeegfsStorageCollector) Read(interval time.Duration, output chan lp.CCM

for key, data := range m.matches {
value, _ := strconv.ParseFloat(data, 32)
y, err := lp.New(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
y, err := lp.NewMessage(key, m.tags, m.meta, map[string]interface{}{"value": value}, time.Now())
if err == nil {
output <- y
}
Expand Down
8 changes: 4 additions & 4 deletions collectors/collectorManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"sync"
"time"

lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
mct "github.com/ClusterCockpit/cc-metric-collector/pkg/multiChanTicker"
)

Expand Down Expand Up @@ -47,7 +47,7 @@ var AvailableCollectors = map[string]MetricCollector{
type collectorManager struct {
collectors []MetricCollector // List of metric collectors to read in parallel
serial []MetricCollector // List of metric collectors to read serially
output chan lp.CCMetric // Output channels
output chan lp.CCMessage // Output channels
done chan bool // channel to finish / stop metric collector manager
ticker mct.MultiChanTicker // periodically ticking once each interval
duration time.Duration // duration (for metrics that measure over a given duration)
Expand All @@ -60,7 +60,7 @@ type collectorManager struct {
// Metric collector manager access functions
type CollectorManager interface {
Init(ticker mct.MultiChanTicker, duration time.Duration, wg *sync.WaitGroup, collectConfigFile string) error
AddOutput(output chan lp.CCMetric)
AddOutput(output chan lp.CCMessage)
Start()
Close()
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (cm *collectorManager) Start() {
}

// AddOutput adds the output channel to the metric collector manager
func (cm *collectorManager) AddOutput(output chan lp.CCMetric) {
func (cm *collectorManager) AddOutput(output chan lp.CCMessage) {
cm.output = output
}

Expand Down
8 changes: 4 additions & 4 deletions collectors/cpufreqCpuinfoMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"strings"
"time"

lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
)

// CPUFreqCollector
Expand Down Expand Up @@ -112,14 +112,14 @@ func (m *CPUFreqCpuInfoCollector) Init(config json.RawMessage) error {

// Check if at least one CPU with frequency information was detected
if len(m.topology) == 0 {
return fmt.Errorf("No CPU frequency info found in %s", cpuInfoFile)
return fmt.Errorf("no CPU frequency info found in %s", cpuInfoFile)
}

m.init = true
return nil
}

func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CCMetric) {
func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CCMessage) {
// Check if already initialized
if !m.init {
return
Expand Down Expand Up @@ -154,7 +154,7 @@ func (m *CPUFreqCpuInfoCollector) Read(interval time.Duration, output chan lp.CC
fmt.Sprintf("Read(): Failed to convert cpu MHz '%s' to float64: %v", lineSplit[1], err))
return
}
if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil {
if y, err := lp.NewMessage("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": value}, now); err == nil {
output <- y
}
}
Expand Down
6 changes: 3 additions & 3 deletions collectors/cpufreqMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
"github.com/ClusterCockpit/cc-metric-collector/pkg/ccTopology"
"golang.org/x/sys/unix"
)
Expand Down Expand Up @@ -91,7 +91,7 @@ func (m *CPUFreqCollector) Init(config json.RawMessage) error {
return nil
}

func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric) {
func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMessage) {
// Check if already initialized
if !m.init {
return
Expand All @@ -117,7 +117,7 @@ func (m *CPUFreqCollector) Read(interval time.Duration, output chan lp.CCMetric)
continue
}

if y, err := lp.New("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil {
if y, err := lp.NewMessage("cpufreq", t.tagSet, m.meta, map[string]interface{}{"value": cpuFreq}, now); err == nil {
output <- y
}
}
Expand Down
16 changes: 9 additions & 7 deletions collectors/cpustatMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"strings"
"time"

lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
sysconf "github.com/tklauser/go-sysconf"
)

Expand All @@ -34,7 +34,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
m.name = "CpustatCollector"
m.setup()
m.parallel = true
m.meta = map[string]string{"source": m.name, "group": "CPU", "unit": "Percent"}
m.meta = map[string]string{"source": m.name, "group": "CPU"}
m.nodetags = map[string]string{"type": "node"}
if len(config) > 0 {
err := json.Unmarshal(config, &m.config)
Expand Down Expand Up @@ -105,7 +105,7 @@ func (m *CpustatCollector) Init(config json.RawMessage) error {
return nil
}

func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMetric, now time.Time, tsdelta time.Duration) {
func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]string, output chan lp.CCMessage, now time.Time, tsdelta time.Duration) {
values := make(map[string]float64)
clktck, _ := sysconf.Sysconf(sysconf.SC_CLK_TCK)
for match, index := range m.matches {
Expand All @@ -122,21 +122,23 @@ func (m *CpustatCollector) parseStatLine(linefields []string, tags map[string]st
sum := float64(0)
for name, value := range values {
sum += value
y, err := lp.New(name, tags, m.meta, map[string]interface{}{"value": value * 100}, now)
y, err := lp.NewMessage(name, tags, m.meta, map[string]interface{}{"value": value * 100}, now)
if err == nil {
y.AddTag("unit", "Percent")
output <- y
}
}
if v, ok := values["cpu_idle"]; ok {
sum -= v
y, err := lp.New("cpu_used", tags, m.meta, map[string]interface{}{"value": sum * 100}, now)
y, err := lp.NewMessage("cpu_used", tags, m.meta, map[string]interface{}{"value": sum * 100}, now)
if err == nil {
y.AddTag("unit", "Percent")
output <- y
}
}
}

func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
if !m.init {
return
}
Expand All @@ -162,7 +164,7 @@ func (m *CpustatCollector) Read(interval time.Duration, output chan lp.CCMetric)
}
}

num_cpus_metric, err := lp.New("num_cpus",
num_cpus_metric, err := lp.NewMessage("num_cpus",
m.nodetags,
m.meta,
map[string]interface{}{"value": int(num_cpus)},
Expand Down
23 changes: 12 additions & 11 deletions collectors/cpustatMetric.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ The `cpustat` collector reads data from `/proc/stat` and outputs a handful **nod

Metrics:

* `cpu_user`
* `cpu_nice`
* `cpu_system`
* `cpu_idle`
* `cpu_iowait`
* `cpu_irq`
* `cpu_softirq`
* `cpu_steal`
* `cpu_guest`
* `cpu_guest_nice`
* `cpu_used` = `cpu_* - cpu_idle`
* `cpu_user` with `unit=Percent`
* `cpu_nice` with `unit=Percent`
* `cpu_system` with `unit=Percent`
* `cpu_idle` with `unit=Percent`
* `cpu_iowait` with `unit=Percent`
* `cpu_irq` with `unit=Percent`
* `cpu_softirq` with `unit=Percent`
* `cpu_steal` with `unit=Percent`
* `cpu_guest` with `unit=Percent`
* `cpu_guest_nice` with `unit=Percent`
* `cpu_used` = `cpu_* - cpu_idle` with `unit=Percent`
* `num_cpus`
4 changes: 2 additions & 2 deletions collectors/customCmdMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
"time"

lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
influx "github.com/influxdata/line-protocol"
)

Expand Down Expand Up @@ -75,7 +75,7 @@ var DefaultTime = func() time.Time {
return time.Unix(42, 0)
}

func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMetric) {
func (m *CustomCmdCollector) Read(interval time.Duration, output chan lp.CCMessage) {
if !m.init {
return
}
Expand Down
10 changes: 5 additions & 5 deletions collectors/diskstatMetric.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

cclog "github.com/ClusterCockpit/cc-metric-collector/pkg/ccLogger"
lp "github.com/ClusterCockpit/cc-metric-collector/pkg/ccMetric"
lp "github.com/ClusterCockpit/cc-energy-manager/pkg/cc-message"
)

// "log"
Expand Down Expand Up @@ -48,7 +48,7 @@ func (m *DiskstatCollector) Init(config json.RawMessage) error {
return nil
}

func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric) {
func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMessage) {
if !m.init {
return
}
Expand Down Expand Up @@ -92,13 +92,13 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
}
tags := map[string]string{"type": "node", "device": linefields[0]}
total := (stat.Blocks * uint64(stat.Bsize)) / uint64(1000000000)
y, err := lp.New("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
y, err := lp.NewMessage("disk_total", tags, m.meta, map[string]interface{}{"value": total}, time.Now())
if err == nil {
y.AddMeta("unit", "GBytes")
output <- y
}
free := (stat.Bfree * uint64(stat.Bsize)) / uint64(1000000000)
y, err = lp.New("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
y, err = lp.NewMessage("disk_free", tags, m.meta, map[string]interface{}{"value": free}, time.Now())
if err == nil {
y.AddMeta("unit", "GBytes")
output <- y
Expand All @@ -110,7 +110,7 @@ func (m *DiskstatCollector) Read(interval time.Duration, output chan lp.CCMetric
}
}
}
y, err := lp.New("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
y, err := lp.NewMessage("part_max_used", map[string]string{"type": "node"}, m.meta, map[string]interface{}{"value": int(part_max_used)}, time.Now())
if err == nil {
y.AddMeta("unit", "percent")
output <- y
Expand Down
Loading

0 comments on commit 7840de7

Please sign in to comment.