diff --git a/CHANGELOG.md b/CHANGELOG.md index 7afe79f54..8a4122902 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,10 +16,26 @@ The following emojis are used to highlight certain changes: ### Added +* ✨ The `routing/http` implements Delegated Peer Routing introduced in [IPIP-417](https://github.com/ipfs/specs/pull/417). + ### Changed +* 🛠 The `routing/http` package received the following modifications: + * Client `GetIPNSRecord` and `PutIPNSRecord` have been renamed to `GetIPNS` and + `PutIPNS`, respectively. Similarly, the required function names in the server + `ContentRouter` have also been updated. + * `ReadBitswapProviderRecord` has been renamed to `BitswapRecord` and marked as deprecated. + From now on, please use the protocol-agnostic `PeerRecord` for most use cases. The new + Peer Schema has been introduced in [IPIP-417](https://github.com/ipfs/specs/pull/417). + ### Removed +* 🛠 The `routing/http` package experienced following removals: + * Server and client no longer support the experimental `Provide` method. + `ProvideBitswap` is still usable, but marked as deprecated. A protocol-agnostic + provide mechanism is being worked on in [IPIP-378](https://github.com/ipfs/specs/pull/378). + * Server no longer exports `FindProvidersPath` and `ProvidePath`. + ### Fixed ### Security @@ -32,7 +48,7 @@ The following emojis are used to highlight certain changes: as per [IPIP-379](https://specs.ipfs.tech/ipips/ipip-0379/). * 🛠 The `verifycid` package has been updated with the new Allowlist interface as part of reducing globals efforts. -* The `blockservice` and `provider` packages has been updated to accommodate for +* The `blockservice` and `provider` packages has been updated to accommodate for changes in `verifycid`. ### Changed diff --git a/routing/http/README.md b/routing/http/README.md index 65650ed50..0f0281f8f 100644 --- a/routing/http/README.md +++ b/routing/http/README.md @@ -1,24 +1,9 @@ -go-delegated-routing +Routing V1 Server and Client ======================= -> Delegated routing Client and Server over Reframe RPC - -This package provides delegated routing implementation in Go: -- Client (for IPFS nodes like [Kubo](https://github.com/ipfs/kubo/blob/master/docs/config.md#routingrouters-parameters)), -- Server (for public indexers such as https://cid.contact) +> Delegated Routing V1 Server and Client over HTTP API. ## Documentation -- Go docs: https://pkg.go.dev/github.com/ipfs/boxo/routing/http/ - -## Lead Maintainer - -🦗🎶 - -## Contributing - -Contributions are welcome! This repository is part of the IPFS project and therefore governed by our [contributing guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md). - -## License - -[SPDX-License-Identifier: Apache-2.0 OR MIT](LICENSE.md) \ No newline at end of file +- Go Documentation: https://pkg.go.dev/github.com/ipfs/boxo/routing/http +- Routing V1 Specification: https://specs.ipfs.tech/routing/http-routing-v1/ diff --git a/routing/http/client/client.go b/routing/http/client/client.go index c504a0315..4a0d29b33 100644 --- a/routing/http/client/client.go +++ b/routing/http/client/client.go @@ -16,14 +16,12 @@ import ( ipns "github.com/ipfs/boxo/ipns" "github.com/ipfs/boxo/routing/http/contentrouter" "github.com/ipfs/boxo/routing/http/internal/drjson" - "github.com/ipfs/boxo/routing/http/server" "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" jsontypes "github.com/ipfs/boxo/routing/http/types/json" "github.com/ipfs/boxo/routing/http/types/ndjson" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" - record "github.com/libp2p/go-libp2p-record" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -31,7 +29,7 @@ import ( var ( _ contentrouter.Client = &client{} - logger = logging.Logger("service/delegatedrouting") + logger = logging.Logger("routing/http/client") defaultHTTPClient = &http.Client{ Transport: &ResponseBodyLimitedTransport{ RoundTripper: http.DefaultTransport, @@ -50,18 +48,17 @@ const ( type client struct { baseURL string httpClient httpClient - validator record.Validator clock clock.Clock - - accepts string + accepts string peerID peer.ID addrs []types.Multiaddr identity crypto.PrivKey - // called immeidately after signing a provide req - // used for testing, e.g. testing the server with a mangled signature - afterSignCallback func(req *types.WriteBitswapProviderRecord) + // Called immediately after signing a provide request. It is used + // for testing, e.g., testing the server with a mangled signature. + //lint:ignore SA1019 // ignore staticcheck + afterSignCallback func(req *types.WriteBitswapRecord) } // defaultUserAgent is used as a fallback to inform HTTP server which library @@ -121,12 +118,11 @@ func WithStreamResultsRequired() Option { } // New creates a content routing API client. -// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function. +// The Provider and identity parameters are option. If they are nil, the [client.ProvideBitswap] method will not function. func New(baseURL string, opts ...Option) (*client, error) { client := &client{ baseURL: baseURL, httpClient: defaultHTTPClient, - validator: ipns.Validator{}, clock: clock.New(), accepts: strings.Join([]string{mediaTypeNDJSON, mediaTypeJSON}, ","), } @@ -164,11 +160,11 @@ func (c *measuringIter[T]) Close() error { return c.Iter.Close() } -func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.ResultIter[types.ProviderResponse], err error) { +func (c *client) FindProviders(ctx context.Context, key cid.Cid) (providers iter.ResultIter[types.Record], err error) { // TODO test measurements m := newMeasurement("FindProviders") - url := c.baseURL + server.ProvidePath + key.String() + url := c.baseURL + "/routing/v1/providers/" + key.String() req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err @@ -192,7 +188,7 @@ func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.Res if resp.StatusCode == http.StatusNotFound { resp.Body.Close() m.record(ctx) - return iter.FromSlice[iter.Result[types.ProviderResponse]](nil), nil + return iter.FromSlice[iter.Result[types.Record]](nil), nil } if resp.StatusCode != http.StatusOK { @@ -220,24 +216,27 @@ func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.Res } }() - var it iter.ResultIter[types.ProviderResponse] + var it iter.ResultIter[types.Record] switch mediaType { case mediaTypeJSON: - parsedResp := &jsontypes.ReadProvidersResponse{} + parsedResp := &jsontypes.ProvidersResponse{} err = json.NewDecoder(resp.Body).Decode(parsedResp) - var sliceIt iter.Iter[types.ProviderResponse] = iter.FromSlice(parsedResp.Providers) + var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Providers) it = iter.ToResultIter(sliceIt) case mediaTypeNDJSON: skipBodyClose = true - it = ndjson.NewReadProvidersResponseIter(resp.Body) + it = ndjson.NewRecordsIter(resp.Body) default: logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType) return nil, errors.New("unknown content type") } - return &measuringIter[iter.Result[types.ProviderResponse]]{Iter: it, ctx: ctx, m: m}, nil + return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil } +// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: +// +// [IPIP-378]: https://github.com/ipfs/specs/pull/378 func (c *client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) { if c.identity == nil { return 0, errors.New("cannot provide Bitswap records without an identity") @@ -253,7 +252,7 @@ func (c *client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Du now := c.clock.Now() - req := types.WriteBitswapProviderRecord{ + req := types.WriteBitswapRecord{ Protocol: "transport-bitswap", Schema: types.SchemaBitswap, Payload: types.BitswapPayload{ @@ -282,10 +281,13 @@ func (c *client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Du } // ProvideAsync makes a provide request to a delegated router -func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.WriteBitswapProviderRecord) (time.Duration, error) { - req := jsontypes.WriteProvidersRequest{Providers: []types.WriteProviderRecord{bswp}} +// +//lint:ignore SA1019 // ignore staticcheck +func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.WriteBitswapRecord) (time.Duration, error) { + //lint:ignore SA1019 // ignore staticcheck + req := jsontypes.WriteProvidersRequest{Providers: []types.Record{bswp}} - url := c.baseURL + server.ProvidePath + url := c.baseURL + "/routing/v1/providers/" b, err := drjson.MarshalJSONBytes(req) if err != nil { @@ -306,6 +308,8 @@ func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri if resp.StatusCode != http.StatusOK { return 0, httpError(resp.StatusCode, resp.Body) } + + //lint:ignore SA1019 // ignore staticcheck var provideResult jsontypes.WriteProvidersResponse err = json.NewDecoder(resp.Body).Decode(&provideResult) if err != nil { @@ -315,7 +319,8 @@ func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri return 0, fmt.Errorf("expected 1 result but got %d", len(provideResult.ProvideResults)) } - v, ok := provideResult.ProvideResults[0].(*types.WriteBitswapProviderRecordResponse) + //lint:ignore SA1019 // ignore staticcheck + v, ok := provideResult.ProvideResults[0].(*types.WriteBitswapRecordResponse) if !ok { return 0, fmt.Errorf("expected AdvisoryTTL field") } @@ -327,7 +332,80 @@ func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri return 0, nil } -func (c *client) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) { +func (c *client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error) { + m := newMeasurement("FindPeers") + + url := c.baseURL + "/routing/v1/peers/" + peer.ToCid(pid).String() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", c.accepts) + + m.host = req.Host + + start := c.clock.Now() + resp, err := c.httpClient.Do(req) + + m.err = err + m.latency = c.clock.Since(start) + + if err != nil { + m.record(ctx) + return nil, err + } + + m.statusCode = resp.StatusCode + if resp.StatusCode == http.StatusNotFound { + resp.Body.Close() + m.record(ctx) + return iter.FromSlice[iter.Result[types.Record]](nil), nil + } + + if resp.StatusCode != http.StatusOK { + err := httpError(resp.StatusCode, resp.Body) + resp.Body.Close() + m.record(ctx) + return nil, err + } + + respContentType := resp.Header.Get("Content-Type") + mediaType, _, err := mime.ParseMediaType(respContentType) + if err != nil { + resp.Body.Close() + m.err = err + m.record(ctx) + return nil, fmt.Errorf("parsing Content-Type: %w", err) + } + + m.mediaType = mediaType + + var skipBodyClose bool + defer func() { + if !skipBodyClose { + resp.Body.Close() + } + }() + + var it iter.ResultIter[types.Record] + switch mediaType { + case mediaTypeJSON: + parsedResp := &jsontypes.PeersResponse{} + err = json.NewDecoder(resp.Body).Decode(parsedResp) + var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Peers) + it = iter.ToResultIter(sliceIt) + case mediaTypeNDJSON: + skipBodyClose = true + it = ndjson.NewRecordsIter(resp.Body) + default: + logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType) + return nil, errors.New("unknown content type") + } + + return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil +} + +func (c *client) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { url := c.baseURL + "/routing/v1/ipns/" + name.String() httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) @@ -365,7 +443,7 @@ func (c *client) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Reco return record, nil } -func (c *client) ProvideIPNSRecord(ctx context.Context, name ipns.Name, record *ipns.Record) error { +func (c *client) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { url := c.baseURL + "/routing/v1/ipns/" + name.String() rawRecord, err := ipns.MarshalRecord(record) diff --git a/routing/http/client/client_test.go b/routing/http/client/client_test.go index c1690b3f2..95683bc3f 100644 --- a/routing/http/client/client_test.go +++ b/routing/http/client/client_test.go @@ -3,6 +3,7 @@ package client import ( "context" "crypto/rand" + "encoding/json" "errors" "net/http" "net/http/httptest" @@ -31,27 +32,28 @@ import ( type mockContentRouter struct{ mock.Mock } -func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.ProviderResponse], error) { +func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { args := m.Called(ctx, key, limit) - return args.Get(0).(iter.ResultIter[types.ProviderResponse]), args.Error(1) + return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } +//lint:ignore SA1019 // ignore staticcheck func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { args := m.Called(ctx, req) return args.Get(0).(time.Duration), args.Error(1) } -func (m *mockContentRouter) Provide(ctx context.Context, req *server.WriteProvideRequest) (types.ProviderResponse, error) { - args := m.Called(ctx, req) - return args.Get(0).(types.ProviderResponse), args.Error(1) +func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) { + args := m.Called(ctx, pid, limit) + return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } -func (m *mockContentRouter) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) { +func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { args := m.Called(ctx, name) return args.Get(0).(*ipns.Record), args.Error(1) } -func (m *mockContentRouter) ProvideIPNSRecord(ctx context.Context, name ipns.Name, record *ipns.Record) error { +func (m *mockContentRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { args := m.Called(ctx, name, record) return args.Error(0) } @@ -144,6 +146,13 @@ func makeCID() cid.Cid { return c } +func drAddrsToAddrs(drmas []types.Multiaddr) (addrs []multiaddr.Multiaddr) { + for _, a := range drmas { + addrs = append(addrs, a.Multiaddr) + } + return +} + func addrsToDRAddrs(addrs []multiaddr.Multiaddr) (drmas []types.Multiaddr) { for _, a := range addrs { drmas = append(drmas, types.Multiaddr{Multiaddr: a}) @@ -151,19 +160,26 @@ func addrsToDRAddrs(addrs []multiaddr.Multiaddr) (drmas []types.Multiaddr) { return } -func drAddrsToAddrs(drmas []types.Multiaddr) (addrs []multiaddr.Multiaddr) { - for _, a := range drmas { - addrs = append(addrs, a.Multiaddr) +func makePeerRecord() types.PeerRecord { + peerID, addrs, _ := makeProviderAndIdentity() + return types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &peerID, + Protocols: []string{"transport-bitswap"}, + Addrs: addrsToDRAddrs(addrs), + Extra: map[string]json.RawMessage{}, } - return } -func makeBSReadProviderResp() types.ReadBitswapProviderRecord { +//lint:ignore SA1019 // ignore staticcheck +func makeBitswapRecord() types.BitswapRecord { peerID, addrs, _ := makeProviderAndIdentity() - return types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", + //lint:ignore SA1019 // ignore staticcheck + return types.BitswapRecord{ + //lint:ignore SA1019 // ignore staticcheck Schema: types.SchemaBitswap, ID: &peerID, + Protocol: "transport-bitswap", Addrs: addrsToDRAddrs(addrs), } } @@ -208,35 +224,46 @@ func (e *osErrContains) errContains(t *testing.T, err error) { } func TestClient_FindProviders(t *testing.T) { - bsReadProvResp := makeBSReadProviderResp() - bitswapProvs := []iter.Result[types.ProviderResponse]{ - {Val: &bsReadProvResp}, + peerRecord := makePeerRecord() + peerProviders := []iter.Result[types.Record]{ + {Val: &peerRecord}, + } + + bitswapRecord := makeBitswapRecord() + bitswapProviders := []iter.Result[types.Record]{ + {Val: &bitswapRecord}, } cases := []struct { name string httpStatusCode int stopServer bool - routerProvs []iter.Result[types.ProviderResponse] + routerResult []iter.Result[types.Record] routerErr error clientRequiresStreaming bool serverStreamingDisabled bool expErrContains osErrContains - expProvs []iter.Result[types.ProviderResponse] + expResult []iter.Result[types.Record] expStreamingResponse bool expJSONResponse bool }{ { name: "happy case", - routerProvs: bitswapProvs, - expProvs: bitswapProvs, + routerResult: peerProviders, + expResult: peerProviders, + expStreamingResponse: true, + }, + { + name: "happy case (with deprecated bitswap schema)", + routerResult: bitswapProviders, + expResult: bitswapProviders, expStreamingResponse: true, }, { name: "server doesn't support streaming", - routerProvs: bitswapProvs, - expProvs: bitswapProvs, + routerResult: peerProviders, + expResult: peerProviders, serverStreamingDisabled: true, expJSONResponse: true, }, @@ -262,7 +289,7 @@ func TestClient_FindProviders(t *testing.T) { { name: "returns no providers if the HTTP server returns a 404 respones", httpStatusCode: 404, - expProvs: nil, + expResult: nil, }, } for _, c := range cases { @@ -287,6 +314,7 @@ func TestClient_FindProviders(t *testing.T) { assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type")) }) } + if c.expJSONResponse { onRespReceived = append(onRespReceived, func(r *http.Response) { assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type")) @@ -315,20 +343,18 @@ func TestClient_FindProviders(t *testing.T) { } cid := makeCID() - findProvsIter := iter.FromSlice(c.routerProvs) - + routerResultIter := iter.FromSlice(c.routerResult) if c.expStreamingResponse { - router.On("FindProviders", mock.Anything, cid, 0).Return(findProvsIter, c.routerErr) + router.On("FindProviders", mock.Anything, cid, 0).Return(routerResultIter, c.routerErr) } else { - router.On("FindProviders", mock.Anything, cid, 20).Return(findProvsIter, c.routerErr) + router.On("FindProviders", mock.Anything, cid, 20).Return(routerResultIter, c.routerErr) } - provsIter, err := client.FindProviders(ctx, cid) - + resultIter, err := client.FindProviders(ctx, cid) c.expErrContains.errContains(t, err) - provs := iter.ReadAll[iter.Result[types.ProviderResponse]](provsIter) - assert.Equal(t, c.expProvs, provs) + results := iter.ReadAll[iter.Result[types.Record]](resultIter) + assert.Equal(t, c.expResult, results) }) } } @@ -416,7 +442,8 @@ func TestClient_Provide(t *testing.T) { deps.server.Close() } if c.mangleSignature { - client.afterSignCallback = func(req *types.WriteBitswapProviderRecord) { + //lint:ignore SA1019 // ignore staticcheck + client.afterSignCallback = func(req *types.WriteBitswapRecord) { mh, err := multihash.Encode([]byte("boom"), multihash.SHA2_256) require.NoError(t, err) mb, err := multibase.Encode(multibase.Base64, mh) @@ -426,6 +453,7 @@ func TestClient_Provide(t *testing.T) { } } + //lint:ignore SA1019 // ignore staticcheck expectedProvReq := &server.BitswapWriteProvideRequest{ Keys: c.cids, Timestamp: clock.Now().Truncate(time.Millisecond), @@ -457,6 +485,134 @@ func TestClient_Provide(t *testing.T) { } } +func TestClient_FindPeers(t *testing.T) { + peerRecord := makePeerRecord() + peerRecords := []iter.Result[types.Record]{ + {Val: &peerRecord}, + } + pid := *peerRecord.ID + + cases := []struct { + name string + httpStatusCode int + stopServer bool + routerResult []iter.Result[types.Record] + routerErr error + clientRequiresStreaming bool + serverStreamingDisabled bool + + expErrContains osErrContains + expResult []iter.Result[types.Record] + expStreamingResponse bool + expJSONResponse bool + }{ + { + name: "happy case", + routerResult: peerRecords, + expResult: peerRecords, + expStreamingResponse: true, + }, + { + name: "server doesn't support streaming", + routerResult: peerRecords, + expResult: peerRecords, + serverStreamingDisabled: true, + expJSONResponse: true, + }, + { + name: "client requires streaming but server doesn't support it", + serverStreamingDisabled: true, + clientRequiresStreaming: true, + expErrContains: osErrContains{expContains: "HTTP error with StatusCode=400: no supported content types"}, + }, + { + name: "returns an error if there's a non-200 response", + httpStatusCode: 500, + expErrContains: osErrContains{expContains: "HTTP error with StatusCode=500"}, + }, + { + name: "returns an error if the HTTP client returns a non-HTTP error", + stopServer: true, + expErrContains: osErrContains{ + expContains: "connect: connection refused", + expContainsWin: "connectex: No connection could be made because the target machine actively refused it.", + }, + }, + { + name: "returns no providers if the HTTP server returns a 404 respones", + httpStatusCode: 404, + expResult: nil, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var ( + clientOpts []Option + serverOpts []server.Option + onRespReceived []func(*http.Response) + onReqReceived []func(*http.Request) + ) + + if c.serverStreamingDisabled { + serverOpts = append(serverOpts, server.WithStreamingResultsDisabled()) + } + + if c.clientRequiresStreaming { + clientOpts = append(clientOpts, WithStreamResultsRequired()) + onReqReceived = append(onReqReceived, func(r *http.Request) { + assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Accept")) + }) + } + + if c.expStreamingResponse { + onRespReceived = append(onRespReceived, func(r *http.Response) { + assert.Equal(t, mediaTypeNDJSON, r.Header.Get("Content-Type")) + }) + } + + if c.expJSONResponse { + onRespReceived = append(onRespReceived, func(r *http.Response) { + assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type")) + }) + } + + deps := makeTestDeps(t, clientOpts, serverOpts) + + deps.recordingHTTPClient.f = append(deps.recordingHTTPClient.f, onRespReceived...) + deps.recordingHandler.f = append(deps.recordingHandler.f, onReqReceived...) + + client := deps.client + router := deps.router + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + if c.httpStatusCode != 0 { + deps.server.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(c.httpStatusCode) + }) + } + + if c.stopServer { + deps.server.Close() + } + + routerResultIter := iter.FromSlice(c.routerResult) + if c.expStreamingResponse { + router.On("FindPeers", mock.Anything, pid, 0).Return(routerResultIter, c.routerErr) + } else { + router.On("FindPeers", mock.Anything, pid, 20).Return(routerResultIter, c.routerErr) + } + + resultIter, err := client.FindPeers(ctx, pid) + c.expErrContains.errContains(t, err) + + results := iter.ReadAll[iter.Result[types.Record]](resultIter) + assert.Equal(t, c.expResult, results) + }) + } +} + func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { sk, _, err := crypto.GenerateEd25519Key(rand.Reader) require.NoError(t, err) @@ -492,9 +648,9 @@ func TestClient_IPNS(t *testing.T) { client := deps.client router := deps.router - router.On("FindIPNSRecord", mock.Anything, name).Return(nil, errors.New("something wrong happened")) + router.On("GetIPNS", mock.Anything, name).Return(nil, errors.New("something wrong happened")) - receivedRecord, err := client.FindIPNSRecord(context.Background(), name) + receivedRecord, err := client.GetIPNS(context.Background(), name) require.Error(t, err) require.Nil(t, receivedRecord) }) @@ -508,9 +664,9 @@ func TestClient_IPNS(t *testing.T) { client := deps.client router := deps.router - router.On("FindIPNSRecord", mock.Anything, name).Return(record, nil) + router.On("GetIPNS", mock.Anything, name).Return(record, nil) - receivedRecord, err := client.FindIPNSRecord(context.Background(), name) + receivedRecord, err := client.GetIPNS(context.Background(), name) require.NoError(t, err) require.Equal(t, record, receivedRecord) }) @@ -524,9 +680,9 @@ func TestClient_IPNS(t *testing.T) { client := deps.client router := deps.router - router.On("FindIPNSRecord", mock.Anything, name2).Return(record, nil) + router.On("GetIPNS", mock.Anything, name2).Return(record, nil) - receivedRecord, err := client.FindIPNSRecord(context.Background(), name2) + receivedRecord, err := client.GetIPNS(context.Background(), name2) require.Error(t, err) require.Nil(t, receivedRecord) }) @@ -539,9 +695,9 @@ func TestClient_IPNS(t *testing.T) { client := deps.client router := deps.router - router.On("ProvideIPNSRecord", mock.Anything, name, record).Return(nil) + router.On("PutIPNS", mock.Anything, name, record).Return(nil) - err := client.ProvideIPNSRecord(context.Background(), name, record) + err := client.PutIPNS(context.Background(), name, record) require.NoError(t, err) }) } diff --git a/routing/http/contentrouter/contentrouter.go b/routing/http/contentrouter/contentrouter.go index 8318a3163..2438d4fea 100644 --- a/routing/http/contentrouter/contentrouter.go +++ b/routing/http/contentrouter/contentrouter.go @@ -3,26 +3,32 @@ package contentrouter import ( "context" "reflect" + "strings" "time" + "github.com/ipfs/boxo/ipns" "github.com/ipfs/boxo/routing/http/internal" "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" ) -var logger = logging.Logger("service/contentrouting") +var logger = logging.Logger("routing/http/contentrouter") const ttl = 24 * time.Hour type Client interface { + FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) - FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.ProviderResponse], error) + FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error) + GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) + PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error } type contentRouter struct { @@ -32,6 +38,10 @@ type contentRouter struct { } var _ routing.ContentRouting = (*contentRouter)(nil) +var _ routing.PeerRouting = (*contentRouter)(nil) +var _ routing.ValueStore = (*contentRouter)(nil) +var _ routinghelpers.ProvideManyRouter = (*contentRouter)(nil) +var _ routinghelpers.ReadyAbleRouter = (*contentRouter)(nil) type option func(c *contentRouter) @@ -60,8 +70,7 @@ func NewContentRoutingClient(c Client, opts ...option) *contentRouter { } func (c *contentRouter) Provide(ctx context.Context, key cid.Cid, announce bool) error { - // If 'true' is - // passed, it also announces it, otherwise it is just kept in the local + // If 'true' is passed, it also announces it, otherwise it is just kept in the local // accounting of which objects are being provided. if !announce { return nil @@ -73,7 +82,7 @@ func (c *contentRouter) Provide(ctx context.Context, key cid.Cid, announce bool) // ProvideMany provides a set of keys to the remote delegate. // Large sets of keys are chunked into multiple requests and sent concurrently, according to the concurrency configuration. -// TODO: implement retries through transient errors +// TODO: switch to use [client.Provide] when ready. func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Multihash) error { keys := make([]cid.Cid, 0, len(mhKeys)) for _, m := range mhKeys { @@ -97,13 +106,14 @@ func (c *contentRouter) ProvideMany(ctx context.Context, mhKeys []multihash.Mult ) } -// Ready is part of the existing `ProvideMany` interface. +// Ready is part of the existing [routing.ReadyAbleRouter] interface. func (c *contentRouter) Ready() bool { return true } -// readProviderResponses reads bitswap records from the iterator into the given channel, dropping non-bitswap records. -func readProviderResponses(iter iter.ResultIter[types.ProviderResponse], ch chan<- peer.AddrInfo) { +// readProviderResponses reads peer records (and bitswap records for legacy +// compatibility) from the iterator into the given channel. +func readProviderResponses(iter iter.ResultIter[types.Record], ch chan<- peer.AddrInfo) { defer close(ch) defer iter.Close() for iter.Next() { @@ -113,8 +123,31 @@ func readProviderResponses(iter iter.ResultIter[types.ProviderResponse], ch chan continue } v := res.Val - if v.GetSchema() == types.SchemaBitswap { - result, ok := v.(*types.ReadBitswapProviderRecord) + switch v.GetSchema() { + case types.SchemaPeer: + result, ok := v.(*types.PeerRecord) + if !ok { + logger.Errorw( + "problem casting find providers result", + "Schema", v.GetSchema(), + "Type", reflect.TypeOf(v).String(), + ) + continue + } + + var addrs []multiaddr.Multiaddr + for _, a := range result.Addrs { + addrs = append(addrs, a.Multiaddr) + } + + ch <- peer.AddrInfo{ + ID: *result.ID, + Addrs: addrs, + } + //lint:ignore SA1019 // ignore staticcheck + case types.SchemaBitswap: + //lint:ignore SA1019 // ignore staticcheck + result, ok := v.(*types.BitswapRecord) if !ok { logger.Errorw( "problem casting find providers result", @@ -149,3 +182,111 @@ func (c *contentRouter) FindProvidersAsync(ctx context.Context, key cid.Cid, num go readProviderResponses(resultsIter, ch) return ch } + +func (c *contentRouter) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) { + iter, err := c.client.FindPeers(ctx, pid) + if err != nil { + return peer.AddrInfo{}, err + } + defer iter.Close() + + for iter.Next() { + res := iter.Val() + if res.Err != nil { + logger.Warnw("error iterating provider responses: %s", res.Err) + continue + } + v := res.Val + if v.GetSchema() == types.SchemaPeer { + result, ok := v.(*types.PeerRecord) + if !ok { + logger.Errorw( + "problem casting find providers result", + "Schema", v.GetSchema(), + "Type", reflect.TypeOf(v).String(), + ) + continue + } + + var addrs []multiaddr.Multiaddr + for _, a := range result.Addrs { + addrs = append(addrs, a.Multiaddr) + } + + return peer.AddrInfo{ + ID: *result.ID, + Addrs: addrs, + }, nil + } + } + + return peer.AddrInfo{}, err +} + +func (c *contentRouter) PutValue(ctx context.Context, key string, data []byte, opts ...routing.Option) error { + if !strings.HasPrefix(key, "/ipns/") { + return routing.ErrNotSupported + } + + name, err := ipns.NameFromRoutingKey([]byte(key)) + if err != nil { + return err + } + + record, err := ipns.UnmarshalRecord(data) + if err != nil { + return err + } + + return c.client.PutIPNS(ctx, name, record) +} + +func (c *contentRouter) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { + if !strings.HasPrefix(key, "/ipns/") { + return nil, routing.ErrNotSupported + } + + name, err := ipns.NameFromRoutingKey([]byte(key)) + if err != nil { + return nil, err + } + + record, err := c.client.GetIPNS(ctx, name) + if err != nil { + return nil, err + } + + return ipns.MarshalRecord(record) +} + +func (c *contentRouter) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { + if !strings.HasPrefix(key, "/ipns/") { + return nil, routing.ErrNotSupported + } + + name, err := ipns.NameFromRoutingKey([]byte(key)) + if err != nil { + return nil, err + } + + ch := make(chan []byte) + + go func() { + record, err := c.client.GetIPNS(ctx, name) + if err != nil { + close(ch) + return + } + + raw, err := ipns.MarshalRecord(record) + if err != nil { + close(ch) + return + } + + ch <- raw + close(ch) + }() + + return ch, nil +} diff --git a/routing/http/contentrouter/contentrouter_test.go b/routing/http/contentrouter/contentrouter_test.go index 3830482e2..83a086997 100644 --- a/routing/http/contentrouter/contentrouter_test.go +++ b/routing/http/contentrouter/contentrouter_test.go @@ -6,10 +6,15 @@ import ( "testing" "time" + "github.com/ipfs/boxo/coreiface/path" + "github.com/ipfs/boxo/ipns" + ipfspath "github.com/ipfs/boxo/path" "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -23,9 +28,14 @@ func (m *mockClient) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl tim return args.Get(0).(time.Duration), args.Error(1) } -func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.ProviderResponse], error) { +func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error) { args := m.Called(ctx, key) - return args.Get(0).(iter.ResultIter[types.ProviderResponse]), args.Error(1) + return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) +} + +func (m *mockClient) FindPeers(ctx context.Context, pid peer.ID) (iter.ResultIter[types.Record], error) { + args := m.Called(ctx, pid) + return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } func (m *mockClient) Ready(ctx context.Context) (bool, error) { @@ -33,18 +43,14 @@ func (m *mockClient) Ready(ctx context.Context) (bool, error) { return args.Bool(0), args.Error(1) } -func makeCID() cid.Cid { - buf := make([]byte, 63) - _, err := rand.Read(buf) - if err != nil { - panic(err) - } - mh, err := multihash.Encode(buf, multihash.SHA2_256) - if err != nil { - panic(err) - } - c := cid.NewCidV1(cid.Raw, mh) - return c +func (m *mockClient) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { + args := m.Called(ctx, name) + return args.Get(0).(*ipns.Record), args.Error(1) +} + +func (m *mockClient) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + args := m.Called(ctx, name, record) + return args.Error(0) } func TestProvide(t *testing.T) { @@ -100,6 +106,20 @@ func TestProvideMany(t *testing.T) { require.NoError(t, err) } +func makeCID() cid.Cid { + buf := make([]byte, 63) + _, err := rand.Read(buf) + if err != nil { + panic(err) + } + mh, err := multihash.Encode(buf, multihash.SHA2_256) + if err != nil { + panic(err) + } + c := cid.NewCidV1(cid.Raw, mh) + return c +} + func TestFindProvidersAsync(t *testing.T) { key := makeCID() ctx := context.Background() @@ -108,26 +128,40 @@ func TestFindProvidersAsync(t *testing.T) { p1 := peer.ID("peer1") p2 := peer.ID("peer2") - ais := []types.ProviderResponse{ - &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, - ID: &p1, + p3 := peer.ID("peer3") + p4 := peer.ID("peer4") + ais := []types.Record{ + &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &p1, + Protocols: []string{"transport-bitswap"}, }, - &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", + //lint:ignore SA1019 // ignore staticcheck + &types.BitswapRecord{ + //lint:ignore SA1019 // ignore staticcheck Schema: types.SchemaBitswap, ID: &p2, + Protocol: "transport-bitswap", + }, + &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &p3, + Protocols: []string{"transport-bitswap"}, + }, + &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &p4, + Protocols: []string{"transport-horse"}, }, - &types.UnknownProviderRecord{ - Protocol: "UNKNOWN", + &types.UnknownRecord{ + Schema: "unknown", }, } - aisIter := iter.ToResultIter[types.ProviderResponse](iter.FromSlice(ais)) + aisIter := iter.ToResultIter[types.Record](iter.FromSlice(ais)) client.On("FindProviders", ctx, key).Return(aisIter, nil) - aiChan := crc.FindProvidersAsync(ctx, key, 2) + aiChan := crc.FindProvidersAsync(ctx, key, 3) var actualAIs []peer.AddrInfo for ai := range aiChan { @@ -137,7 +171,118 @@ func TestFindProvidersAsync(t *testing.T) { expected := []peer.AddrInfo{ {ID: p1}, {ID: p2}, + {ID: p3}, + {ID: p4}, } require.Equal(t, expected, actualAIs) } + +func TestFindPeer(t *testing.T) { + ctx := context.Background() + client := &mockClient{} + crc := NewContentRoutingClient(client) + + p1 := peer.ID("peer1") + ais := []types.Record{ + &types.UnknownRecord{ + Schema: "unknown", + }, + &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &p1, + Protocols: []string{"transport-bitswap"}, + }, + } + aisIter := iter.ToResultIter[types.Record](iter.FromSlice(ais)) + + client.On("FindPeers", ctx, p1).Return(aisIter, nil) + + peer, err := crc.FindPeer(ctx, p1) + require.NoError(t, err) + require.Equal(t, peer.ID, p1) +} + +func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { + sk, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + + pid, err := peer.IDFromPrivateKey(sk) + require.NoError(t, err) + + return sk, ipns.NameFromPeer(pid) +} + +func makeIPNSRecord(t *testing.T, sk crypto.PrivKey, opts ...ipns.Option) (*ipns.Record, []byte) { + cid, err := cid.Decode("bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4") + require.NoError(t, err) + + path := path.IpfsPath(cid) + eol := time.Now().Add(time.Hour * 48) + ttl := time.Second * 20 + + record, err := ipns.NewRecord(sk, ipfspath.FromString(path.String()), 1, eol, ttl, opts...) + require.NoError(t, err) + + rawRecord, err := ipns.MarshalRecord(record) + require.NoError(t, err) + + return record, rawRecord +} + +func TestGetValue(t *testing.T) { + ctx := context.Background() + client := &mockClient{} + crc := NewContentRoutingClient(client) + + t.Run("Fail On Unsupported Key", func(t *testing.T) { + v, err := crc.GetValue(ctx, "/something/unsupported") + require.Nil(t, v) + require.ErrorIs(t, err, routing.ErrNotSupported) + }) + + t.Run("Fail On Invalid IPNS Name", func(t *testing.T) { + v, err := crc.GetValue(ctx, "/ipns/invalid") + require.Nil(t, v) + require.Error(t, err) + }) + + t.Run("Succeeds On Valid IPNS Name", func(t *testing.T) { + sk, name := makeName(t) + rec, rawRec := makeIPNSRecord(t, sk) + client.On("GetIPNS", ctx, name).Return(rec, nil) + v, err := crc.GetValue(ctx, string(name.RoutingKey())) + require.NoError(t, err) + require.Equal(t, rawRec, v) + }) +} + +func TestPutValue(t *testing.T) { + ctx := context.Background() + client := &mockClient{} + crc := NewContentRoutingClient(client) + + sk, name := makeName(t) + _, rawRec := makeIPNSRecord(t, sk) + + t.Run("Fail On Unsupported Key", func(t *testing.T) { + err := crc.PutValue(ctx, "/something/unsupported", rawRec) + require.ErrorIs(t, err, routing.ErrNotSupported) + }) + + t.Run("Fail On Invalid IPNS Name", func(t *testing.T) { + err := crc.PutValue(ctx, "/ipns/invalid", rawRec) + require.Error(t, err) + }) + + t.Run("Fail On Invalid IPNS Record", func(t *testing.T) { + err := crc.PutValue(ctx, string(name.RoutingKey()), []byte("gibberish")) + require.Error(t, err) + }) + + t.Run("Succeeds On Valid IPNS Name & Record", func(t *testing.T) { + client.On("PutIPNS", ctx, name, mock.Anything).Return(nil) + err := crc.PutValue(ctx, string(name.RoutingKey()), rawRec) + require.NoError(t, err) + }) +} diff --git a/routing/http/server/server.go b/routing/http/server/server.go index 835262990..9e7d81a04 100644 --- a/routing/http/server/server.go +++ b/routing/http/server/server.go @@ -37,34 +37,45 @@ const ( DefaultStreamingRecordsLimit = 0 ) -var logger = logging.Logger("service/server/delegatedrouting") +var logger = logging.Logger("routing/http/server") const ( - ProvidePath = "/routing/v1/providers/" - FindProvidersPath = "/routing/v1/providers/{cid}" - IPNSPath = "/routing/v1/ipns/{cid}" + providePath = "/routing/v1/providers/" + findProvidersPath = "/routing/v1/providers/{cid}" + findPeersPath = "/routing/v1/peers/{peer-id}" + GetIPNSPath = "/routing/v1/ipns/{cid}" ) type FindProvidersAsyncResponse struct { - ProviderResponse types.ProviderResponse + ProviderResponse types.Record Error error } type ContentRouter interface { - // FindProviders searches for peers who are able to provide a given key. Limit - // indicates the maximum amount of results to return. 0 means unbounded. - FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.ProviderResponse], error) + // FindProviders searches for peers who are able to provide the given [cid.Cid]. + // Limit indicates the maximum amount of results to return; 0 means unbounded. + FindProviders(ctx context.Context, cid cid.Cid, limit int) (iter.ResultIter[types.Record], error) + + // Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: + // + // [IPIP-378]: https://github.com/ipfs/specs/pull/378 ProvideBitswap(ctx context.Context, req *BitswapWriteProvideRequest) (time.Duration, error) - Provide(ctx context.Context, req *WriteProvideRequest) (types.ProviderResponse, error) - // FindIPNSRecord searches for an [ipns.Record] for the given [ipns.Name]. - FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) + // FindPeers searches for peers who have the provided [peer.ID]. + // Limit indicates the maximum amount of results to return; 0 means unbounded. + FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) + + // GetIPNS searches for an [ipns.Record] for the given [ipns.Name]. + GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) - // ProvideIPNSRecord stores the provided [ipns.Record] for the given [ipns.Name]. It is - // guaranteed that the record matches the provided name. - ProvideIPNSRecord(ctx context.Context, name ipns.Name, record *ipns.Record) error + // PutIPNS stores the provided [ipns.Record] for the given [ipns.Name]. + // It is guaranteed that the record matches the provided name. + PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error } +// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: +// +// [IPIP-378]: https://github.com/ipfs/specs/pull/378 type BitswapWriteProvideRequest struct { Keys []cid.Cid Timestamp time.Time @@ -73,6 +84,9 @@ type BitswapWriteProvideRequest struct { Addrs []multiaddr.Multiaddr } +// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: +// +// [IPIP-378]: https://github.com/ipfs/specs/pull/378 type WriteProvideRequest struct { Protocol string Schema string @@ -88,16 +102,18 @@ func WithStreamingResultsDisabled() Option { } } -// WithRecordsLimit sets a limit that will be passed to ContentRouter.FindProviders -// for non-streaming requests (application/json). Default is DefaultRecordsLimit. +// WithRecordsLimit sets a limit that will be passed to [ContentRouter.FindProviders] +// and [ContentRouter.FindPeers] for non-streaming requests (application/json). +// Default is [DefaultRecordsLimit]. func WithRecordsLimit(limit int) Option { return func(s *server) { s.recordsLimit = limit } } -// WithStreamingRecordsLimit sets a limit that will be passed to ContentRouter.FindProviders -// for streaming requests (application/x-ndjson). Default is DefaultStreamingRecordsLimit. +// WithStreamingRecordsLimit sets a limit that will be passed to [ContentRouter.FindProviders] +// and [ContentRouter.FindPeers] for streaming requests (application/x-ndjson). +// Default is [DefaultStreamingRecordsLimit]. func WithStreamingRecordsLimit(limit int) Option { return func(s *server) { s.streamingRecordsLimit = limit @@ -116,12 +132,11 @@ func Handler(svc ContentRouter, opts ...Option) http.Handler { } r := mux.NewRouter() - r.HandleFunc(ProvidePath, server.provide).Methods(http.MethodPut) - r.HandleFunc(FindProvidersPath, server.findProviders).Methods(http.MethodGet) - - r.HandleFunc(IPNSPath, server.getIPNSRecord).Methods(http.MethodGet) - r.HandleFunc(IPNSPath, server.putIPNSRecord).Methods(http.MethodPut) - + r.HandleFunc(findProvidersPath, server.findProviders).Methods(http.MethodGet) + r.HandleFunc(providePath, server.provide).Methods(http.MethodPut) + r.HandleFunc(findPeersPath, server.findPeers).Methods(http.MethodGet) + r.HandleFunc(GetIPNSPath, server.GetIPNS).Methods(http.MethodGet) + r.HandleFunc(GetIPNSPath, server.PutIPNS).Methods(http.MethodPut) return r } @@ -132,7 +147,149 @@ type server struct { streamingRecordsLimit int } +func (s *server) detectResponseType(r *http.Request) (string, error) { + var ( + supportsNDJSON bool + supportsJSON bool + + acceptHeaders = r.Header.Values("Accept") + ) + + if len(acceptHeaders) == 0 { + return mediaTypeJSON, nil + } + + for _, acceptHeader := range acceptHeaders { + for _, accept := range strings.Split(acceptHeader, ",") { + mediaType, _, err := mime.ParseMediaType(accept) + if err != nil { + return "", fmt.Errorf("unable to parse Accept header: %w", err) + } + + switch mediaType { + case mediaTypeJSON, mediaTypeWildcard: + supportsJSON = true + case mediaTypeNDJSON: + supportsNDJSON = true + } + } + } + + if supportsNDJSON && !s.disableNDJSON { + return mediaTypeNDJSON, nil + } else if supportsJSON { + return mediaTypeJSON, nil + } else { + return "", errors.New("no supported content types") + } +} + +func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) { + vars := mux.Vars(httpReq) + cidStr := vars["cid"] + cid, err := cid.Decode(cidStr) + if err != nil { + writeErr(w, "FindProviders", http.StatusBadRequest, fmt.Errorf("unable to parse CID: %w", err)) + return + } + + mediaType, err := s.detectResponseType(httpReq) + if err != nil { + writeErr(w, "FindProviders", http.StatusBadRequest, err) + return + } + + var ( + handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) + recordsLimit int + ) + + if mediaType == mediaTypeNDJSON { + handlerFunc = s.findProvidersNDJSON + recordsLimit = s.streamingRecordsLimit + } else { + handlerFunc = s.findProvidersJSON + recordsLimit = s.recordsLimit + } + + provIter, err := s.svc.FindProviders(httpReq.Context(), cid, recordsLimit) + if err != nil { + writeErr(w, "FindProviders", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) + return + } + + handlerFunc(w, provIter) +} + +func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) { + defer provIter.Close() + + providers, err := iter.ReadAllResults(provIter) + if err != nil { + writeErr(w, "FindProviders", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) + return + } + + writeJSONResult(w, "FindProviders", jsontypes.ProvidersResponse{ + Providers: providers, + }) +} + +func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) { + writeResultsIterNDJSON(w, provIter) +} + +func (s *server) findPeers(w http.ResponseWriter, r *http.Request) { + pidStr := mux.Vars(r)["peer-id"] + + // pidStr must be in CIDv1 format. Therefore, use [cid.Decode]. We can't use + // [peer.Decode] because that would allow other formats to pass through. + cid, err := cid.Decode(pidStr) + if err != nil { + if pid, err := peer.Decode(pidStr); err == nil { + writeErr(w, "FindPeers", http.StatusBadRequest, fmt.Errorf("the value is a peer ID, try using its CID representation: %s", peer.ToCid(pid).String())) + } else { + writeErr(w, "FindPeers", http.StatusBadRequest, fmt.Errorf("unable to parse peer ID: %w", err)) + } + return + } + + pid, err := peer.FromCid(cid) + if err != nil { + writeErr(w, "FindPeers", http.StatusBadRequest, fmt.Errorf("unable to parse peer ID: %w", err)) + return + } + + mediaType, err := s.detectResponseType(r) + if err != nil { + writeErr(w, "FindPeers", http.StatusBadRequest, err) + return + } + + var ( + handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) + recordsLimit int + ) + + if mediaType == mediaTypeNDJSON { + handlerFunc = s.findPeersNDJSON + recordsLimit = s.streamingRecordsLimit + } else { + handlerFunc = s.findPeersJSON + recordsLimit = s.recordsLimit + } + + provIter, err := s.svc.FindPeers(r.Context(), pid, recordsLimit) + if err != nil { + writeErr(w, "FindPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) + return + } + + handlerFunc(w, provIter) +} + func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) { + //lint:ignore SA1019 // ignore staticcheck req := jsontypes.WriteProvidersRequest{} err := json.NewDecoder(httpReq.Body).Decode(&req) _ = httpReq.Body.Close() @@ -141,11 +298,13 @@ func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) { return } + //lint:ignore SA1019 // ignore staticcheck resp := jsontypes.WriteProvidersResponse{} for i, prov := range req.Providers { switch v := prov.(type) { - case *types.WriteBitswapProviderRecord: + //lint:ignore SA1019 // ignore staticcheck + case *types.WriteBitswapRecord: err := v.Verify() if err != nil { logErr("Provide", "signature verification failed", err) @@ -173,148 +332,42 @@ func (s *server) provide(w http.ResponseWriter, httpReq *http.Request) { return } resp.ProvideResults = append(resp.ProvideResults, - &types.WriteBitswapProviderRecordResponse{ + //lint:ignore SA1019 // ignore staticcheck + &types.WriteBitswapRecordResponse{ Protocol: v.Protocol, Schema: v.Schema, AdvisoryTTL: &types.Duration{Duration: advisoryTTL}, }, ) - case *types.UnknownProviderRecord: - provResp, err := s.svc.Provide(httpReq.Context(), &WriteProvideRequest{ - Protocol: v.Protocol, - Schema: v.Schema, - Bytes: v.Bytes, - }) - if err != nil { - writeErr(w, "Provide", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) - return - } - resp.ProvideResults = append(resp.ProvideResults, provResp) default: - writeErr(w, "Provide", http.StatusBadRequest, fmt.Errorf("provider record %d does not contain a protocol", i)) + writeErr(w, "Provide", http.StatusBadRequest, fmt.Errorf("provider record %d is not bitswap", i)) return } } writeJSONResult(w, "Provide", resp) } -func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) { - vars := mux.Vars(httpReq) - cidStr := vars["cid"] - cid, err := cid.Decode(cidStr) - if err != nil { - writeErr(w, "FindProviders", http.StatusBadRequest, fmt.Errorf("unable to parse CID: %w", err)) - return - } - - var handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.ProviderResponse]) - - var supportsNDJSON bool - var supportsJSON bool - var recordsLimit int - acceptHeaders := httpReq.Header.Values("Accept") - if len(acceptHeaders) == 0 { - handlerFunc = s.findProvidersJSON - recordsLimit = s.recordsLimit - } else { - for _, acceptHeader := range acceptHeaders { - for _, accept := range strings.Split(acceptHeader, ",") { - mediaType, _, err := mime.ParseMediaType(accept) - if err != nil { - writeErr(w, "FindProviders", http.StatusBadRequest, fmt.Errorf("unable to parse Accept header: %w", err)) - return - } - - switch mediaType { - case mediaTypeJSON, mediaTypeWildcard: - supportsJSON = true - case mediaTypeNDJSON: - supportsNDJSON = true - } - } - } - - if supportsNDJSON && !s.disableNDJSON { - handlerFunc = s.findProvidersNDJSON - recordsLimit = s.streamingRecordsLimit - } else if supportsJSON { - handlerFunc = s.findProvidersJSON - recordsLimit = s.recordsLimit - } else { - writeErr(w, "FindProviders", http.StatusBadRequest, errors.New("no supported content types")) - return - } - } +func (s *server) findPeersJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) { + defer peersIter.Close() - provIter, err := s.svc.FindProviders(httpReq.Context(), cid, recordsLimit) + peers, err := iter.ReadAllResults(peersIter) if err != nil { - writeErr(w, "FindProviders", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) + writeErr(w, "FindPeers", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) return } - handlerFunc(w, provIter) + writeJSONResult(w, "FindPeers", jsontypes.PeersResponse{ + Peers: peers, + }) } -func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.ProviderResponse]) { - defer provIter.Close() - - var ( - providers []types.ProviderResponse - i int - ) - - for provIter.Next() { - res := provIter.Val() - if res.Err != nil { - writeErr(w, "FindProviders", http.StatusInternalServerError, fmt.Errorf("delegate error on result %d: %w", i, res.Err)) - return - } - providers = append(providers, res.Val) - i++ - } - response := jsontypes.ReadProvidersResponse{Providers: providers} - writeJSONResult(w, "FindProviders", response) -} - -func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.ProviderResponse]) { - defer provIter.Close() - - w.Header().Set("Content-Type", mediaTypeNDJSON) - w.WriteHeader(http.StatusOK) - for provIter.Next() { - res := provIter.Val() - if res.Err != nil { - logger.Errorw("FindProviders ndjson iterator error", "Error", res.Err) - return - } - // don't use an encoder because we can't easily differentiate writer errors from encoding errors - b, err := drjson.MarshalJSONBytes(res.Val) - if err != nil { - logger.Errorw("FindProviders ndjson marshal error", "Error", err) - return - } - - _, err = w.Write(b) - if err != nil { - logger.Warn("FindProviders ndjson write error", "Error", err) - return - } - - _, err = w.Write([]byte{'\n'}) - if err != nil { - logger.Warn("FindProviders ndjson write error", "Error", err) - return - } - - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - } +func (s *server) findPeersNDJSON(w http.ResponseWriter, peersIter iter.ResultIter[types.Record]) { + writeResultsIterNDJSON(w, peersIter) } -func (s *server) getIPNSRecord(w http.ResponseWriter, r *http.Request) { +func (s *server) GetIPNS(w http.ResponseWriter, r *http.Request) { if !strings.Contains(r.Header.Get("Accept"), mediaTypeIPNSRecord) { - writeErr(w, "GetIPNSRecord", http.StatusNotAcceptable, errors.New("content type in 'Accept' header is missing or not supported")) + writeErr(w, "GetIPNS", http.StatusNotAcceptable, errors.New("content type in 'Accept' header is missing or not supported")) return } @@ -322,25 +375,25 @@ func (s *server) getIPNSRecord(w http.ResponseWriter, r *http.Request) { cidStr := vars["cid"] cid, err := cid.Decode(cidStr) if err != nil { - writeErr(w, "GetIPNSRecord", http.StatusBadRequest, fmt.Errorf("unable to parse CID: %w", err)) + writeErr(w, "GetIPNS", http.StatusBadRequest, fmt.Errorf("unable to parse CID: %w", err)) return } name, err := ipns.NameFromCid(cid) if err != nil { - writeErr(w, "GetIPNSRecord", http.StatusBadRequest, fmt.Errorf("peer ID CID is not valid: %w", err)) + writeErr(w, "GetIPNS", http.StatusBadRequest, fmt.Errorf("peer ID CID is not valid: %w", err)) return } - record, err := s.svc.FindIPNSRecord(r.Context(), name) + record, err := s.svc.GetIPNS(r.Context(), name) if err != nil { - writeErr(w, "GetIPNSRecord", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) + writeErr(w, "GetIPNS", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) return } rawRecord, err := ipns.MarshalRecord(record) if err != nil { - writeErr(w, "GetIPNSRecord", http.StatusInternalServerError, err) + writeErr(w, "GetIPNS", http.StatusInternalServerError, err) return } @@ -356,9 +409,9 @@ func (s *server) getIPNSRecord(w http.ResponseWriter, r *http.Request) { w.Write(rawRecord) } -func (s *server) putIPNSRecord(w http.ResponseWriter, r *http.Request) { +func (s *server) PutIPNS(w http.ResponseWriter, r *http.Request) { if !strings.Contains(r.Header.Get("Content-Type"), mediaTypeIPNSRecord) { - writeErr(w, "PutIPNSRecord", http.StatusNotAcceptable, errors.New("content type in 'Content-Type' header is missing or not supported")) + writeErr(w, "PutIPNS", http.StatusNotAcceptable, errors.New("content type in 'Content-Type' header is missing or not supported")) return } @@ -366,38 +419,38 @@ func (s *server) putIPNSRecord(w http.ResponseWriter, r *http.Request) { cidStr := vars["cid"] cid, err := cid.Decode(cidStr) if err != nil { - writeErr(w, "PutIPNSRecord", http.StatusBadRequest, fmt.Errorf("unable to parse CID: %w", err)) + writeErr(w, "PutIPNS", http.StatusBadRequest, fmt.Errorf("unable to parse CID: %w", err)) return } name, err := ipns.NameFromCid(cid) if err != nil { - writeErr(w, "PutIPNSRecord", http.StatusBadRequest, fmt.Errorf("peer ID CID is not valid: %w", err)) + writeErr(w, "PutIPNS", http.StatusBadRequest, fmt.Errorf("peer ID CID is not valid: %w", err)) return } // Limit the reader to the maximum record size. rawRecord, err := io.ReadAll(io.LimitReader(r.Body, int64(ipns.MaxRecordSize))) if err != nil { - writeErr(w, "PutIPNSRecord", http.StatusBadRequest, fmt.Errorf("provided record is too long: %w", err)) + writeErr(w, "PutIPNS", http.StatusBadRequest, fmt.Errorf("provided record is too long: %w", err)) return } record, err := ipns.UnmarshalRecord(rawRecord) if err != nil { - writeErr(w, "PutIPNSRecord", http.StatusBadRequest, fmt.Errorf("provided record is invalid: %w", err)) + writeErr(w, "PutIPNS", http.StatusBadRequest, fmt.Errorf("provided record is invalid: %w", err)) return } err = ipns.ValidateWithName(record, name) if err != nil { - writeErr(w, "PutIPNSRecord", http.StatusBadRequest, fmt.Errorf("provided record is invalid: %w", err)) + writeErr(w, "PutIPNS", http.StatusBadRequest, fmt.Errorf("provided record is invalid: %w", err)) return } - err = s.svc.ProvideIPNSRecord(r.Context(), name, record) + err = s.svc.PutIPNS(r.Context(), name, record) if err != nil { - writeErr(w, "PutIPNSRecord", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) + writeErr(w, "PutIPNS", http.StatusInternalServerError, fmt.Errorf("delegate error: %w", err)) return } @@ -437,3 +490,40 @@ func writeErr(w http.ResponseWriter, method string, statusCode int, cause error) func logErr(method, msg string, err error) { logger.Infow(msg, "Method", method, "Error", err) } + +func writeResultsIterNDJSON(w http.ResponseWriter, resultIter iter.ResultIter[types.Record]) { + defer resultIter.Close() + + w.Header().Set("Content-Type", mediaTypeNDJSON) + w.WriteHeader(http.StatusOK) + + for resultIter.Next() { + res := resultIter.Val() + if res.Err != nil { + logger.Errorw("ndjson iterator error", "Error", res.Err) + return + } + // don't use an encoder because we can't easily differentiate writer errors from encoding errors + b, err := drjson.MarshalJSONBytes(res.Val) + if err != nil { + logger.Errorw("ndjson marshal error", "Error", err) + return + } + + _, err = w.Write(b) + if err != nil { + logger.Warn("ndjson write error", "Error", err) + return + } + + _, err = w.Write([]byte{'\n'}) + if err != nil { + logger.Warn("ndjson write error", "Error", err) + return + } + + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + } +} diff --git a/routing/http/server/server_test.go b/routing/http/server/server_test.go index dfe38f0da..f6d4a3dba 100644 --- a/routing/http/server/server_test.go +++ b/routing/http/server/server_test.go @@ -18,6 +18,7 @@ import ( "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + b58 "github.com/mr-tron/base58/base58" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -28,12 +29,11 @@ func TestHeaders(t *testing.T) { t.Cleanup(server.Close) serverAddr := "http://" + server.Listener.Addr().String() - results := iter.FromSlice([]iter.Result[types.ProviderResponse]{ - {Val: &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, - }}, - }, + results := iter.FromSlice([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + Protocols: []string{"transport-bitswap"}, + }}}, ) c := "baeabep4vu3ceru7nerjjbk37sxb7wmftteve4hcosmyolsbsiubw2vr6pqzj6mw7kv6tbn6nqkkldnklbjgm5tzbi4hkpkled4xlcr7xz4bq" @@ -43,13 +43,13 @@ func TestHeaders(t *testing.T) { router.On("FindProviders", mock.Anything, cb, DefaultRecordsLimit). Return(results, nil) - resp, err := http.Get(serverAddr + ProvidePath + c) + resp, err := http.Get(serverAddr + "/routing/v1/providers/" + c) require.NoError(t, err) require.Equal(t, 200, resp.StatusCode) header := resp.Header.Get("Content-Type") require.Equal(t, mediaTypeJSON, header) - resp, err = http.Get(serverAddr + ProvidePath + "BAD_CID") + resp, err = http.Get(serverAddr + "/routing/v1/providers/" + "BAD_CID") require.NoError(t, err) defer resp.Body.Close() require.Equal(t, 400, resp.StatusCode) @@ -57,7 +57,17 @@ func TestHeaders(t *testing.T) { require.Equal(t, "text/plain; charset=utf-8", header) } -func TestResponse(t *testing.T) { +func makePeerID(t *testing.T) (crypto.PrivKey, peer.ID) { + sk, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + + pid, err := peer.IDFromPrivateKey(sk) + require.NoError(t, err) + + return sk, pid +} + +func TestProviders(t *testing.T) { pidStr := "12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn" pid2Str := "12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz" cidStr := "bafkreifjjcie6lypi6ny7amxnfftagclbuxndqonfipmb64f2km2devei4" @@ -73,20 +83,21 @@ func TestResponse(t *testing.T) { runTest := func(t *testing.T, contentType string, expectedStream bool, expectedBody string) { t.Parallel() - results := iter.FromSlice([]iter.Result[types.ProviderResponse]{ - {Val: &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, - ID: &pid, - Addrs: []types.Multiaddr{}, + results := iter.FromSlice([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-bitswap"}, + Addrs: []types.Multiaddr{}, }}, - {Val: &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", + //lint:ignore SA1019 // ignore staticcheck + {Val: &types.BitswapRecord{ + //lint:ignore SA1019 // ignore staticcheck Schema: types.SchemaBitswap, ID: &pid2, + Protocol: "transport-bitswap", Addrs: []types.Multiaddr{}, - }}, - }, + }}}, ) router := &mockContentRouter{} @@ -98,7 +109,7 @@ func TestResponse(t *testing.T) { limit = DefaultStreamingRecordsLimit } router.On("FindProviders", mock.Anything, cid, limit).Return(results, nil) - urlStr := serverAddr + ProvidePath + cidStr + urlStr := serverAddr + "/routing/v1/providers/" + cidStr req, err := http.NewRequest(http.MethodGet, urlStr, nil) require.NoError(t, err) @@ -113,25 +124,120 @@ func TestResponse(t *testing.T) { body, err := io.ReadAll(resp.Body) require.NoError(t, err) - require.Equal(t, string(body), expectedBody) + require.Equal(t, expectedBody, string(body)) } t.Run("JSON Response", func(t *testing.T) { - runTest(t, mediaTypeJSON, false, `{"Providers":[{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Addrs":[]},{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Addrs":[]}]}`) + runTest(t, mediaTypeJSON, false, `{"Providers":[{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"},{"Schema":"bitswap","Protocol":"transport-bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz"}]}`) }) t.Run("NDJSON Response", func(t *testing.T) { - runTest(t, mediaTypeNDJSON, true, `{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Addrs":[]}`+"\n"+`{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Addrs":[]}`+"\n") + runTest(t, mediaTypeNDJSON, true, `{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n"+`{"Schema":"bitswap","Protocol":"transport-bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz"}`+"\n") }) } -func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { - sk, _, err := crypto.GenerateEd25519Key(rand.Reader) - require.NoError(t, err) +func TestPeers(t *testing.T) { + makeRequest := func(t *testing.T, router *mockContentRouter, contentType, arg string) *http.Response { + server := httptest.NewServer(Handler(router)) + t.Cleanup(server.Close) + req, err := http.NewRequest(http.MethodGet, "http://"+server.Listener.Addr().String()+"/routing/v1/peers/"+arg, nil) + require.NoError(t, err) + req.Header.Set("Accept", contentType) + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + return resp + } - pid, err := peer.IDFromPrivateKey(sk) - require.NoError(t, err) + t.Run("GET /routing/v1/peers/{non-peer-cid} returns 400", func(t *testing.T) { + t.Parallel() + router := &mockContentRouter{} + resp := makeRequest(t, router, mediaTypeJSON, "bafkqaaa") + require.Equal(t, 400, resp.StatusCode) + }) + + t.Run("GET /routing/v1/peers/{base58-peer-id} returns 400", func(t *testing.T) { + t.Parallel() + + _, pid := makePeerID(t) + router := &mockContentRouter{} + resp := makeRequest(t, router, mediaTypeJSON, b58.Encode([]byte(pid))) + require.Equal(t, 400, resp.StatusCode) + }) + + t.Run("GET /routing/v1/peers/{cid-peer-id} returns 200 with correct body (JSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makePeerID(t) + results := iter.FromSlice([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-bitswap", "transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + }) + + router := &mockContentRouter{} + router.On("FindPeers", mock.Anything, pid, 20).Return(results, nil) + + resp := makeRequest(t, router, mediaTypeJSON, peer.ToCid(pid).String()) + require.Equal(t, 200, resp.StatusCode) + + header := resp.Header.Get("Content-Type") + require.Equal(t, mediaTypeJSON, header) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expectedBody := `{"Peers":[{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-bitswap","transport-foo"],"Schema":"peer"},{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-foo"],"Schema":"peer"}]}` + require.Equal(t, expectedBody, string(body)) + }) + + t.Run("GET /routing/v1/peers/{cid-peer-id} returns 200 with correct body (NDJSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makePeerID(t) + results := iter.FromSlice([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-bitswap", "transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-foo"}, + Addrs: []types.Multiaddr{}, + }}, + }) + + router := &mockContentRouter{} + router.On("FindPeers", mock.Anything, pid, 0).Return(results, nil) + + resp := makeRequest(t, router, mediaTypeNDJSON, peer.ToCid(pid).String()) + require.Equal(t, 200, resp.StatusCode) + + header := resp.Header.Get("Content-Type") + require.Equal(t, mediaTypeNDJSON, header) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + expectedBody := `{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-bitswap","transport-foo"],"Schema":"peer"}` + "\n" + `{"Addrs":[],"ID":"` + pid.String() + `","Protocols":["transport-foo"],"Schema":"peer"}` + "\n" + require.Equal(t, expectedBody, string(body)) + }) +} + +func makeName(t *testing.T) (crypto.PrivKey, ipns.Name) { + sk, pid := makePeerID(t) return sk, ipns.NameFromPeer(pid) } @@ -179,7 +285,7 @@ func TestIPNS(t *testing.T) { require.NoError(t, err) router := &mockContentRouter{} - router.On("FindIPNSRecord", mock.Anything, name1).Return(rec, nil) + router.On("GetIPNS", mock.Anything, name1).Return(rec, nil) resp := makeRequest(t, router, "/routing/v1/ipns/"+name1.String()) require.Equal(t, 200, resp.StatusCode) @@ -212,7 +318,7 @@ func TestIPNS(t *testing.T) { t.Parallel() router := &mockContentRouter{} - router.On("ProvideIPNSRecord", mock.Anything, name1, record1).Return(nil) + router.On("PutIPNS", mock.Anything, name1, record1).Return(nil) server := httptest.NewServer(Handler(router)) t.Cleanup(server.Close) @@ -259,9 +365,9 @@ func TestIPNS(t *testing.T) { type mockContentRouter struct{ mock.Mock } -func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.ProviderResponse], error) { +func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { args := m.Called(ctx, key, limit) - return args.Get(0).(iter.ResultIter[types.ProviderResponse]), args.Error(1) + return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *BitswapWriteProvideRequest) (time.Duration, error) { @@ -269,17 +375,17 @@ func (m *mockContentRouter) ProvideBitswap(ctx context.Context, req *BitswapWrit return args.Get(0).(time.Duration), args.Error(1) } -func (m *mockContentRouter) Provide(ctx context.Context, req *WriteProvideRequest) (types.ProviderResponse, error) { - args := m.Called(ctx, req) - return args.Get(0).(types.ProviderResponse), args.Error(1) +func (m *mockContentRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[types.Record], error) { + args := m.Called(ctx, pid, limit) + return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } -func (m *mockContentRouter) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) { +func (m *mockContentRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { args := m.Called(ctx, name) return args.Get(0).(*ipns.Record), args.Error(1) } -func (m *mockContentRouter) ProvideIPNSRecord(ctx context.Context, name ipns.Name, record *ipns.Record) error { +func (m *mockContentRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { args := m.Called(ctx, name, record) return args.Error(0) } diff --git a/routing/http/types/iter/iter.go b/routing/http/types/iter/iter.go index 67c6dde00..2e9801d46 100644 --- a/routing/http/types/iter/iter.go +++ b/routing/http/types/iter/iter.go @@ -1,5 +1,7 @@ package iter +import "fmt" + // Iter is an iterator of arbitrary values. // Iterators are generally not goroutine-safe, to make them safe just read from them into a channel. // For our use cases, these usually have a single reader. This motivates iterators instead of channels, @@ -44,3 +46,21 @@ func ReadAll[T any](iter Iter[T]) []T { } return vs } + +func ReadAllResults[T any](iter ResultIter[T]) ([]T, error) { + var ( + vs []T + i int + ) + + for iter.Next() { + res := iter.Val() + if res.Err != nil { + return nil, fmt.Errorf("error on result %d: %w", i, res.Err) + } + vs = append(vs, res.Val) + i++ + } + + return vs, nil +} diff --git a/routing/http/types/json/provider.go b/routing/http/types/json/provider.go deleted file mode 100644 index 351197338..000000000 --- a/routing/http/types/json/provider.go +++ /dev/null @@ -1,116 +0,0 @@ -package json - -import ( - "encoding/json" - - "github.com/ipfs/boxo/routing/http/types" -) - -// ReadProvidersResponse is the result of a Provide request -type ReadProvidersResponse struct { - Providers []types.ProviderResponse -} - -func (r *ReadProvidersResponse) UnmarshalJSON(b []byte) error { - var tempFPR struct{ Providers []json.RawMessage } - err := json.Unmarshal(b, &tempFPR) - if err != nil { - return err - } - - for _, provBytes := range tempFPR.Providers { - var readProv types.UnknownProviderRecord - err := json.Unmarshal(provBytes, &readProv) - if err != nil { - return err - } - - switch readProv.Schema { - case types.SchemaBitswap: - var prov types.ReadBitswapProviderRecord - err := json.Unmarshal(readProv.Bytes, &prov) - if err != nil { - return err - } - r.Providers = append(r.Providers, &prov) - default: - r.Providers = append(r.Providers, &readProv) - } - - } - return nil -} - -type WriteProvidersRequest struct { - Providers []types.WriteProviderRecord -} - -func (r *WriteProvidersRequest) UnmarshalJSON(b []byte) error { - type wpr struct{ Providers []json.RawMessage } - var tempWPR wpr - err := json.Unmarshal(b, &tempWPR) - if err != nil { - return err - } - - for _, provBytes := range tempWPR.Providers { - var rawProv types.UnknownProviderRecord - err := json.Unmarshal(provBytes, &rawProv) - if err != nil { - return err - } - - switch rawProv.Schema { - case types.SchemaBitswap: - var prov types.WriteBitswapProviderRecord - err := json.Unmarshal(rawProv.Bytes, &prov) - if err != nil { - return err - } - r.Providers = append(r.Providers, &prov) - default: - var prov types.UnknownProviderRecord - err := json.Unmarshal(b, &prov) - if err != nil { - return err - } - r.Providers = append(r.Providers, &prov) - } - } - return nil -} - -// WriteProvidersResponse is the result of a Provide operation -type WriteProvidersResponse struct { - ProvideResults []types.ProviderResponse -} - -func (r *WriteProvidersResponse) UnmarshalJSON(b []byte) error { - var tempWPR struct{ ProvideResults []json.RawMessage } - err := json.Unmarshal(b, &tempWPR) - if err != nil { - return err - } - - for _, provBytes := range tempWPR.ProvideResults { - var rawProv types.UnknownProviderRecord - err := json.Unmarshal(provBytes, &rawProv) - if err != nil { - return err - } - - switch rawProv.Schema { - case types.SchemaBitswap: - var prov types.WriteBitswapProviderRecordResponse - err := json.Unmarshal(rawProv.Bytes, &prov) - if err != nil { - return err - } - r.ProvideResults = append(r.ProvideResults, &prov) - default: - r.ProvideResults = append(r.ProvideResults, &rawProv) - } - } - - return nil -} diff --git a/routing/http/types/json/requests.go b/routing/http/types/json/requests.go new file mode 100644 index 000000000..4b582c3ba --- /dev/null +++ b/routing/http/types/json/requests.go @@ -0,0 +1,51 @@ +package json + +import ( + "encoding/json" + + "github.com/ipfs/boxo/routing/http/types" +) + +// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: +// +// [IPIP-378]: https://github.com/ipfs/specs/pull/378 +type WriteProvidersRequest struct { + Providers []types.Record +} + +func (r *WriteProvidersRequest) UnmarshalJSON(b []byte) error { + type wpr struct{ Providers []json.RawMessage } + var tempWPR wpr + err := json.Unmarshal(b, &tempWPR) + if err != nil { + return err + } + + for _, provBytes := range tempWPR.Providers { + var rawProv types.UnknownRecord + err := json.Unmarshal(provBytes, &rawProv) + if err != nil { + return err + } + + switch rawProv.Schema { + //lint:ignore SA1019 // ignore staticcheck + case types.SchemaBitswap: + //lint:ignore SA1019 // ignore staticcheck + var prov types.WriteBitswapRecord + err := json.Unmarshal(rawProv.Bytes, &prov) + if err != nil { + return err + } + r.Providers = append(r.Providers, &prov) + default: + var prov types.UnknownRecord + err := json.Unmarshal(b, &prov) + if err != nil { + return err + } + r.Providers = append(r.Providers, &prov) + } + } + return nil +} diff --git a/routing/http/types/json/responses.go b/routing/http/types/json/responses.go new file mode 100644 index 000000000..dfcfad830 --- /dev/null +++ b/routing/http/types/json/responses.go @@ -0,0 +1,98 @@ +package json + +import ( + "encoding/json" + + "github.com/ipfs/boxo/routing/http/types" +) + +// ProvidersResponse is the result of a GET Providers request. +type ProvidersResponse struct { + Providers RecordsArray +} + +// PeersResponse is the result of a GET Peers request. +type PeersResponse struct { + Peers RecordsArray +} + +// RecordsArray is an array of [types.Record] +type RecordsArray []types.Record + +func (r *RecordsArray) UnmarshalJSON(b []byte) error { + var tempRecords []json.RawMessage + err := json.Unmarshal(b, &tempRecords) + if err != nil { + return err + } + + for _, provBytes := range tempRecords { + var readProv types.UnknownRecord + err := json.Unmarshal(provBytes, &readProv) + if err != nil { + return err + } + + switch readProv.Schema { + case types.SchemaPeer: + var prov types.PeerRecord + err := json.Unmarshal(provBytes, &prov) + if err != nil { + return err + } + *r = append(*r, &prov) + //lint:ignore SA1019 // ignore staticcheck + case types.SchemaBitswap: + //lint:ignore SA1019 // ignore staticcheck + var prov types.BitswapRecord + err := json.Unmarshal(provBytes, &prov) + if err != nil { + return err + } + *r = append(*r, &prov) + default: + *r = append(*r, &readProv) + } + + } + return nil +} + +// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: +// +// [IPIP-378]: https://github.com/ipfs/specs/pull/378 +type WriteProvidersResponse struct { + ProvideResults []types.Record +} + +func (r *WriteProvidersResponse) UnmarshalJSON(b []byte) error { + var tempWPR struct{ ProvideResults []json.RawMessage } + err := json.Unmarshal(b, &tempWPR) + if err != nil { + return err + } + + for _, provBytes := range tempWPR.ProvideResults { + var rawProv types.UnknownRecord + err := json.Unmarshal(provBytes, &rawProv) + if err != nil { + return err + } + + switch rawProv.Schema { + //lint:ignore SA1019 // ignore staticcheck + case types.SchemaBitswap: + //lint:ignore SA1019 // ignore staticcheck + var prov types.WriteBitswapRecordResponse + err := json.Unmarshal(rawProv.Bytes, &prov) + if err != nil { + return err + } + r.ProvideResults = append(r.ProvideResults, &prov) + default: + r.ProvideResults = append(r.ProvideResults, &rawProv) + } + } + + return nil +} diff --git a/routing/http/types/ndjson/provider.go b/routing/http/types/ndjson/provider.go deleted file mode 100644 index 38e28df9a..000000000 --- a/routing/http/types/ndjson/provider.go +++ /dev/null @@ -1,36 +0,0 @@ -package ndjson - -import ( - "encoding/json" - "io" - - "github.com/ipfs/boxo/routing/http/types" - "github.com/ipfs/boxo/routing/http/types/iter" -) - -// NewReadProvidersResponseIter returns an iterator that reads Read Provider Records from the given reader. -func NewReadProvidersResponseIter(r io.Reader) iter.Iter[iter.Result[types.ProviderResponse]] { - jsonIter := iter.FromReaderJSON[types.UnknownProviderRecord](r) - mapFn := func(upr iter.Result[types.UnknownProviderRecord]) iter.Result[types.ProviderResponse] { - var result iter.Result[types.ProviderResponse] - if upr.Err != nil { - result.Err = upr.Err - return result - } - switch upr.Val.Schema { - case types.SchemaBitswap: - var prov types.ReadBitswapProviderRecord - err := json.Unmarshal(upr.Val.Bytes, &prov) - if err != nil { - result.Err = err - return result - } - result.Val = &prov - default: - result.Val = &upr.Val - } - return result - } - - return iter.Map[iter.Result[types.UnknownProviderRecord]](jsonIter, mapFn) -} diff --git a/routing/http/types/ndjson/records.go b/routing/http/types/ndjson/records.go new file mode 100644 index 000000000..d1a36b411 --- /dev/null +++ b/routing/http/types/ndjson/records.go @@ -0,0 +1,46 @@ +package ndjson + +import ( + "encoding/json" + "io" + + "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" +) + +// NewRecordsIter returns an iterator that reads [types.Record] from the given [io.Reader]. +func NewRecordsIter(r io.Reader) iter.Iter[iter.Result[types.Record]] { + jsonIter := iter.FromReaderJSON[types.UnknownRecord](r) + mapFn := func(upr iter.Result[types.UnknownRecord]) iter.Result[types.Record] { + var result iter.Result[types.Record] + if upr.Err != nil { + result.Err = upr.Err + return result + } + switch upr.Val.Schema { + case types.SchemaPeer: + var prov types.PeerRecord + err := json.Unmarshal(upr.Val.Bytes, &prov) + if err != nil { + result.Err = err + return result + } + result.Val = &prov + //lint:ignore SA1019 // ignore staticcheck + case types.SchemaBitswap: + //lint:ignore SA1019 // ignore staticcheck + var prov types.BitswapRecord + err := json.Unmarshal(upr.Val.Bytes, &prov) + if err != nil { + result.Err = err + return result + } + result.Val = &prov + default: + result.Val = &upr.Val + } + return result + } + + return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn) +} diff --git a/routing/http/types/provider.go b/routing/http/types/provider.go deleted file mode 100644 index 6e8e303f7..000000000 --- a/routing/http/types/provider.go +++ /dev/null @@ -1,17 +0,0 @@ -package types - -// WriteProviderRecord is a type that enforces structs to imlement it to avoid confusion -type WriteProviderRecord interface { - IsWriteProviderRecord() -} - -// ReadProviderRecord is a type that enforces structs to imlement it to avoid confusion -type ReadProviderRecord interface { - IsReadProviderRecord() -} - -// ProviderResponse is implemented for any ProviderResponse. It needs to have a Protocol field. -type ProviderResponse interface { - GetProtocol() string - GetSchema() string -} diff --git a/routing/http/types/provider_unknown.go b/routing/http/types/provider_unknown.go deleted file mode 100644 index 915cac481..000000000 --- a/routing/http/types/provider_unknown.go +++ /dev/null @@ -1,63 +0,0 @@ -package types - -import ( - "encoding/json" - - "github.com/ipfs/boxo/routing/http/internal/drjson" -) - -var ( - _ ReadProviderRecord = &UnknownProviderRecord{} - _ WriteProviderRecord = &UnknownProviderRecord{} - _ ProviderResponse = &UnknownProviderRecord{} -) - -// UnknownProviderRecord is used when we cannot parse the provider record using `GetProtocol` -type UnknownProviderRecord struct { - Protocol string - Schema string - Bytes []byte -} - -func (u *UnknownProviderRecord) GetProtocol() string { - return u.Protocol -} - -func (u *UnknownProviderRecord) GetSchema() string { - return u.Schema -} - -func (u *UnknownProviderRecord) IsReadProviderRecord() {} -func (u UnknownProviderRecord) IsWriteProviderRecord() {} - -func (u *UnknownProviderRecord) UnmarshalJSON(b []byte) error { - m := map[string]interface{}{} - if err := json.Unmarshal(b, &m); err != nil { - return err - } - - ps, ok := m["Protocol"].(string) - if ok { - u.Protocol = ps - } - schema, ok := m["Schema"].(string) - if ok { - u.Schema = schema - } - - u.Bytes = b - - return nil -} - -func (u UnknownProviderRecord) MarshalJSON() ([]byte, error) { - m := map[string]interface{}{} - err := json.Unmarshal(u.Bytes, &m) - if err != nil { - return nil, err - } - m["Protocol"] = u.Protocol - m["Schema"] = u.Schema - - return drjson.MarshalJSONBytes(m) -} diff --git a/routing/http/types/record.go b/routing/http/types/record.go new file mode 100644 index 000000000..4a734d5f5 --- /dev/null +++ b/routing/http/types/record.go @@ -0,0 +1,6 @@ +package types + +// Record is implemented for any record. +type Record interface { + GetSchema() string +} diff --git a/routing/http/types/provider_bitswap.go b/routing/http/types/record_bitswap.go similarity index 64% rename from routing/http/types/provider_bitswap.go rename to routing/http/types/record_bitswap.go index f0b5056e4..0780fc3eb 100644 --- a/routing/http/types/provider_bitswap.go +++ b/routing/http/types/record_bitswap.go @@ -12,14 +12,37 @@ import ( "github.com/multiformats/go-multibase" ) +// Deprecated: use the more versatile [SchemaPeer] instead. For more information, read [IPIP-417]. +// +// [IPIP-417]: https://github.com/ipfs/specs/pull/417 const SchemaBitswap = "bitswap" -var _ WriteProviderRecord = &WriteBitswapProviderRecord{} +var ( + _ Record = &BitswapRecord{} +) -// WriteBitswapProviderRecord is used when we want to add a new provider record that is using bitswap. -type WriteBitswapProviderRecord struct { - Protocol string +// Deprecated: use the more versatile [PeerRecord] instead. For more information, read [IPIP-417]. +// +// [IPIP-417]: https://github.com/ipfs/specs/pull/417 +type BitswapRecord struct { + Schema string + Protocol string + ID *peer.ID + Addrs []Multiaddr `json:",omitempty"` +} + +func (br *BitswapRecord) GetSchema() string { + return br.Schema +} + +var _ Record = &WriteBitswapRecord{} + +// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: +// +// [IPIP-378]: https://github.com/ipfs/specs/pull/378 +type WriteBitswapRecord struct { Schema string + Protocol string Signature string // this content must be untouched because it is signed and we need to verify it @@ -35,11 +58,13 @@ type BitswapPayload struct { Addrs []Multiaddr } -func (*WriteBitswapProviderRecord) IsWriteProviderRecord() {} +func (wr *WriteBitswapRecord) GetSchema() string { + return wr.Schema +} -type tmpBWPR WriteBitswapProviderRecord +type tmpBWPR WriteBitswapRecord -func (p *WriteBitswapProviderRecord) UnmarshalJSON(b []byte) error { +func (p *WriteBitswapRecord) UnmarshalJSON(b []byte) error { var bwp tmpBWPR err := json.Unmarshal(b, &bwp) if err != nil { @@ -54,11 +79,11 @@ func (p *WriteBitswapProviderRecord) UnmarshalJSON(b []byte) error { return json.Unmarshal(bwp.RawPayload, &p.Payload) } -func (p *WriteBitswapProviderRecord) IsSigned() bool { +func (p *WriteBitswapRecord) IsSigned() bool { return p.Signature != "" } -func (p *WriteBitswapProviderRecord) setRawPayload() error { +func (p *WriteBitswapRecord) setRawPayload() error { payloadBytes, err := drjson.MarshalJSONBytes(p.Payload) if err != nil { return fmt.Errorf("marshaling bitswap write provider payload: %w", err) @@ -69,7 +94,7 @@ func (p *WriteBitswapProviderRecord) setRawPayload() error { return nil } -func (p *WriteBitswapProviderRecord) Sign(peerID peer.ID, key crypto.PrivKey) error { +func (p *WriteBitswapRecord) Sign(peerID peer.ID, key crypto.PrivKey) error { if p.IsSigned() { return errors.New("already signed") } @@ -105,7 +130,7 @@ func (p *WriteBitswapProviderRecord) Sign(peerID peer.ID, key crypto.PrivKey) er return nil } -func (p *WriteBitswapProviderRecord) Verify() error { +func (p *WriteBitswapRecord) Verify() error { if !p.IsSigned() { return errors.New("not signed") } @@ -145,42 +170,17 @@ func (p *WriteBitswapProviderRecord) Verify() error { return nil } -var _ ProviderResponse = &WriteBitswapProviderRecordResponse{} +var _ Record = &WriteBitswapRecordResponse{} -// WriteBitswapProviderRecordResponse will be returned as a result of WriteBitswapProviderRecord -type WriteBitswapProviderRecordResponse struct { - Protocol string +// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]: +// +// [IPIP-378]: https://github.com/ipfs/specs/pull/378 +type WriteBitswapRecordResponse struct { Schema string + Protocol string AdvisoryTTL *Duration } -func (wbprr *WriteBitswapProviderRecordResponse) GetProtocol() string { - return wbprr.Protocol +func (r *WriteBitswapRecordResponse) GetSchema() string { + return r.Schema } - -func (wbprr *WriteBitswapProviderRecordResponse) GetSchema() string { - return wbprr.Schema -} - -var ( - _ ReadProviderRecord = &ReadBitswapProviderRecord{} - _ ProviderResponse = &ReadBitswapProviderRecord{} -) - -// ReadBitswapProviderRecord is a provider result with parameters for bitswap providers -type ReadBitswapProviderRecord struct { - Protocol string - Schema string - ID *peer.ID - Addrs []Multiaddr -} - -func (rbpr *ReadBitswapProviderRecord) GetProtocol() string { - return rbpr.Protocol -} - -func (rbpr *ReadBitswapProviderRecord) GetSchema() string { - return rbpr.Schema -} - -func (*ReadBitswapProviderRecord) IsReadProviderRecord() {} diff --git a/routing/http/types/record_peer.go b/routing/http/types/record_peer.go new file mode 100644 index 000000000..76bd810e0 --- /dev/null +++ b/routing/http/types/record_peer.go @@ -0,0 +1,81 @@ +package types + +import ( + "encoding/json" + + "github.com/ipfs/boxo/routing/http/internal/drjson" + "github.com/libp2p/go-libp2p/core/peer" +) + +const SchemaPeer = "peer" + +var _ Record = &PeerRecord{} + +type PeerRecord struct { + Schema string + ID *peer.ID + Addrs []Multiaddr + Protocols []string + + // Extra contains extra fields that were included in the original JSON raw + // message, except for the known ones represented by the remaining fields. + Extra map[string]json.RawMessage +} + +func (pr *PeerRecord) GetSchema() string { + return pr.Schema +} + +func (pr *PeerRecord) UnmarshalJSON(b []byte) error { + // Unmarshal all known fields and assign them. + v := struct { + Schema string + ID *peer.ID + Addrs []Multiaddr + Protocols []string + }{} + err := json.Unmarshal(b, &v) + if err != nil { + return err + } + pr.Schema = v.Schema + pr.ID = v.ID + pr.Addrs = v.Addrs + pr.Protocols = v.Protocols + + // Unmarshal everything into the Extra field and remove the + // known fields to avoid conflictual usages of the struct. + err = json.Unmarshal(b, &pr.Extra) + if err != nil { + return err + } + delete(pr.Extra, "Schema") + delete(pr.Extra, "ID") + delete(pr.Extra, "Addrs") + delete(pr.Extra, "Protocols") + + return nil +} + +func (pr PeerRecord) MarshalJSON() ([]byte, error) { + m := map[string]interface{}{} + if pr.Extra != nil { + for key, val := range pr.Extra { + m[key] = val + } + } + + // Schema and ID must always be set. + m["Schema"] = pr.Schema + m["ID"] = pr.ID + + if pr.Addrs != nil { + m["Addrs"] = pr.Addrs + } + + if pr.Protocols != nil { + m["Protocols"] = pr.Protocols + } + + return drjson.MarshalJSONBytes(m) +} diff --git a/routing/http/types/record_unknown.go b/routing/http/types/record_unknown.go new file mode 100644 index 000000000..9b2f6f960 --- /dev/null +++ b/routing/http/types/record_unknown.go @@ -0,0 +1,47 @@ +package types + +import ( + "encoding/json" + + "github.com/ipfs/boxo/routing/http/internal/drjson" +) + +var _ Record = &UnknownRecord{} + +type UnknownRecord struct { + Schema string + + // Bytes contains the raw JSON bytes that were used to unmarshal this record. + // This value can be used, for example, to unmarshal de record into a different + // type if Schema is of a known value. + Bytes []byte +} + +func (ur *UnknownRecord) GetSchema() string { + return ur.Schema +} + +func (ur *UnknownRecord) UnmarshalJSON(b []byte) error { + v := struct { + Schema string + }{} + err := json.Unmarshal(b, &v) + if err != nil { + return err + } + ur.Schema = v.Schema + ur.Bytes = b + return nil +} + +func (ur UnknownRecord) MarshalJSON() ([]byte, error) { + m := map[string]interface{}{} + if ur.Bytes != nil { + err := json.Unmarshal(ur.Bytes, &m) + if err != nil { + return nil, err + } + } + m["Schema"] = ur.Schema + return drjson.MarshalJSONBytes(m) +}