From 68448df63a0b6629c4ca22ac70477074dc687a71 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 20 May 2020 18:50:09 +0530 Subject: [PATCH] persist Routing Table Snapshots --- dht.go | 13 + dht_options.go | 31 +++ dht_test.go | 53 ++++ go.mod | 3 +- go.sum | 4 + pb/dht.pb.go | 580 ++++++++++++++++++++++++++++++++++++--- pb/dht.proto | 23 ++ pb/message.go | 18 ++ persist/interfaces.go | 20 ++ persist/snapshot.go | 123 +++++++++ persist/snapshot_test.go | 87 ++++++ 11 files changed, 923 insertions(+), 32 deletions(-) create mode 100644 persist/interfaces.go create mode 100644 persist/snapshot.go create mode 100644 persist/snapshot_test.go diff --git a/dht.go b/dht.go index c72098cf8..0f3682087 100644 --- a/dht.go +++ b/dht.go @@ -30,6 +30,7 @@ import ( logging "github.com/ipfs/go-log" "github.com/jbenet/goprocess" goprocessctx "github.com/jbenet/goprocess/context" + periodicproc "github.com/jbenet/goprocess/periodic" "github.com/multiformats/go-base32" ma "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" @@ -180,6 +181,18 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) } } + // schedule periodic snapshots + snapshotter := cfg.persist.snapshotter + snapshotInterval := cfg.persist.snapshotInterval + sproc := periodicproc.Tick(snapshotInterval, func(proc goprocess.Process) { + logger.Debug("persisting Routing Table snapshot") + err := snapshotter.Store(h, dht.routingTable) + if err != nil { + logger.Errorw("failed to persist Routing Table snapshot", "err", err) + } + }) + dht.proc.AddChild(sproc) + // register for event bus and network notifications sn, err := newSubscriberNotifiee(dht) if err != nil { diff --git a/dht_options.go b/dht_options.go index 783caafc4..7b45d4db0 100644 --- a/dht_options.go +++ b/dht_options.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + "github.com/libp2p/go-libp2p-kad-dht/persist" + ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-ipns" @@ -59,6 +61,11 @@ type config struct { peerFilter RouteTableFilterFunc } + persist struct { + snapshotter persist.Snapshotter + snapshotInterval time.Duration + } + // set to true if we're operating in v1 dht compatible mode v1CompatibleMode bool bootstrapPeers []peer.AddrInfo @@ -93,6 +100,15 @@ func (c *config) applyFallbacks(h host.Host) error { return fmt.Errorf("the default validator was changed without being marked as changed") } } + + if c.persist.snapshotter == nil { + s, err := persist.NewDatastoreSnapshotter(c.datastore, persist.DefaultSnapshotNS) + if err != nil { + return fmt.Errorf("failed to create snapshotter %w", err) + } + c.persist.snapshotter = s + } + return nil } @@ -124,6 +140,8 @@ var defaults = func(o *config) error { o.v1CompatibleMode = true + o.persist.snapshotInterval = 5 * time.Minute + return nil } @@ -408,3 +426,16 @@ func BootstrapPeers(addrs ...ma.Multiaddr) Option { return nil } } + +// Snapshotter configures the snapshotter and the interval to use to periodically +// persist the Routing Table. See the documentation for the `persist.Snapshotter` interface +// for more details. +// If you configure this, please ensure that you configure the DHT with a persistent Datastore +// to ensure that the routing table snapshots are persistent across DHT restarts. +func Snapshotter(s persist.Snapshotter, interval time.Duration) Option { + return func(c *config) error { + c.persist.snapshotter = s + c.persist.snapshotInterval = interval + return nil + } +} diff --git a/dht_test.go b/dht_test.go index 0fa80164d..afa3957e4 100644 --- a/dht_test.go +++ b/dht_test.go @@ -21,12 +21,15 @@ import ( test "github.com/libp2p/go-libp2p-kad-dht/internal/testing" pb "github.com/libp2p/go-libp2p-kad-dht/pb" + "github.com/libp2p/go-libp2p-kad-dht/persist" kb "github.com/libp2p/go-libp2p-kbucket" record "github.com/libp2p/go-libp2p-record" swarmt "github.com/libp2p/go-libp2p-swarm/testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" detectrace "github.com/ipfs/go-detect-race" u "github.com/ipfs/go-ipfs-util" ma "github.com/multiformats/go-multiaddr" @@ -387,6 +390,56 @@ func TestValueSetInvalid(t *testing.T) { testSetGet("valid", true, "newer", nil) } +func TestRoutingTableSnapshot(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + sdstore := dssync.MutexWrap(datastore.NewMapDatastore()) + snapShotter, err := persist.NewDatastoreSnapshotter(sdstore, "rand") + require.NoError(t, err) + snapShotterOpt := Snapshotter(snapShotter, 100*time.Millisecond) + + // start 4 other dht's we can connect to & add to our RT + dhts := setupDHTS(t, ctx, 4) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Close() + defer dhts[i].host.Close() + } + }() + + // connect them with each other + connect(t, ctx, dhts[0], dhts[1]) + connect(t, ctx, dhts[1], dhts[2]) + connect(t, ctx, dhts[2], dhts[3]) + + // create dht with snapshotter & connect it to one of the above dht's + selfDht := setupDHT(ctx, t, false, snapShotterOpt) + require.True(t, selfDht.routingTable.Size() == 0) + defer selfDht.Close() + defer selfDht.host.Close() + + connect(t, ctx, selfDht, dhts[0]) + + for selfDht.routingTable.Size() != 4 { + <-selfDht.ForceRefresh() + } + + // assert snapshot & close dht + time.Sleep(500 * time.Millisecond) // wait for one snapshot + snapshotPeers, err := snapShotter.Load() + require.Len(t, snapshotPeers, 4) + var peerIds []peer.ID + for i := range snapshotPeers { + peerIds = append(peerIds, snapshotPeers[i].ID) + } + require.Len(t, peerIds, 4) + + require.NoError(t, err) + for _, d := range dhts { + require.Contains(t, peerIds, d.self) + } +} + func TestContextShutDown(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go.mod b/go.mod index f7e2fb9f1..82720fbdf 100644 --- a/go.mod +++ b/go.mod @@ -17,8 +17,9 @@ require ( github.com/jbenet/goprocess v0.1.4 github.com/libp2p/go-eventbus v0.1.0 github.com/libp2p/go-libp2p v0.8.2 + github.com/libp2p/go-libp2p-blankhost v0.1.4 github.com/libp2p/go-libp2p-core v0.5.4 - github.com/libp2p/go-libp2p-kbucket v0.4.2 + github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200515080035-b37bee77ff8d github.com/libp2p/go-libp2p-peerstore v0.2.4 github.com/libp2p/go-libp2p-record v0.1.2 github.com/libp2p/go-libp2p-routing-helpers v0.2.3 diff --git a/go.sum b/go.sum index e5756a887..44ea5c442 100644 --- a/go.sum +++ b/go.sum @@ -195,6 +195,8 @@ github.com/libp2p/go-libp2p-discovery v0.3.0 h1:+JnYBRLzZQtRq0mK3xhyjBwHytLmJXMT github.com/libp2p/go-libp2p-discovery v0.3.0/go.mod h1:o03drFnz9BVAZdzC/QUQ+NeQOu38Fu7LJGEOK2gQltw= github.com/libp2p/go-libp2p-kbucket v0.4.2 h1:wg+VPpCtY61bCasGRexCuXOmEmdKjN+k1w+JtTwu9gA= github.com/libp2p/go-libp2p-kbucket v0.4.2/go.mod h1:7sCeZx2GkNK1S6lQnGUW5JYZCFPnXzAZCCBBS70lytY= +github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200515080035-b37bee77ff8d h1:tLb9EZQonRbrMI3lo7/0LkhV0tiNIpddEK0hAk70dC4= +github.com/libp2p/go-libp2p-kbucket v0.4.3-0.20200515080035-b37bee77ff8d/go.mod h1:7sCeZx2GkNK1S6lQnGUW5JYZCFPnXzAZCCBBS70lytY= github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8= github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90= github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo= @@ -266,6 +268,7 @@ github.com/libp2p/go-netroute v0.1.2 h1:UHhB35chwgvcRI392znJA3RCBtZ3MpE3ahNCN5MR github.com/libp2p/go-netroute v0.1.2/go.mod h1:jZLDV+1PE8y5XxBySEBgbuVAXbhtuHSdmLPL2n9MKbk= github.com/libp2p/go-openssl v0.0.2/go.mod h1:v8Zw2ijCSWBQi8Pq5GAixw6DbFfa9u6VIYDXnvOXkc0= github.com/libp2p/go-openssl v0.0.3/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= +github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg= github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw= github.com/libp2p/go-reuseport v0.0.1/go.mod h1:jn6RmB1ufnQwl0Q1f+YxAj8isJgDCQzaaxIFYDhcYEA= @@ -375,6 +378,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= +github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= diff --git a/pb/dht.pb.go b/pb/dht.pb.go index 51762c1c0..ab4663f56 100644 --- a/pb/dht.pb.go +++ b/pb/dht.pb.go @@ -254,47 +254,179 @@ func (m *Message_Peer) GetConnection() Message_ConnectionType { return Message_NOT_CONNECTED } +// Encapsulates a routing table snapshot for persistence. Not to be transmitted over the wire. +type RoutingTableSnapshot struct { + // The peers that were members of the routing table. + Peers []*RoutingTableSnapshot_Peer `protobuf:"bytes,1,rep,name=peers,proto3" json:"peers,omitempty"` + // The timestamp when this snapshot was taken. + TimestampNs int64 `protobuf:"varint,2,opt,name=timestampNs,proto3" json:"timestampNs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RoutingTableSnapshot) Reset() { *m = RoutingTableSnapshot{} } +func (m *RoutingTableSnapshot) String() string { return proto.CompactTextString(m) } +func (*RoutingTableSnapshot) ProtoMessage() {} +func (*RoutingTableSnapshot) Descriptor() ([]byte, []int) { + return fileDescriptor_616a434b24c97ff4, []int{1} +} +func (m *RoutingTableSnapshot) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *RoutingTableSnapshot) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_RoutingTableSnapshot.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *RoutingTableSnapshot) XXX_Merge(src proto.Message) { + xxx_messageInfo_RoutingTableSnapshot.Merge(m, src) +} +func (m *RoutingTableSnapshot) XXX_Size() int { + return m.Size() +} +func (m *RoutingTableSnapshot) XXX_DiscardUnknown() { + xxx_messageInfo_RoutingTableSnapshot.DiscardUnknown(m) +} + +var xxx_messageInfo_RoutingTableSnapshot proto.InternalMessageInfo + +func (m *RoutingTableSnapshot) GetPeers() []*RoutingTableSnapshot_Peer { + if m != nil { + return m.Peers + } + return nil +} + +func (m *RoutingTableSnapshot) GetTimestampNs() int64 { + if m != nil { + return m.TimestampNs + } + return 0 +} + +type RoutingTableSnapshot_Peer struct { + // ID of a given peer. + Id []byte `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // multiaddrs for a given peer + Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs,proto3" json:"addrs,omitempty"` + // timestamp for when the peer was added to the Routing Table. + // Unix epoch nano seconds. + AddedAtNs int64 `protobuf:"varint,3,opt,name=addedAtNs,proto3" json:"addedAtNs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RoutingTableSnapshot_Peer) Reset() { *m = RoutingTableSnapshot_Peer{} } +func (m *RoutingTableSnapshot_Peer) String() string { return proto.CompactTextString(m) } +func (*RoutingTableSnapshot_Peer) ProtoMessage() {} +func (*RoutingTableSnapshot_Peer) Descriptor() ([]byte, []int) { + return fileDescriptor_616a434b24c97ff4, []int{1, 0} +} +func (m *RoutingTableSnapshot_Peer) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *RoutingTableSnapshot_Peer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_RoutingTableSnapshot_Peer.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *RoutingTableSnapshot_Peer) XXX_Merge(src proto.Message) { + xxx_messageInfo_RoutingTableSnapshot_Peer.Merge(m, src) +} +func (m *RoutingTableSnapshot_Peer) XXX_Size() int { + return m.Size() +} +func (m *RoutingTableSnapshot_Peer) XXX_DiscardUnknown() { + xxx_messageInfo_RoutingTableSnapshot_Peer.DiscardUnknown(m) +} + +var xxx_messageInfo_RoutingTableSnapshot_Peer proto.InternalMessageInfo + +func (m *RoutingTableSnapshot_Peer) GetId() []byte { + if m != nil { + return m.Id + } + return nil +} + +func (m *RoutingTableSnapshot_Peer) GetAddrs() [][]byte { + if m != nil { + return m.Addrs + } + return nil +} + +func (m *RoutingTableSnapshot_Peer) GetAddedAtNs() int64 { + if m != nil { + return m.AddedAtNs + } + return 0 +} + func init() { proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value) proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value) proto.RegisterType((*Message)(nil), "dht.pb.Message") proto.RegisterType((*Message_Peer)(nil), "dht.pb.Message.Peer") + proto.RegisterType((*RoutingTableSnapshot)(nil), "dht.pb.RoutingTableSnapshot") + proto.RegisterType((*RoutingTableSnapshot_Peer)(nil), "dht.pb.RoutingTableSnapshot.Peer") } func init() { proto.RegisterFile("dht.proto", fileDescriptor_616a434b24c97ff4) } var fileDescriptor_616a434b24c97ff4 = []byte{ - // 469 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x92, 0xb1, 0x6f, 0x9b, 0x40, - 0x18, 0xc5, 0x73, 0x80, 0xdd, 0xf8, 0x03, 0x3b, 0xe4, 0x94, 0x01, 0xb9, 0x92, 0x83, 0x3c, 0xd1, - 0xc1, 0x20, 0xd1, 0xb5, 0xaa, 0x6a, 0x03, 0x8d, 0x2c, 0xa5, 0xd8, 0xba, 0x38, 0xe9, 0x68, 0x19, - 0xb8, 0x12, 0x54, 0xd7, 0x87, 0x00, 0xa7, 0xf2, 0xd6, 0x3f, 0x2f, 0x63, 0xe7, 0x0e, 0x51, 0xe5, - 0xa9, 0x7f, 0x46, 0xc5, 0x11, 0x5a, 0xec, 0x25, 0x13, 0xef, 0x7d, 0xf7, 0x7e, 0xe2, 0xdd, 0xa7, - 0x83, 0x4e, 0x74, 0x5f, 0x98, 0x69, 0xc6, 0x0a, 0x86, 0xdb, 0x5c, 0x06, 0x7d, 0x3b, 0x4e, 0x8a, - 0xfb, 0x6d, 0x60, 0x86, 0xec, 0x9b, 0xb5, 0x4e, 0x82, 0xd4, 0x4e, 0xad, 0x98, 0x8d, 0x2a, 0x35, - 0xca, 0x68, 0xc8, 0xb2, 0xc8, 0x4a, 0x03, 0xab, 0x52, 0x15, 0xdb, 0x1f, 0x35, 0x98, 0x98, 0xc5, - 0xcc, 0xe2, 0xe3, 0x60, 0xfb, 0x85, 0x3b, 0x6e, 0xb8, 0xaa, 0xe2, 0xc3, 0x3f, 0x12, 0xbc, 0xfa, - 0x44, 0xf3, 0x7c, 0x15, 0x53, 0x6c, 0x81, 0x54, 0xec, 0x52, 0xaa, 0x21, 0x1d, 0x19, 0x3d, 0xfb, - 0xb5, 0x59, 0xb5, 0x30, 0x9f, 0x8f, 0xeb, 0xef, 0x62, 0x97, 0x52, 0xc2, 0x83, 0xd8, 0x80, 0xb3, - 0x70, 0xbd, 0xcd, 0x0b, 0x9a, 0x5d, 0xd3, 0x07, 0xba, 0x26, 0xab, 0xef, 0x1a, 0xe8, 0xc8, 0x68, - 0x91, 0xe3, 0x31, 0x56, 0x41, 0xfc, 0x4a, 0x77, 0x9a, 0xa0, 0x23, 0x43, 0x21, 0xa5, 0xc4, 0x6f, - 0xa0, 0x5d, 0xf5, 0xd6, 0x44, 0x1d, 0x19, 0xb2, 0x7d, 0x6e, 0xd6, 0xd7, 0x08, 0x4c, 0xc2, 0x15, - 0x79, 0x0e, 0xe0, 0x77, 0x20, 0x87, 0x6b, 0x96, 0xd3, 0x6c, 0x4e, 0x69, 0x96, 0x6b, 0xa7, 0xba, - 0x68, 0xc8, 0xf6, 0xc5, 0x71, 0xbd, 0xf2, 0x70, 0x22, 0x3d, 0x3e, 0x5d, 0x9e, 0x90, 0x66, 0x1c, - 0x7f, 0x80, 0x6e, 0x9a, 0xb1, 0x87, 0x24, 0xaa, 0xf9, 0xce, 0x8b, 0xfc, 0x21, 0xd0, 0xff, 0x81, - 0x40, 0x2a, 0x15, 0x1e, 0x82, 0x90, 0x44, 0x7c, 0x3d, 0xca, 0x04, 0x97, 0xc9, 0x5f, 0x4f, 0x97, - 0x10, 0xec, 0x0a, 0x7a, 0x53, 0x64, 0xc9, 0x26, 0x26, 0x42, 0x12, 0xe1, 0x0b, 0x68, 0xad, 0xa2, - 0x28, 0xcb, 0x35, 0x41, 0x17, 0x0d, 0x85, 0x54, 0x06, 0xbf, 0x07, 0x08, 0xd9, 0x66, 0x43, 0xc3, - 0x22, 0x61, 0x1b, 0x7e, 0xe3, 0x9e, 0x3d, 0x38, 0x6e, 0xe0, 0xfc, 0x4b, 0xf0, 0x1d, 0x37, 0x88, - 0x61, 0x02, 0x72, 0x63, 0xfd, 0xb8, 0x0b, 0x9d, 0xf9, 0xed, 0x62, 0x79, 0x37, 0xbe, 0xbe, 0xf5, - 0xd4, 0x93, 0xd2, 0x5e, 0x79, 0xb5, 0x45, 0x58, 0x05, 0x65, 0xec, 0xba, 0xcb, 0x39, 0x99, 0xdd, - 0x4d, 0x5d, 0x8f, 0xa8, 0x02, 0x3e, 0x87, 0x6e, 0x19, 0xa8, 0x27, 0x37, 0xaa, 0x58, 0x32, 0x1f, - 0xa7, 0xbe, 0xbb, 0xf4, 0x67, 0xae, 0xa7, 0x4a, 0xf8, 0x14, 0xa4, 0xf9, 0xd4, 0xbf, 0x52, 0x5b, - 0xc3, 0xcf, 0xd0, 0x3b, 0x2c, 0x52, 0xd2, 0xfe, 0x6c, 0xb1, 0x74, 0x66, 0xbe, 0xef, 0x39, 0x0b, - 0xcf, 0xad, 0xfe, 0xf8, 0xdf, 0x22, 0x7c, 0x06, 0xb2, 0x33, 0xf6, 0xeb, 0x84, 0x2a, 0x60, 0x0c, - 0x3d, 0x67, 0xec, 0x37, 0x28, 0x55, 0x9c, 0x28, 0x8f, 0xfb, 0x01, 0xfa, 0xb9, 0x1f, 0xa0, 0xdf, - 0xfb, 0x01, 0x0a, 0xda, 0xfc, 0xfd, 0xbd, 0xfd, 0x1b, 0x00, 0x00, 0xff, 0xff, 0x8d, 0x1a, 0xa1, - 0xbe, 0xf7, 0x02, 0x00, 0x00, + // 553 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0xbf, 0x6f, 0x9b, 0x40, + 0x18, 0xcd, 0x01, 0x4e, 0xe3, 0x0f, 0xc7, 0x21, 0xa7, 0x0c, 0xc8, 0xad, 0x1c, 0xea, 0x89, 0x0e, + 0x01, 0x89, 0x0e, 0x5d, 0xaa, 0xaa, 0x8e, 0xa1, 0x51, 0xaa, 0x14, 0x5b, 0x17, 0x27, 0x1d, 0x2d, + 0x7e, 0x5c, 0x09, 0xaa, 0xc3, 0x21, 0x38, 0xa7, 0xf2, 0xd6, 0x7f, 0xac, 0x7b, 0xc6, 0xce, 0x1d, + 0xa2, 0x2a, 0x53, 0xff, 0x8c, 0x8a, 0x23, 0x34, 0x24, 0xaa, 0xda, 0x89, 0xf7, 0xbe, 0x7b, 0x4f, + 0xbc, 0xfb, 0x1e, 0x40, 0x37, 0xbe, 0xe0, 0x56, 0x5e, 0x30, 0xce, 0xf0, 0xa6, 0x80, 0xe1, 0xc0, + 0x49, 0x52, 0x7e, 0xb1, 0x0a, 0xad, 0x88, 0x5d, 0xda, 0xcb, 0x34, 0xcc, 0x9d, 0xdc, 0x4e, 0xd8, + 0x41, 0x8d, 0x0e, 0x0a, 0x1a, 0xb1, 0x22, 0xb6, 0xf3, 0xd0, 0xae, 0x51, 0xed, 0x1d, 0x1c, 0xb4, + 0x3c, 0x09, 0x4b, 0x98, 0x2d, 0xc6, 0xe1, 0xea, 0x93, 0x60, 0x82, 0x08, 0x54, 0xcb, 0x47, 0xbf, + 0x14, 0x78, 0xf2, 0x81, 0x96, 0x65, 0x90, 0x50, 0x6c, 0x83, 0xc2, 0xd7, 0x39, 0xd5, 0x91, 0x81, + 0xcc, 0xbe, 0xf3, 0xd4, 0xaa, 0x53, 0x58, 0x77, 0xc7, 0xcd, 0x73, 0xbe, 0xce, 0x29, 0x11, 0x42, + 0x6c, 0xc2, 0x4e, 0xb4, 0x5c, 0x95, 0x9c, 0x16, 0x27, 0xf4, 0x8a, 0x2e, 0x49, 0xf0, 0x45, 0x07, + 0x03, 0x99, 0x1d, 0xf2, 0x78, 0x8c, 0x35, 0x90, 0x3f, 0xd3, 0xb5, 0x2e, 0x19, 0xc8, 0xec, 0x91, + 0x0a, 0xe2, 0x17, 0xb0, 0x59, 0xe7, 0xd6, 0x65, 0x03, 0x99, 0xaa, 0xb3, 0x6b, 0x35, 0xd7, 0x08, + 0x2d, 0x22, 0x10, 0xb9, 0x13, 0xe0, 0xd7, 0xa0, 0x46, 0x4b, 0x56, 0xd2, 0x62, 0x46, 0x69, 0x51, + 0xea, 0x5b, 0x86, 0x6c, 0xaa, 0xce, 0xde, 0xe3, 0x78, 0xd5, 0xe1, 0xa1, 0x72, 0x7d, 0xb3, 0xbf, + 0x41, 0xda, 0x72, 0xfc, 0x16, 0xb6, 0xf3, 0x82, 0x5d, 0xa5, 0x71, 0xe3, 0xef, 0xfe, 0xd7, 0xff, + 0xd0, 0x30, 0xf8, 0x8a, 0x40, 0xa9, 0x10, 0x1e, 0x81, 0x94, 0xc6, 0x62, 0x3d, 0xbd, 0x43, 0x5c, + 0x29, 0x7f, 0xdc, 0xec, 0x43, 0xb8, 0xe6, 0xf4, 0x94, 0x17, 0x69, 0x96, 0x10, 0x29, 0x8d, 0xf1, + 0x1e, 0x74, 0x82, 0x38, 0x2e, 0x4a, 0x5d, 0x32, 0x64, 0xb3, 0x47, 0x6a, 0x82, 0xdf, 0x00, 0x44, + 0x2c, 0xcb, 0x68, 0xc4, 0x53, 0x96, 0x89, 0x1b, 0xf7, 0x9d, 0xe1, 0xe3, 0x04, 0x93, 0x3f, 0x0a, + 0xb1, 0xe3, 0x96, 0x63, 0x94, 0x82, 0xda, 0x5a, 0x3f, 0xde, 0x86, 0xee, 0xec, 0x6c, 0xbe, 0x38, + 0x1f, 0x9f, 0x9c, 0x79, 0xda, 0x46, 0x45, 0x8f, 0xbc, 0x86, 0x22, 0xac, 0x41, 0x6f, 0xec, 0xba, + 0x8b, 0x19, 0x99, 0x9e, 0x1f, 0xbb, 0x1e, 0xd1, 0x24, 0xbc, 0x0b, 0xdb, 0x95, 0xa0, 0x99, 0x9c, + 0x6a, 0x72, 0xe5, 0x79, 0x77, 0xec, 0xbb, 0x0b, 0x7f, 0xea, 0x7a, 0x9a, 0x82, 0xb7, 0x40, 0x99, + 0x1d, 0xfb, 0x47, 0x5a, 0x67, 0xf4, 0x11, 0xfa, 0x0f, 0x83, 0x54, 0x6e, 0x7f, 0x3a, 0x5f, 0x4c, + 0xa6, 0xbe, 0xef, 0x4d, 0xe6, 0x9e, 0x5b, 0xbf, 0xf1, 0x9e, 0x22, 0xbc, 0x03, 0xea, 0x64, 0xec, + 0x37, 0x0a, 0x4d, 0xc2, 0x18, 0xfa, 0x93, 0xb1, 0xdf, 0x72, 0x69, 0xf2, 0xe8, 0x1b, 0x82, 0x3d, + 0xc2, 0x56, 0x3c, 0xcd, 0x92, 0x79, 0x10, 0x2e, 0xe9, 0x69, 0x16, 0xe4, 0xe5, 0x05, 0xe3, 0xf8, + 0x15, 0x74, 0x72, 0xd1, 0x0c, 0x12, 0xcd, 0x3c, 0x6f, 0xf6, 0xf2, 0x37, 0xb1, 0xa8, 0x89, 0xd4, + 0x7a, 0x6c, 0x80, 0xca, 0xd3, 0x4b, 0x5a, 0xf2, 0xe0, 0x32, 0xf7, 0x4b, 0xf1, 0x75, 0xc9, 0xa4, + 0x3d, 0x1a, 0xbc, 0xbf, 0x6b, 0xae, 0x7f, 0xdf, 0xdc, 0x3f, 0x5a, 0x7a, 0x06, 0xdd, 0x20, 0x8e, + 0x69, 0x3c, 0xe6, 0x7e, 0x29, 0x4a, 0x92, 0xc9, 0xfd, 0xe0, 0xb0, 0x77, 0x7d, 0x3b, 0x44, 0xdf, + 0x6f, 0x87, 0xe8, 0xe7, 0xed, 0x10, 0x85, 0x9b, 0xe2, 0xff, 0x79, 0xf9, 0x3b, 0x00, 0x00, 0xff, + 0xff, 0x62, 0xd2, 0x66, 0x2d, 0xb7, 0x03, 0x00, 0x00, } func (m *Message) Marshal() (dAtA []byte, err error) { @@ -432,6 +564,100 @@ func (m *Message_Peer) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *RoutingTableSnapshot) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *RoutingTableSnapshot) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *RoutingTableSnapshot) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.TimestampNs != 0 { + i = encodeVarintDht(dAtA, i, uint64(m.TimestampNs)) + i-- + dAtA[i] = 0x10 + } + if len(m.Peers) > 0 { + for iNdEx := len(m.Peers) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Peers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintDht(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *RoutingTableSnapshot_Peer) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *RoutingTableSnapshot_Peer) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *RoutingTableSnapshot_Peer) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.AddedAtNs != 0 { + i = encodeVarintDht(dAtA, i, uint64(m.AddedAtNs)) + i-- + dAtA[i] = 0x18 + } + if len(m.Addrs) > 0 { + for iNdEx := len(m.Addrs) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Addrs[iNdEx]) + copy(dAtA[i:], m.Addrs[iNdEx]) + i = encodeVarintDht(dAtA, i, uint64(len(m.Addrs[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Id) > 0 { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = encodeVarintDht(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarintDht(dAtA []byte, offset int, v uint64) int { offset -= sovDht(v) base := offset @@ -504,6 +730,52 @@ func (m *Message_Peer) Size() (n int) { return n } +func (m *RoutingTableSnapshot) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Peers) > 0 { + for _, e := range m.Peers { + l = e.Size() + n += 1 + l + sovDht(uint64(l)) + } + } + if m.TimestampNs != 0 { + n += 1 + sovDht(uint64(m.TimestampNs)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *RoutingTableSnapshot_Peer) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Id) + if l > 0 { + n += 1 + l + sovDht(uint64(l)) + } + if len(m.Addrs) > 0 { + for _, b := range m.Addrs { + l = len(b) + n += 1 + l + sovDht(uint64(l)) + } + } + if m.AddedAtNs != 0 { + n += 1 + sovDht(uint64(m.AddedAtNs)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func sovDht(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -878,6 +1150,252 @@ func (m *Message_Peer) Unmarshal(dAtA []byte) error { } return nil } +func (m *RoutingTableSnapshot) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: RoutingTableSnapshot: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: RoutingTableSnapshot: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Peers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthDht + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthDht + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Peers = append(m.Peers, &RoutingTableSnapshot_Peer{}) + if err := m.Peers[len(m.Peers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimestampNs", wireType) + } + m.TimestampNs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimestampNs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipDht(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDht + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDht + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *RoutingTableSnapshot_Peer) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Peer: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Peer: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthDht + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthDht + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = append(m.Id[:0], dAtA[iNdEx:postIndex]...) + if m.Id == nil { + m.Id = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addrs", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthDht + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthDht + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Addrs = append(m.Addrs, make([]byte, postIndex-iNdEx)) + copy(m.Addrs[len(m.Addrs)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field AddedAtNs", wireType) + } + m.AddedAtNs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDht + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.AddedAtNs |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipDht(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDht + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthDht + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipDht(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pb/dht.proto b/pb/dht.proto index 18bfd7419..4bbb5f164 100644 --- a/pb/dht.proto +++ b/pb/dht.proto @@ -70,3 +70,26 @@ message Message { // GET_VALUE, ADD_PROVIDER, GET_PROVIDERS repeated Peer providerPeers = 9 [(gogoproto.nullable) = false]; } + + +// Encapsulates a routing table snapshot for persistence. Not to be transmitted over the wire. +message RoutingTableSnapshot { + message Peer { + // ID of a given peer. + bytes id = 1; + + // multiaddrs for a given peer + repeated bytes addrs = 2; + + // timestamp for when the peer was added to the Routing Table. + // Unix epoch nano seconds. + int64 addedAtNs = 3; + } + + // The peers that were members of the routing table. + repeated Peer peers = 1; + + // The timestamp when this snapshot was taken. + // Unix epoch nano seconds. + int64 timestampNs = 2; +} \ No newline at end of file diff --git a/pb/message.go b/pb/message.go index d9d2aeda0..7b2fe4e92 100644 --- a/pb/message.go +++ b/pb/message.go @@ -117,6 +117,24 @@ func (m *Message_Peer) Addresses() []ma.Multiaddr { return maddrs } +func (p *RoutingTableSnapshot_Peer) Addresses() []ma.Multiaddr { + if p == nil { + return nil + } + + maddrs := make([]ma.Multiaddr, 0, len(p.Addrs)) + for _, addr := range p.Addrs { + maddr, err := ma.NewMultiaddrBytes(addr) + if err != nil { + log.Debugw("error decoding multiaddr for peer", "peer", peer.ID(p.Id), "error", err) + continue + } + + maddrs = append(maddrs, maddr) + } + return maddrs +} + // GetClusterLevel gets and adjusts the cluster level on the message. // a +/- 1 adjustment is needed to distinguish a valid first level (1) and // default "no value" protobuf behavior (0) diff --git a/persist/interfaces.go b/persist/interfaces.go new file mode 100644 index 000000000..5fd961ce1 --- /dev/null +++ b/persist/interfaces.go @@ -0,0 +1,20 @@ +package persist + +import ( + "github.com/libp2p/go-libp2p-core/host" + + kbucket "github.com/libp2p/go-libp2p-kbucket" + + "github.com/ipfs/go-log" +) + +var logSnapshot = log.Logger("dht/snapshot") + +// A Snapshotter provides the ability to save and restore a routing table from a Persistent medium. +type Snapshotter interface { + // Load recovers a snapshot from storage, and returns candidates to integrate in a fresh routing table. + Load() ([]*RtSnapshotPeerInfo, error) + + // Store persists the current state of the routing table. + Store(h host.Host, rt *kbucket.RoutingTable) error +} diff --git a/persist/snapshot.go b/persist/snapshot.go new file mode 100644 index 000000000..41061295b --- /dev/null +++ b/persist/snapshot.go @@ -0,0 +1,123 @@ +package persist + +import ( + "errors" + "fmt" + "time" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + + dht_pb "github.com/libp2p/go-libp2p-kad-dht/pb" + kb "github.com/libp2p/go-libp2p-kbucket" + + ds "github.com/ipfs/go-datastore" + nsds "github.com/ipfs/go-datastore/namespace" + ma "github.com/multiformats/go-multiaddr" +) + +// RtSnapshotPeerInfo is the information for a peer in the Routing Table that we persist. +type RtSnapshotPeerInfo struct { + ID peer.ID + Addrs []ma.Multiaddr + AddedAt time.Time +} + +var ( + dsSnapshotKey = ds.NewKey("routing_table") +) + +// DefaultSnapshotNS is the default namespace to use for the key with which +// the routing table snapshot will be persisted to the data store. +var DefaultSnapshotNS = "/kad-dht/snapshot" + +type dsSnapshotter struct { + ds.Datastore +} + +var _ Snapshotter = (*dsSnapshotter)(nil) + +// NewDatastoreSnapshotter returns a Snapshotter backed by a datastore, under the specified non-optional namespace. +func NewDatastoreSnapshotter(dstore ds.Datastore, namespace string) (Snapshotter, error) { + if dstore == nil { + return nil, errors.New("datastore is nil when creating a datastore snapshotter") + } + if namespace == "" { + return nil, errors.New("blank namespace when creating a datastore snapshotter") + } + dstore = nsds.Wrap(dstore, ds.NewKey(namespace)) + return &dsSnapshotter{dstore}, nil +} + +func (dsp *dsSnapshotter) Load() ([]*RtSnapshotPeerInfo, error) { + val, err := dsp.Get(dsSnapshotKey) + + switch err { + case nil: + case ds.ErrNotFound: + return nil, nil + default: + return nil, err + } + + s := &dht_pb.RoutingTableSnapshot{} + if err := s.Unmarshal(val); err != nil { + return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err) + } + + result := make([]*RtSnapshotPeerInfo, 0, len(s.Peers)) + for i := range s.Peers { + p := s.Peers[i] + var id peer.ID + if err := id.Unmarshal(p.Id); err != nil { + logSnapshot.Warnw("failed to unmarshal peerId from snapshot", "err", err) + continue + } + + result = append(result, &RtSnapshotPeerInfo{ + ID: id, + Addrs: p.Addresses(), + AddedAt: time.Unix(0, p.AddedAtNs)}) + } + return result, err +} + +func (dsp *dsSnapshotter) Store(h host.Host, rt *kb.RoutingTable) error { + pinfos := rt.GetPeerInfos() + snapshotPeers := make([]*dht_pb.RoutingTableSnapshot_Peer, 0, len(pinfos)) + + for _, p := range pinfos { + id, err := p.Id.MarshalBinary() + if err != nil { + logSnapshot.Warnw("encountered error while adding peer to routing table snapshot; skipping", "peer", p.Id, "err", err) + continue + } + rp := &dht_pb.RoutingTableSnapshot_Peer{} + rp.Id = id + addrs := h.Peerstore().Addrs(p.Id) + rp.Addrs = make([][]byte, len(addrs)) + for i, maddr := range addrs { + rp.Addrs[i] = maddr.Bytes() + } + + rp.AddedAtNs = p.AddedAt.UnixNano() + snapshotPeers = append(snapshotPeers, rp) + } + + snap := dht_pb.RoutingTableSnapshot{ + Peers: snapshotPeers, + TimestampNs: time.Now().Unix(), + } + + bytes, err := snap.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal snapshot %w", err) + } + + if err := dsp.Put(dsSnapshotKey, bytes); err != nil { + return fmt.Errorf("failed to persist snapshot %w", err) + } + + // flush to disk + return dsp.Sync(dsSnapshotKey) +} diff --git a/persist/snapshot_test.go b/persist/snapshot_test.go new file mode 100644 index 000000000..2ebfcda7a --- /dev/null +++ b/persist/snapshot_test.go @@ -0,0 +1,87 @@ +package persist + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/test" + + blankhost "github.com/libp2p/go-libp2p-blankhost" + kb "github.com/libp2p/go-libp2p-kbucket" + peerstore "github.com/libp2p/go-libp2p-peerstore" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + + ds "github.com/ipfs/go-datastore" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func TestNewDatastoreSnapshotter(t *testing.T) { + _, err := NewDatastoreSnapshotter(nil, "rand") + require.Error(t, err) + _, err = NewDatastoreSnapshotter(ds.NewMapDatastore(), "") + require.Error(t, err) + _, err = NewDatastoreSnapshotter(ds.NewMapDatastore(), "rand") + require.NoError(t, err) +} + +func TestStoreAndLoad(t *testing.T) { + nBuckets := 5 + nPeers := 2 + ctx := context.Background() + + h := blankhost.NewBlankHost(swarmt.GenSwarm(t, ctx)) + + // create a routing table with nBuckets & nPeers in each bucket + local := test.RandPeerIDFatal(t) + rt, err := kb.NewRoutingTable(nPeers, kb.ConvertPeerID(local), time.Hour, peerstore.NewMetrics(), 1*time.Hour) + require.NoError(t, err) + require.NotNil(t, rt) + addrsForPeer := make(map[peer.ID][]ma.Multiaddr) + + for i := 0; i < nBuckets; i++ { + for j := 0; j < nPeers; j++ { + p, err := rt.GenRandPeerID(uint(i)) + require.NoError(t, err) + b, err := rt.TryAddPeer(p, true) + require.NoError(t, err) + require.True(t, b) + addrs := test.GenerateTestAddrs(2) + addrsForPeer[p] = addrs + h.Peerstore().AddAddrs(p, addrs, 1*time.Hour) + } + } + + require.Len(t, rt.ListPeers(), nBuckets*nPeers) + require.Len(t, addrsForPeer, nBuckets*nPeers) + + // create a snapshotter with an in-memory ds + snapshotter, err := NewDatastoreSnapshotter(ds.NewMapDatastore(), "test") + require.NoError(t, err) + + // store snapshot + require.NoError(t, snapshotter.Store(h, rt)) + + // load snapshot & verify it is as expected + rtPeers, err := snapshotter.Load() + require.NoError(t, err) + require.Len(t, rtPeers, nBuckets*nPeers) + for i := range rtPeers { + pi := rtPeers[i] + addrs, ok := addrsForPeer[pi.ID] + require.True(t, ok) + require.ElementsMatch(t, addrs, pi.Addrs) + require.False(t, pi.AddedAt.IsZero()) + delete(addrsForPeer, pi.ID) + } + require.Empty(t, addrsForPeer) + + // Load an empty snapshot + snapshotter, err = NewDatastoreSnapshotter(ds.NewMapDatastore(), "test") + require.NoError(t, err) + peers, err := snapshotter.Load() + require.NoError(t, err) + require.Empty(t, peers) +}