diff --git a/routing/http/client/client.go b/routing/http/client/client.go index 4ccc7e4fa9..e74d23bf14 100644 --- a/routing/http/client/client.go +++ b/routing/http/client/client.go @@ -157,7 +157,7 @@ func (c *measuringIter[T]) Close() error { return c.Iter.Close() } -func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.ResultIter[types.ProviderResponse], err error) { +func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.ResultIter[types.Record], err error) { // TODO test measurements m := newMeasurement("FindProviders") @@ -185,7 +185,7 @@ func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.Res if resp.StatusCode == http.StatusNotFound { resp.Body.Close() m.record(ctx) - return iter.FromSlice[iter.Result[types.ProviderResponse]](nil), nil + return iter.FromSlice[iter.Result[types.Record]](nil), nil } if resp.StatusCode != http.StatusOK { @@ -213,22 +213,22 @@ func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.Res } }() - var it iter.ResultIter[types.ProviderResponse] + var it iter.ResultIter[types.Record] switch mediaType { case mediaTypeJSON: parsedResp := &jsontypes.ProvidersResponse{} err = json.NewDecoder(resp.Body).Decode(parsedResp) - var sliceIt iter.Iter[types.ProviderResponse] = iter.FromSlice(parsedResp.Providers) + var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Providers) it = iter.ToResultIter(sliceIt) case mediaTypeNDJSON: skipBodyClose = true - it = ndjson.NewReadProvidersResponseIter(resp.Body) + it = ndjson.NewProvidersResponseIter(resp.Body) default: logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType) return nil, errors.New("unknown content type") } - return &measuringIter[iter.Result[types.ProviderResponse]]{Iter: it, ctx: ctx, m: m}, nil + return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil } func (c *client) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) { diff --git a/routing/http/client/client_test.go b/routing/http/client/client_test.go index b386ce13d3..2d39d9b4b0 100644 --- a/routing/http/client/client_test.go +++ b/routing/http/client/client_test.go @@ -29,9 +29,9 @@ import ( type mockContentRouter struct{ mock.Mock } -func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.ProviderResponse], error) { +func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { args := m.Called(ctx, key, limit) - return args.Get(0).(iter.ResultIter[types.ProviderResponse]), args.Error(1) + return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } func (m *mockContentRouter) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) { @@ -139,13 +139,13 @@ func addrsToDRAddrs(addrs []multiaddr.Multiaddr) (drmas []types.Multiaddr) { return } -func makeBSReadProviderResp() types.ReadBitswapProviderRecord { +func makeBSReadProviderResp() types.PeerRecord { peerID, addrs, _ := makeProviderAndIdentity() - return types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, - ID: &peerID, - Addrs: addrsToDRAddrs(addrs), + return types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &peerID, + Protocols: []string{"transport-bitswap"}, + Addrs: addrsToDRAddrs(addrs), } } @@ -190,7 +190,7 @@ func (e *osErrContains) errContains(t *testing.T, err error) { func TestClient_FindProviders(t *testing.T) { bsReadProvResp := makeBSReadProviderResp() - bitswapProvs := []iter.Result[types.ProviderResponse]{ + bitswapProvs := []iter.Result[types.Record]{ {Val: &bsReadProvResp}, } @@ -198,13 +198,13 @@ func TestClient_FindProviders(t *testing.T) { name string httpStatusCode int stopServer bool - routerProvs []iter.Result[types.ProviderResponse] + routerProvs []iter.Result[types.Record] routerErr error clientRequiresStreaming bool serverStreamingDisabled bool expErrContains osErrContains - expProvs []iter.Result[types.ProviderResponse] + expProvs []iter.Result[types.Record] expStreamingResponse bool expJSONResponse bool }{ @@ -308,7 +308,7 @@ func TestClient_FindProviders(t *testing.T) { c.expErrContains.errContains(t, err) - provs := iter.ReadAll[iter.Result[types.ProviderResponse]](provsIter) + provs := iter.ReadAll[iter.Result[types.Record]](provsIter) assert.Equal(t, c.expProvs, provs) }) } diff --git a/routing/http/contentrouter/contentrouter.go b/routing/http/contentrouter/contentrouter.go index 7382f5a7a9..ef76d12b3a 100644 --- a/routing/http/contentrouter/contentrouter.go +++ b/routing/http/contentrouter/contentrouter.go @@ -19,7 +19,7 @@ var logger = logging.Logger("service/contentrouting") const ttl = 24 * time.Hour type Client interface { - FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.ProviderResponse], error) + FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error) } type contentRouter struct { @@ -66,7 +66,7 @@ func (c *contentRouter) Ready() bool { } // readProviderResponses reads bitswap records from the iterator into the given channel, dropping non-bitswap records. -func readProviderResponses(iter iter.ResultIter[types.ProviderResponse], ch chan<- peer.AddrInfo) { +func readProviderResponses(iter iter.ResultIter[types.Record], ch chan<- peer.AddrInfo) { defer close(ch) defer iter.Close() for iter.Next() { @@ -76,8 +76,8 @@ func readProviderResponses(iter iter.ResultIter[types.ProviderResponse], ch chan continue } v := res.Val - if v.GetSchema() == types.SchemaBitswap { - result, ok := v.(*types.ReadBitswapProviderRecord) + if v.GetSchema() == types.SchemaPeer { + result, ok := v.(*types.PeerRecord) if !ok { logger.Errorw( "problem casting find providers result", diff --git a/routing/http/contentrouter/contentrouter_test.go b/routing/http/contentrouter/contentrouter_test.go index 00ccc4d54f..c70485a04b 100644 --- a/routing/http/contentrouter/contentrouter_test.go +++ b/routing/http/contentrouter/contentrouter_test.go @@ -16,9 +16,9 @@ import ( type mockClient struct{ mock.Mock } -func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.ProviderResponse], error) { +func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error) { args := m.Called(ctx, key) - return args.Get(0).(iter.ResultIter[types.ProviderResponse]), args.Error(1) + return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } func (m *mockClient) Ready(ctx context.Context) (bool, error) { args := m.Called(ctx) @@ -46,22 +46,22 @@ func TestFindProvidersAsync(t *testing.T) { p1 := peer.ID("peer1") p2 := peer.ID("peer2") - ais := []types.ProviderResponse{ - &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, - ID: &p1, + ais := []types.Record{ + &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &p1, + Protocols: []string{"transport-bitswap"}, }, - &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, - ID: &p2, - }, - &types.UnknownProviderRecord{ - Protocol: "UNKNOWN", + &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &p2, + Protocols: []string{"transport-bitswap"}, }, + // &types.UnknownRecord{ + // Protocol: "UNKNOWN", + // }, } - aisIter := iter.ToResultIter[types.ProviderResponse](iter.FromSlice(ais)) + aisIter := iter.ToResultIter[types.Record](iter.FromSlice(ais)) client.On("FindProviders", ctx, key).Return(aisIter, nil) diff --git a/routing/http/server/server.go b/routing/http/server/server.go index 6b2591e419..5673a14e0f 100644 --- a/routing/http/server/server.go +++ b/routing/http/server/server.go @@ -41,14 +41,14 @@ const ( ) type FindProvidersAsyncResponse struct { - ProviderResponse types.ProviderResponse + ProviderResponse types.Record Error error } type ContentRouter interface { // FindProviders searches for peers who are able to provide a given key. Limit // indicates the maximum amount of results to return. 0 means unbounded. - FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.ProviderResponse], error) + FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) // FindIPNSRecord searches for an [ipns.Record] for the given [ipns.Name]. FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) @@ -119,7 +119,7 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) { return } - var handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.ProviderResponse]) + var handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) var supportsNDJSON bool var supportsJSON bool @@ -167,11 +167,11 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) { handlerFunc(w, provIter) } -func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.ProviderResponse]) { +func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) { defer provIter.Close() var ( - providers []types.ProviderResponse + providers []types.Record i int ) @@ -188,7 +188,7 @@ func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIt writeJSONResult(w, "FindProviders", response) } -func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.ProviderResponse]) { +func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) { defer provIter.Close() w.Header().Set("Content-Type", mediaTypeNDJSON) diff --git a/routing/http/server/server_test.go b/routing/http/server/server_test.go index bb40491982..64adb705b8 100644 --- a/routing/http/server/server_test.go +++ b/routing/http/server/server_test.go @@ -28,10 +28,10 @@ func TestHeaders(t *testing.T) { t.Cleanup(server.Close) serverAddr := "http://" + server.Listener.Addr().String() - results := iter.FromSlice([]iter.Result[types.ProviderResponse]{ - {Val: &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, + results := iter.FromSlice([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + Protocols: []string{"transport-bitswap"}, }}}, ) @@ -72,18 +72,18 @@ func TestResponse(t *testing.T) { runTest := func(t *testing.T, contentType string, expectedStream bool, expectedBody string) { t.Parallel() - results := iter.FromSlice([]iter.Result[types.ProviderResponse]{ - {Val: &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, - ID: &pid, - Addrs: []types.Multiaddr{}, + results := iter.FromSlice([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Protocols: []string{"transport-bitswap"}, + Addrs: []types.Multiaddr{}, }}, - {Val: &types.ReadBitswapProviderRecord{ - Protocol: "transport-bitswap", - Schema: types.SchemaBitswap, - ID: &pid2, - Addrs: []types.Multiaddr{}, + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid2, + Protocols: []string{"transport-bitswap"}, + Addrs: []types.Multiaddr{}, }}}, ) @@ -111,15 +111,15 @@ func TestResponse(t *testing.T) { body, err := io.ReadAll(resp.Body) require.NoError(t, err) - require.Equal(t, string(body), expectedBody) + require.Equal(t, expectedBody, string(body)) } t.Run("JSON Response", func(t *testing.T) { - runTest(t, mediaTypeJSON, false, `{"Providers":[{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Addrs":[]},{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Addrs":[]}]}`) + runTest(t, mediaTypeJSON, false, `{"Providers":[{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"},{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-bitswap"],"Schema":"peer"}]}`) }) t.Run("NDJSON Response", func(t *testing.T) { - runTest(t, mediaTypeNDJSON, true, `{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Addrs":[]}`+"\n"+`{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Addrs":[]}`+"\n") + runTest(t, mediaTypeNDJSON, true, `{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n"+`{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n") }) } @@ -257,9 +257,9 @@ func TestIPNS(t *testing.T) { type mockContentRouter struct{ mock.Mock } -func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.ProviderResponse], error) { +func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { args := m.Called(ctx, key, limit) - return args.Get(0).(iter.ResultIter[types.ProviderResponse]), args.Error(1) + return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1) } func (m *mockContentRouter) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) { diff --git a/routing/http/types/json/provider.go b/routing/http/types/json/providers.go similarity index 82% rename from routing/http/types/json/provider.go rename to routing/http/types/json/providers.go index 3163e11444..03885a9522 100644 --- a/routing/http/types/json/provider.go +++ b/routing/http/types/json/providers.go @@ -8,7 +8,7 @@ import ( // ProvidersResponse is the result of a GET Providers request. type ProvidersResponse struct { - Providers []types.ProviderResponse + Providers []types.Record } func (r *ProvidersResponse) UnmarshalJSON(b []byte) error { @@ -19,15 +19,15 @@ func (r *ProvidersResponse) UnmarshalJSON(b []byte) error { } for _, provBytes := range tempFPR.Providers { - var readProv types.UnknownProviderRecord + var readProv types.UnknownRecord err := json.Unmarshal(provBytes, &readProv) if err != nil { return err } switch readProv.Schema { - case types.SchemaBitswap: - var prov types.ReadBitswapProviderRecord + case types.SchemaPeer: + var prov types.PeerRecord err := json.Unmarshal(readProv.Bytes, &prov) if err != nil { return err diff --git a/routing/http/types/ndjson/provider.go b/routing/http/types/ndjson/provider.go deleted file mode 100644 index 38e28df9a6..0000000000 --- a/routing/http/types/ndjson/provider.go +++ /dev/null @@ -1,36 +0,0 @@ -package ndjson - -import ( - "encoding/json" - "io" - - "github.com/ipfs/boxo/routing/http/types" - "github.com/ipfs/boxo/routing/http/types/iter" -) - -// NewReadProvidersResponseIter returns an iterator that reads Read Provider Records from the given reader. -func NewReadProvidersResponseIter(r io.Reader) iter.Iter[iter.Result[types.ProviderResponse]] { - jsonIter := iter.FromReaderJSON[types.UnknownProviderRecord](r) - mapFn := func(upr iter.Result[types.UnknownProviderRecord]) iter.Result[types.ProviderResponse] { - var result iter.Result[types.ProviderResponse] - if upr.Err != nil { - result.Err = upr.Err - return result - } - switch upr.Val.Schema { - case types.SchemaBitswap: - var prov types.ReadBitswapProviderRecord - err := json.Unmarshal(upr.Val.Bytes, &prov) - if err != nil { - result.Err = err - return result - } - result.Val = &prov - default: - result.Val = &upr.Val - } - return result - } - - return iter.Map[iter.Result[types.UnknownProviderRecord]](jsonIter, mapFn) -} diff --git a/routing/http/types/ndjson/providers.go b/routing/http/types/ndjson/providers.go new file mode 100644 index 0000000000..6471a6a7bb --- /dev/null +++ b/routing/http/types/ndjson/providers.go @@ -0,0 +1,36 @@ +package ndjson + +import ( + "encoding/json" + "io" + + "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" +) + +// NewProvidersResponseIter returns an iterator that reads [types.Record] from the given [io.Reader]. +func NewProvidersResponseIter(r io.Reader) iter.Iter[iter.Result[types.Record]] { + jsonIter := iter.FromReaderJSON[types.UnknownRecord](r) + mapFn := func(upr iter.Result[types.UnknownRecord]) iter.Result[types.Record] { + var result iter.Result[types.Record] + if upr.Err != nil { + result.Err = upr.Err + return result + } + switch upr.Val.Schema { + case types.SchemaPeer: + var prov types.PeerRecord + err := json.Unmarshal(upr.Val.Bytes, &prov) + if err != nil { + result.Err = err + return result + } + result.Val = &prov + default: + result.Val = &upr.Val + } + return result + } + + return iter.Map[iter.Result[types.UnknownRecord]](jsonIter, mapFn) +} diff --git a/routing/http/types/provider.go b/routing/http/types/provider.go deleted file mode 100644 index aa311d8ed6..0000000000 --- a/routing/http/types/provider.go +++ /dev/null @@ -1,6 +0,0 @@ -package types - -// ProviderResponse is implemented for any ProviderResponse. It needs to have a Protocol field. -type ProviderResponse interface { - GetSchema() string -} diff --git a/routing/http/types/provider_bitswap.go b/routing/http/types/provider_bitswap.go deleted file mode 100644 index 9add2e39bc..0000000000 --- a/routing/http/types/provider_bitswap.go +++ /dev/null @@ -1,27 +0,0 @@ -package types - -import ( - "github.com/libp2p/go-libp2p/core/peer" -) - -const SchemaBitswap = "bitswap" - -var _ ProviderResponse = &ReadBitswapProviderRecord{} - -// ReadBitswapProviderRecord is a provider result with parameters for bitswap providers -type ReadBitswapProviderRecord struct { - Protocol string - Schema string - ID *peer.ID - Addrs []Multiaddr -} - -func (rbpr *ReadBitswapProviderRecord) GetProtocol() string { - return rbpr.Protocol -} - -func (rbpr *ReadBitswapProviderRecord) GetSchema() string { - return rbpr.Schema -} - -func (*ReadBitswapProviderRecord) IsReadProviderRecord() {} diff --git a/routing/http/types/provider_unknown.go b/routing/http/types/provider_unknown.go deleted file mode 100644 index d398c69884..0000000000 --- a/routing/http/types/provider_unknown.go +++ /dev/null @@ -1,58 +0,0 @@ -package types - -import ( - "encoding/json" - - "github.com/ipfs/boxo/routing/http/internal/drjson" -) - -var _ ProviderResponse = &UnknownProviderRecord{} - -// UnknownProviderRecord is used when we cannot parse the provider record using `GetProtocol` -type UnknownProviderRecord struct { - Protocol string - Schema string - Bytes []byte -} - -func (u *UnknownProviderRecord) GetProtocol() string { - return u.Protocol -} - -func (u *UnknownProviderRecord) GetSchema() string { - return u.Schema -} - -func (u *UnknownProviderRecord) IsReadProviderRecord() {} - -func (u *UnknownProviderRecord) UnmarshalJSON(b []byte) error { - m := map[string]interface{}{} - if err := json.Unmarshal(b, &m); err != nil { - return err - } - - ps, ok := m["Protocol"].(string) - if ok { - u.Protocol = ps - } - schema, ok := m["Schema"].(string) - if ok { - u.Schema = schema - } - - u.Bytes = b - - return nil -} - -func (u UnknownProviderRecord) MarshalJSON() ([]byte, error) { - m := map[string]interface{}{} - err := json.Unmarshal(u.Bytes, &m) - if err != nil { - return nil, err - } - m["Protocol"] = u.Protocol - m["Schema"] = u.Schema - - return drjson.MarshalJSONBytes(m) -} diff --git a/routing/http/types/record.go b/routing/http/types/record.go new file mode 100644 index 0000000000..4a734d5f5b --- /dev/null +++ b/routing/http/types/record.go @@ -0,0 +1,6 @@ +package types + +// Record is implemented for any record. +type Record interface { + GetSchema() string +} diff --git a/routing/http/types/record_peer.go b/routing/http/types/record_peer.go new file mode 100644 index 0000000000..b9035f668f --- /dev/null +++ b/routing/http/types/record_peer.go @@ -0,0 +1,58 @@ +package types + +import ( + "encoding/json" + + "github.com/ipfs/boxo/routing/http/internal/drjson" + "github.com/libp2p/go-libp2p/core/peer" +) + +const SchemaPeer = "peer" + +var _ Record = &PeerRecord{} + +type PeerRecord struct { + Schema string + ID *peer.ID + Addrs []Multiaddr + Protocols []string + Bytes []byte +} + +func (pr *PeerRecord) GetSchema() string { + return pr.Schema +} + +func (pr *PeerRecord) UnmarshalJSON(b []byte) error { + v := struct { + Schema string + ID *peer.ID + Addrs []Multiaddr + Protocols []string + }{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + + pr.Schema = v.Schema + pr.ID = v.ID + pr.Addrs = v.Addrs + pr.Protocols = v.Protocols + pr.Bytes = b + return nil +} + +func (pr PeerRecord) MarshalJSON() ([]byte, error) { + v := map[string]interface{}{} + if pr.Bytes != nil { + err := json.Unmarshal(pr.Bytes, &v) + if err != nil { + return nil, err + } + } + v["Schema"] = pr.Schema + v["ID"] = pr.ID + v["Addrs"] = pr.Addrs + v["Protocols"] = pr.Protocols + return drjson.MarshalJSONBytes(v) +} diff --git a/routing/http/types/record_unknown.go b/routing/http/types/record_unknown.go new file mode 100644 index 0000000000..36eedca8e4 --- /dev/null +++ b/routing/http/types/record_unknown.go @@ -0,0 +1,43 @@ +package types + +import ( + "encoding/json" + + "github.com/ipfs/boxo/routing/http/internal/drjson" +) + +var _ Record = &UnknownRecord{} + +type UnknownRecord struct { + Schema string + Bytes []byte +} + +func (ur *UnknownRecord) GetSchema() string { + return ur.Schema +} + +func (ur *UnknownRecord) UnmarshalJSON(b []byte) error { + v := struct { + Schema string + }{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + + ur.Schema = v.Schema + ur.Bytes = b + return nil +} + +func (ur UnknownRecord) MarshalJSON() ([]byte, error) { + m := map[string]interface{}{} + if ur.Bytes != nil { + err := json.Unmarshal(ur.Bytes, &m) + if err != nil { + return nil, err + } + } + m["Schema"] = ur.Schema + return drjson.MarshalJSONBytes(m) +}