Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RoutingDHT that implements the routing.Routing interface #947

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion v2/internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coor
ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.QueryMessage")
defer span.End()
if msg == nil {
return coordt.QueryStats{}, fmt.Errorf("no message supplied for query")
return nil, coordt.QueryStats{}, fmt.Errorf("no message supplied for query")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this change? Was it not building before? Or is it a merge conflict?

}
c.cfg.Logger.Debug("starting query with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String()))

Expand Down
4 changes: 2 additions & 2 deletions v2/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestRTAdditionOnSuccessfulQuery(t *testing.T) {
require.ErrorIs(t, err, coordt.ErrNodeNotFound)

// // but when d3 queries d2, d1 and d3 discover each other
_, _ = d3.FindPeer(ctx, "something")
_, _ = NewRouting(d3).FindPeer(ctx, "something")
// ignore the error

// d3 should update its routing table to include d1 during the query
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestRTEvictionOnFailedQuery(t *testing.T) {
require.NoError(t, err)

// failed queries should remove the queried peers from the routing table
_, _ = d1.FindPeer(ctx, "test")
_, _ = NewRouting(d1).FindPeer(ctx, "test")

// d1 should update its routing table to remove d2 because of the failure
_, err = top.ExpectRoutingRemoved(ctx, d1, d2.host.ID())
Expand Down
42 changes: 29 additions & 13 deletions v2/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,25 @@ import (
"github.com/libp2p/go-libp2p-kad-dht/v2/pb"
)

var _ routing.Routing = (*DHT)(nil)
// RoutingDHT is a wrapper around the [DHT] struct that implements the
// [routing.Routing] interface. As people have raised concerns about the
// interface, we decided to not "pollute" the DHTs public API surface with
// interface methods that we can already foresee will eventually change.
// Use the [NewRouting] convenience method to create a new RoutingDHT.
type RoutingDHT struct {
*DHT // the wrapped DHT
}

var _ routing.Routing = (*RoutingDHT)(nil)

// NewRouting wraps the given [DHT] in a [RoutingDHT] that implements the
// [routing.Routing] interface. See [RoutingDHT]'s documentation for more
// information.
func NewRouting(d *DHT) *RoutingDHT {
return &RoutingDHT{DHT: d}
}

func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
func (d *RoutingDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.FindPeer")
defer span.End()

Expand Down Expand Up @@ -64,7 +80,7 @@ func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) {
return d.host.Peerstore().PeerInfo(foundPeer), nil
}

func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
func (d *RoutingDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.Provide", otel.WithAttributes(attribute.String("cid", c.String())))
defer span.End()

Expand Down Expand Up @@ -109,13 +125,13 @@ func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
return d.kad.BroadcastRecord(ctx, msg)
}

func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo {
func (d *RoutingDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo {
peerOut := make(chan peer.AddrInfo)
go d.findProvidersAsyncRoutine(ctx, c, count, peerOut)
return peerOut
}

func (d *DHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count int, out chan<- peer.AddrInfo) {
func (d *RoutingDHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count int, out chan<- peer.AddrInfo) {
_, span := d.tele.Tracer.Start(ctx, "DHT.findProvidersAsyncRoutine", otel.WithAttributes(attribute.String("cid", c.String()), attribute.Int("count", count)))
defer span.End()

Expand Down Expand Up @@ -216,7 +232,7 @@ func (d *DHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count in
// format `/$namespace/$binary_id`. Namespace examples are `pk` or `ipns`. To
// identify the closest peers to keyStr, that complete string will be SHA256
// hashed.
func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ...routing.Option) error {
func (d *RoutingDHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ...routing.Option) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValue")
defer span.End()

Expand Down Expand Up @@ -255,7 +271,7 @@ func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ..

// putValueLocal stores a value in the local datastore without reaching out to
// the network.
func (d *DHT) putValueLocal(ctx context.Context, key string, value []byte) error {
func (d *RoutingDHT) putValueLocal(ctx context.Context, key string, value []byte) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValueLocal")
defer span.End()

Expand All @@ -280,7 +296,7 @@ func (d *DHT) putValueLocal(ctx context.Context, key string, value []byte) error
return nil
}

func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
func (d *RoutingDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.GetValue")
defer span.End()

Expand All @@ -307,7 +323,7 @@ func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option)

// SearchValue will search in the DHT for keyStr. keyStr must have the form
// `/$namespace/$binary_id`
func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing.Option) (<-chan []byte, error) {
func (d *RoutingDHT) SearchValue(ctx context.Context, keyStr string, options ...routing.Option) (<-chan []byte, error) {
_, span := d.tele.Tracer.Start(ctx, "DHT.SearchValue")
defer span.End()

Expand Down Expand Up @@ -363,7 +379,7 @@ func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing
return out, nil
}

func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string, path string, ropt *routing.Options, out chan<- []byte) {
func (d *RoutingDHT) searchValueRoutine(ctx context.Context, backend Backend, ns string, path string, ropt *routing.Options, out chan<- []byte) {
_, span := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine")
defer span.End()
defer close(out)
Expand Down Expand Up @@ -438,7 +454,7 @@ func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string

_, _, err := d.kad.QueryMessage(ctx, req, fn, d.cfg.BucketSize)
if err != nil {
d.logErr(err, "Search value query failed")
d.warnErr(err, "Search value query failed")
return
}

Expand Down Expand Up @@ -487,7 +503,7 @@ func RoutingQuorum(n int) routing.Option {

// getQuorum extracts the quorum value from the given routing options and
// returns [Config.DefaultQuorum] if no quorum value is present.
func (d *DHT) getQuorum(opts *routing.Options) int {
func (d *RoutingDHT) getQuorum(opts *routing.Options) int {
quorum, ok := opts.Other[quorumOptionKey{}].(int)
if !ok {
quorum = d.cfg.Query.DefaultQuorum
Expand All @@ -496,7 +512,7 @@ func (d *DHT) getQuorum(opts *routing.Options) int {
return quorum
}

func (d *DHT) Bootstrap(ctx context.Context) error {
func (d *RoutingDHT) Bootstrap(ctx context.Context) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.Bootstrap")
defer span.End()
d.log.Info("Starting bootstrap")
Expand Down
Loading