Skip to content

Commit

Permalink
Merge pull request #1011 from probe-lab/request-hook
Browse files Browse the repository at this point in the history
feat: add request callback config option
  • Loading branch information
guillaumemichel authored Jan 15, 2025
2 parents 1331ba7 + 70dbacb commit dbb8d9e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 0 deletions.
3 changes: 3 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ type IpfsDHT struct {
// addrFilter is used to filter the addresses we put into the peer store.
// Mostly used to filter out localhost and local addresses.
addrFilter func([]ma.Multiaddr) []ma.Multiaddr

onRequestHook func(ctx context.Context, s network.Stream, req pb.Message)
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -306,6 +308,7 @@ func makeDHT(h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
routingTablePeerFilter: cfg.RoutingTable.PeerFilter,
rtPeerDiversityFilter: cfg.RoutingTable.DiversityFilter,
addrFilter: cfg.AddressFilter,
onRequestHook: cfg.OnRequestHook,

fixLowPeersChan: make(chan struct{}, 1),

Expand Down
4 changes: 4 additions & 0 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
metrics.ReceivedBytes.M(int64(msgLen)),
)

if dht.onRequestHook != nil {
dht.onRequestHook(ctx, s, req)
}

handler := dht.handlerForMsgType(req.GetType())
if handler == nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
Expand Down
13 changes: 13 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dht

import (
"context"
"fmt"
"testing"
"time"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

Expand Down Expand Up @@ -368,3 +370,14 @@ func WithCustomMessageSender(messageSenderBuilder func(h host.Host, protos []pro
return nil
}
}

// OnRequestHook registers a callback function that will be invoked for every
// incoming DHT protocol message.
// Note: Ensure that the callback executes efficiently, as it will block the
// entire message handler.
func OnRequestHook(f func(ctx context.Context, s network.Stream, req pb.Message)) Option {
return func(c *dhtcfg.Config) error {
c.OnRequestHook = f
return nil
}
}
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"context"
"fmt"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -63,6 +65,7 @@ type Config struct {

BootstrapPeers func() []peer.AddrInfo
AddressFilter func([]ma.Multiaddr) []ma.Multiaddr
OnRequestHook func(ctx context.Context, s network.Stream, req pb.Message)

// test specific Config options
DisableFixLowPeers bool
Expand Down

0 comments on commit dbb8d9e

Please sign in to comment.