Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p/dht): Enable DHT server metrics #4071

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions nodebuilder/p2p/dht_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package p2p

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
// DHTMetrics contains all metrics for the DHT server
dhtMetrics = struct {
// General DHT metrics
PeersTotal *prometheus.GaugeVec
RequestsTotal *prometheus.CounterVec
RequestDuration *prometheus.HistogramVec

// Metrics for search operations
FindPeerTotal *prometheus.CounterVec
FindProvidersTotal *prometheus.CounterVec

// Metrics for storage operations
StoredRecordsTotal *prometheus.GaugeVec
StoreOperationsTotal *prometheus.CounterVec

// Metrics for network operations
InboundConnectionsTotal *prometheus.CounterVec
OutboundConnectionsTotal *prometheus.CounterVec

// Metrics for the routing table
RoutingTableSize *prometheus.GaugeVec
RoutingTableRefreshes *prometheus.CounterVec
}{
PeersTotal: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "peers_total",
Help: "The total number of peers in the DHT",
},
[]string{networkLabel, nodeTypeLabel},
),
RequestsTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "requests_total",
Help: "The total number of DHT requests",
},
[]string{networkLabel, nodeTypeLabel, "type", "status"},
),
RequestDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "request_duration_seconds",
Help: "Duration of DHT requests",
Buckets: prometheus.DefBuckets,
},
[]string{networkLabel, nodeTypeLabel, "type"},
),
FindPeerTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "find_peer_total",
Help: "The total number of peer search operations",
},
[]string{networkLabel, nodeTypeLabel, "status"},
),
FindProvidersTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "find_providers_total",
Help: "The total number of provider search operations",
},
[]string{networkLabel, nodeTypeLabel, "status"},
),
StoredRecordsTotal: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "stored_records_total",
Help: "The total number of stored records",
},
[]string{networkLabel, nodeTypeLabel},
),
StoreOperationsTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "store_operations_total",
Help: "The total number of storage operations",
},
[]string{networkLabel, nodeTypeLabel, "status"},
),
InboundConnectionsTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "inbound_connections_total",
Help: "The total number of inbound connections",
},
[]string{networkLabel, nodeTypeLabel},
),
OutboundConnectionsTotal: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "outbound_connections_total",
Help: "The total number of outbound connections",
},
[]string{networkLabel, nodeTypeLabel},
),
RoutingTableSize: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "routing_table_size",
Help: "The size of the routing table",
},
[]string{networkLabel, nodeTypeLabel},
),
RoutingTableRefreshes: promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "celestia",
Subsystem: "dht",
Name: "routing_table_refreshes_total",
Help: "The total number of routing table refreshes",
},
[]string{networkLabel, nodeTypeLabel},
),
}
)
13 changes: 13 additions & 0 deletions share/shwap/p2p/discovery/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/prometheus/client_golang/prometheus"
)

const (
Expand All @@ -26,13 +27,25 @@ func NewDHT(
dataStore datastore.Batching,
mode dht.ModeOpt,
) (*dht.IpfsDHT, error) {
// Create metrics registry with our labels
reg := prometheus.NewRegistry()
labels := prometheus.Labels{
"network": prefix,
"node_type": mode.String(),
}
wrappedReg := prometheus.WrapRegistererWith(labels, reg)

opts := []dht.Option{
dht.BootstrapPeers(bootsrappers...),
dht.ProtocolPrefix(protocol.ID(fmt.Sprintf("/celestia/%s", prefix))),
dht.Datastore(dataStore),
dht.RoutingTableRefreshPeriod(defaultRoutingRefreshPeriod),
dht.Mode(mode),
dht.Validator(dht.DefaultValidator{}),
// Enable built-in DHT metrics
dht.EnabledMetrics(wrappedReg),
}

return dht.New(ctx, host, opts...)
}
я
85 changes: 85 additions & 0 deletions share/shwap/p2p/discovery/dht_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package discovery

import (
"context"
"time"

dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

// MetricsDHT wraps DHT for collecting metrics
type MetricsDHT struct {
*dht.IpfsDHT
prefix string
mode string
}

// NewMetricsDHT creates a new DHT wrapper with metrics
func NewMetricsDHT(d *dht.IpfsDHT, prefix string, mode dht.ModeOpt) *MetricsDHT {
return &MetricsDHT{
IpfsDHT: d,
prefix: prefix,
mode: mode.String(),
}
}

// FindPeer wraps the original method for collecting metrics
func (m *MetricsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "find_peer")
defer trackFindPeer(ctx, m.prefix, m.mode)(nil)

info, err := m.IpfsDHT.FindPeer(ctx, id)
done(err)
return info, err
}

// Provide wraps the original method for collecting metrics
func (m *MetricsDHT) Provide(ctx context.Context, key routing.Cid, announce bool) error {
ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "provide")
defer trackStoreOperation(ctx, m.prefix, m.mode)(nil)

err := m.IpfsDHT.Provide(ctx, key, announce)
done(err)
return err
}

// FindProvidersAsync wraps the original method for collecting metrics
func (m *MetricsDHT) FindProvidersAsync(ctx context.Context, key routing.Cid, count int) <-chan peer.AddrInfo {
ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "find_providers")
defer trackFindProviders(ctx, m.prefix, m.mode)(nil)

ch := m.IpfsDHT.FindProvidersAsync(ctx, key, count)
done(nil) // Here, we cannot track errors since the method is asynchronous
return ch
}

// Bootstrap wraps the original method for collecting metrics
func (m *MetricsDHT) Bootstrap(ctx context.Context) error {
ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "bootstrap")
defer trackRoutingTableRefresh(m.prefix, m.mode)

err := m.IpfsDHT.Bootstrap(ctx)
done(err)
return err
}

// PutValue wraps the original method for collecting metrics
func (m *MetricsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) error {
ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "put_value")
defer trackStoreOperation(ctx, m.prefix, m.mode)(nil)

err := m.IpfsDHT.PutValue(ctx, key, value, opts...)
done(err)
return err
}

// GetValue wraps the original method for collecting metrics
func (m *MetricsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
ctx, done := trackDHTRequest(ctx, m.prefix, m.mode, "get_value")

val, err := m.IpfsDHT.GetValue(ctx, key, opts...)
done(err)
return val, err
}