diff --git a/pkg/internal/testutil/collectingeventlsubscriber.go b/pkg/internal/testutil/collectingeventlsubscriber.go index 444b95f3..70dddfd1 100644 --- a/pkg/internal/testutil/collectingeventlsubscriber.go +++ b/pkg/internal/testutil/collectingeventlsubscriber.go @@ -3,8 +3,10 @@ package testutil import ( "context" "fmt" + "strings" "sync" "testing" + "time" "github.com/filecoin-project/lassie/pkg/events" "github.com/filecoin-project/lassie/pkg/types" @@ -30,14 +32,29 @@ func (ev *AsyncCollectingEventsListener) Collect(evt types.RetrievalEvent) { } } -func (ev *AsyncCollectingEventsListener) VerifyNextEvents(t *testing.T, expectedEvents []types.RetrievalEvent) { +func (ev *AsyncCollectingEventsListener) VerifyNextEvents(t *testing.T, afterStart time.Duration, expectedEvents []types.RetrievalEvent) { + got := make([]types.EventCode, 0) for i := 0; i < len(expectedEvents); i++ { select { case evt := <-ev.retrievalEventChan: t.Logf("received event: %s", evt) - VerifyContainsCollectedEvent(t, expectedEvents, evt) + got = append(got, VerifyContainsCollectedEvent(t, afterStart, expectedEvents, evt)) case <-ev.ctx.Done(): - require.FailNow(t, "did not receive expected events") + // work out which expected event codes we did not receive + missing := make([]string, 0) + for _, expected := range expectedEvents { + found := false + for _, g := range got { + if g == expected.Code() { + found = true + break + } + } + if !found { + missing = append(missing, string(expected.Code())) + } + } + require.FailNowf(t, "did not receive expected events", "missing: %s @ %s", strings.Join(missing, ", "), afterStart) } } } @@ -76,7 +93,7 @@ func VerifyCollectedEventTimings(t *testing.T, events []types.RetrievalEvent) { } } -func VerifyContainsCollectedEvent(t *testing.T, expectedList []types.RetrievalEvent, actual types.RetrievalEvent) { +func VerifyContainsCollectedEvent(t *testing.T, afterStart time.Duration, expectedList []types.RetrievalEvent, actual types.RetrievalEvent) types.EventCode { for _, expected := range expectedList { // this matching might need to evolve to be more sophisticated, particularly SP ID if actual.Code() == expected.Code() && @@ -85,11 +102,12 @@ func VerifyContainsCollectedEvent(t *testing.T, expectedList []types.RetrievalEv actual.Phase() == expected.Phase() { if actual.StorageProviderId() == expected.StorageProviderId() { VerifyCollectedEvent(t, actual, expected) - return + return actual.Code() } } } - require.Fail(t, "event not found", actual.Code()) + require.Failf(t, "unexpected event", "got '%s' @ %s", actual.Code(), afterStart) + return "" } func VerifyCollectedEvent(t *testing.T, actual types.RetrievalEvent, expected types.RetrievalEvent) { diff --git a/pkg/internal/testutil/mockcandidatefinder.go b/pkg/internal/testutil/mockcandidatefinder.go index 296bdb2c..ddc10344 100644 --- a/pkg/internal/testutil/mockcandidatefinder.go +++ b/pkg/internal/testutil/mockcandidatefinder.go @@ -3,6 +3,7 @@ package testutil import ( "context" "testing" + "time" "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-cid" @@ -28,14 +29,14 @@ func NewMockCandidateFinder(err error, candidates map[cid.Cid][]types.RetrievalC } } -func (me *MockCandidateFinder) VerifyCandidatesDiscovered(ctx context.Context, t *testing.T, expectedCandidatesDiscovered []DiscoveredCandidate) { +func (me *MockCandidateFinder) VerifyCandidatesDiscovered(ctx context.Context, t *testing.T, afterStart time.Duration, expectedCandidatesDiscovered []DiscoveredCandidate) { candidatesDiscovered := make([]DiscoveredCandidate, 0, 
len(expectedCandidatesDiscovered)) for i := 0; i < len(expectedCandidatesDiscovered); i++ { select { case candidate := <-me.discoveredCandidates: candidatesDiscovered = append(candidatesDiscovered, candidate) case <-ctx.Done(): - require.FailNowf(t, "failed to receive expected candidates", "expected %d, received %d", len(expectedCandidatesDiscovered), i) + require.FailNowf(t, "failed to receive expected candidates", "expected %d, received %d @ %s", len(expectedCandidatesDiscovered), i, afterStart) } } require.ElementsMatch(t, expectedCandidatesDiscovered, candidatesDiscovered) diff --git a/pkg/internal/testutil/mockclient.go b/pkg/internal/testutil/mockclient.go index 391bf209..b9824fb0 100644 --- a/pkg/internal/testutil/mockclient.go +++ b/pkg/internal/testutil/mockclient.go @@ -56,7 +56,7 @@ func NewMockClient(connectReturns map[string]DelayedConnectReturn, retrievalRetu } } -func (mc *MockClient) VerifyConnectionsReceived(ctx context.Context, t *testing.T, expectedConnections []peer.ID) { +func (mc *MockClient) VerifyConnectionsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedConnections []peer.ID) { connections := make([]peer.ID, 0, len(expectedConnections)) for i := 0; i < len(expectedConnections); i++ { select { @@ -64,29 +64,35 @@ func (mc *MockClient) VerifyConnectionsReceived(ctx context.Context, t *testing. t.Logf("connecting to peer: %s", connection) connections = append(connections, connection) case <-ctx.Done(): - require.FailNowf(t, "failed to receive expected connections", "expected %d, received %d", len(expectedConnections), i) + require.FailNowf(t, "failed to receive expected connections", "expected %d, received %d @ %s", len(expectedConnections), i, afterStart) } } require.ElementsMatch(t, expectedConnections, connections) } -func (mc *MockClient) VerifyRetrievalsReceived(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) { +func (mc *MockClient) VerifyRetrievalsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID) { retrievals := make([]peer.ID, 0, len(expectedRetrievals)) for i := 0; i < len(expectedRetrievals); i++ { select { case retrieval := <-mc.received_retrievals: retrievals = append(retrievals, retrieval.Peer) case <-ctx.Done(): - require.FailNowf(t, "failed to receive expected retrievals", "expected %d, received %d", len(expectedRetrievals), i) + require.FailNowf(t, "failed to receive expected retrievals", "expected %d, received %d @ %s", len(expectedRetrievals), i, afterStart) } } require.ElementsMatch(t, expectedRetrievals, retrievals) } -func (mc *MockClient) VerifyRetrievalsServed(ctx context.Context, t *testing.T, expectedServed []RemoteStats) { +func (mc *MockClient) VerifyRetrievalsServed(ctx context.Context, t *testing.T, afterStart time.Duration, expectedServed []RemoteStats) { + if len(expectedServed) > 0 { + require.FailNowf(t, "unexpected RetrievalsServed", "@ %s", afterStart) + } } -func (mc *MockClient) VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) { +func (mc *MockClient) VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID) { + if len(expectedRetrievals) > 0 { + require.FailNowf(t, "unexpected RetrievalsCompleted", "@ %s", afterStart) + } } func (mc *MockClient) VerifyReceivedRetrievalFrom(ctx context.Context, t *testing.T, p peer.ID) ClientRetrievalRequest { diff --git a/pkg/internal/testutil/verifier.go b/pkg/internal/testutil/verifier.go index 
288d8842..89182493 100644 --- a/pkg/internal/testutil/verifier.go +++ b/pkg/internal/testutil/verifier.go @@ -27,6 +27,7 @@ type RemoteStats struct { Root cid.Cid ByteCount uint64 Blocks []cid.Cid + Err bool } type RetrievalVerifier struct { @@ -36,10 +37,10 @@ type RetrievalVerifier struct { type RunRetrieval func(cb func(types.RetrievalEvent)) (*types.RetrievalStats, error) type VerifierClient interface { - VerifyConnectionsReceived(ctx context.Context, t *testing.T, expectedConnections []peer.ID) - VerifyRetrievalsReceived(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) - VerifyRetrievalsServed(ctx context.Context, t *testing.T, expectedServed []RemoteStats) - VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) + VerifyConnectionsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedConnections []peer.ID) + VerifyRetrievalsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID) + VerifyRetrievalsServed(ctx context.Context, t *testing.T, afterStart time.Duration, expectedServed []RemoteStats) + VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID) } func (rv RetrievalVerifier) RunWithVerification(ctx context.Context, @@ -64,15 +65,15 @@ func (rv RetrievalVerifier) RunWithVerification(ctx context.Context, clock.Add(expectedActionsAtTime.AfterStart - currentTime) currentTime = expectedActionsAtTime.AfterStart t.Logf("current time: %s", clock.Now()) - asyncCollectingEventsListener.VerifyNextEvents(t, expectedActionsAtTime.ExpectedEvents) + asyncCollectingEventsListener.VerifyNextEvents(t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.ExpectedEvents) if mockCandidateFinder != nil { - mockCandidateFinder.VerifyCandidatesDiscovered(ctx, t, expectedActionsAtTime.CandidatesDiscovered) + mockCandidateFinder.VerifyCandidatesDiscovered(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.CandidatesDiscovered) } if client != nil { - client.VerifyConnectionsReceived(ctx, t, expectedActionsAtTime.ReceivedConnections) - client.VerifyRetrievalsReceived(ctx, t, expectedActionsAtTime.ReceivedRetrievals) - client.VerifyRetrievalsServed(ctx, t, expectedActionsAtTime.ServedRetrievals) - client.VerifyRetrievalsCompleted(ctx, t, expectedActionsAtTime.CompletedRetrievals) + client.VerifyConnectionsReceived(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.ReceivedConnections) + client.VerifyRetrievalsReceived(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.ReceivedRetrievals) + client.VerifyRetrievalsServed(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.ServedRetrievals) + client.VerifyRetrievalsCompleted(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.CompletedRetrievals) } } results := make([]types.RetrievalResult, 0, len(runRetrievals)) diff --git a/pkg/retriever/graphsyncretriever.go b/pkg/retriever/graphsyncretriever.go index 99207802..f3ec6b67 100644 --- a/pkg/retriever/graphsyncretriever.go +++ b/pkg/retriever/graphsyncretriever.go @@ -102,7 +102,7 @@ func graphsyncMetadataCompare(a, b *metadata.GraphsyncFilecoinV1, defaultValue b return defaultValue } -func (pg ProtocolGraphsync) CompareCandidates(a, b connectCandidate, mda, mdb metadata.Protocol) bool { +func (pg ProtocolGraphsync) CompareCandidates(a, b ComparableCandidate, mda, mdb metadata.Protocol) bool { gsmda := mda.(*metadata.GraphsyncFilecoinV1) gsmdb := 
mdb.(*metadata.GraphsyncFilecoinV1) return graphsyncMetadataCompare(gsmda, gsmdb, a.Duration < b.Duration) diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go index 377c9632..9467f7a7 100644 --- a/pkg/retriever/httpretriever.go +++ b/pkg/retriever/httpretriever.go @@ -31,23 +31,34 @@ var _ TransportProtocol = &ProtocolHttp{} type ProtocolHttp struct { Client *http.Client - req *http.Request - resp *http.Response + // customCompare is for testing only, and should be removed when we have more + // ways to compare candidates so we can control ordering deterministically in + // our tests. + customCompare func(a, b ComparableCandidate, mda, mdb metadata.Protocol) bool } // NewHttpRetriever makes a new CandidateRetriever for verified CAR HTTP // retrievals (transport-ipfs-gateway-http). func NewHttpRetriever(getStorageProviderTimeout GetStorageProviderTimeout, client *http.Client) types.CandidateRetriever { - return NewHttpRetrieverWithDeps(getStorageProviderTimeout, client, clock.New(), nil) + return NewHttpRetrieverWithDeps(getStorageProviderTimeout, client, clock.New(), nil, 0, nil) } -func NewHttpRetrieverWithDeps(getStorageProviderTimeout GetStorageProviderTimeout, client *http.Client, clock clock.Clock, awaitReceivedCandidates chan<- struct{}) types.CandidateRetriever { +func NewHttpRetrieverWithDeps( + getStorageProviderTimeout GetStorageProviderTimeout, + client *http.Client, + clock clock.Clock, + awaitReceivedCandidates chan<- struct{}, + initialPause time.Duration, + customCompare func(a, b ComparableCandidate, mda, mdb metadata.Protocol) bool, +) types.CandidateRetriever { return &parallelPeerRetriever{ Protocol: &ProtocolHttp{ - Client: client, + Client: client, + customCompare: customCompare, }, GetStorageProviderTimeout: getStorageProviderTimeout, Clock: clock, + QueueInitialPause: initialPause, awaitReceivedCandidates: awaitReceivedCandidates, } } @@ -60,8 +71,12 @@ func (ph ProtocolHttp) GetMergedMetadata(cid cid.Cid, currentMetadata, newMetada return &metadata.IpfsGatewayHttp{} } -func (ph ProtocolHttp) CompareCandidates(a, b connectCandidate, mda, mdb metadata.Protocol) bool { - // we only have duration .. currently +func (ph ProtocolHttp) CompareCandidates(a, b ComparableCandidate, mda, mdb metadata.Protocol) bool { + if ph.customCompare != nil { + return ph.customCompare(a, b, mda, mdb) + } + // since Connect is a noop, Duration should be ~0 and effectively meaningless; + // it mostly reflects internal timings, including goroutine scheduling. return a.Duration < b.Duration } @@ -69,6 +84,12 @@ func (ph *ProtocolHttp) Connect(ctx context.Context, retrieval *retrieval, candi // We could begin the request here by moving ph.beginRequest() to this function. // That would result in parallel connections to candidates as they are received, // then serial reading of bodies. + // If/when we need to share connection state between a Connect() and a + // Retrieve() call, we'll need somewhere to pass that state - either return + // a Context here that we pick up in Retrieve(), or key some per-candidate + // state on `retrieval`; or similar. Note that ProtocolHttp is per-protocol, + // not per-connection, and `retrieval` is per-retrieval, not per-candidate, + // so neither can hold state for a single connection as-is. return nil } @@ -85,14 +106,15 @@ func (ph *ProtocolHttp) Retrieve( // to parallelise connections if we have confidence in not wasting server time // by requesting but not reading bodies (or delayed reading which may result in // timeouts). 
- if err := ph.beginRequest(ctx, retrieval.request, candidate); err != nil { + resp, err := ph.beginRequest(ctx, retrieval.request, candidate) + if err != nil { return nil, err } - defer ph.resp.Body.Close() + defer resp.Body.Close() var blockBytes uint64 - cbr, err := car.NewBlockReader(ph.resp.Body) + cbr, err := car.NewBlockReader(resp.Body) if err != nil { return nil, err } @@ -141,13 +163,13 @@ func (ph *ProtocolHttp) Retrieve( }, nil } -func (ph *ProtocolHttp) beginRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) error { - var err error - ph.req, err = makeRequest(ctx, request, candidate) +func (ph *ProtocolHttp) beginRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) (resp *http.Response, err error) { + var req *http.Request + req, err = makeRequest(ctx, request, candidate) if err == nil { - ph.resp, err = ph.Client.Do(ph.req) + resp, err = ph.Client.Do(req) } - return err + return resp, err } func makeRequest(ctx context.Context, request types.RetrievalRequest, candidate types.RetrievalCandidate) (*http.Request, error) { diff --git a/pkg/retriever/httpretriever_test.go b/pkg/retriever/httpretriever_test.go index 41f1092c..8143b59f 100644 --- a/pkg/retriever/httpretriever_test.go +++ b/pkg/retriever/httpretriever_test.go @@ -41,6 +41,7 @@ type httpRemote struct { lsys *linking.LinkSystem sel ipld.Node responseDelay time.Duration + malformed bool } func TestHTTPRetriever(t *testing.T) { @@ -71,6 +72,7 @@ func TestHTTPRetriever(t *testing.T) { remoteBlockDuration := 50 * time.Millisecond allSelector := selectorparse.CommonSelector_ExploreAllRecursively getTimeout := func(_ peer.ID) time.Duration { return 5 * time.Second } + initialPause := 10 * time.Millisecond startTime := time.Now().Add(time.Hour) testCases := []struct { @@ -79,13 +81,13 @@ func TestHTTPRetriever(t *testing.T) { requestPath map[cid.Cid]string requestScope map[cid.Cid]types.CarScope remotes map[cid.Cid][]httpRemote - expectedStats map[cid.Cid]types.RetrievalStats + expectedStats map[cid.Cid]*types.RetrievalStats expectedErrors map[cid.Cid]struct{} - expectedCids map[cid.Cid][]cid.Cid + expectedCids map[cid.Cid][]cid.Cid // expected in this order expectSequence []testutil.ExpectedActionsAtTime }{ { - name: "single full fetch, one peer", + name: "single, one peer, success", requests: map[cid.Cid]types.RetrievalID{cid1: rid1}, remotes: map[cid.Cid][]httpRemote{ cid1: { @@ -97,18 +99,16 @@ func TestHTTPRetriever(t *testing.T) { }, }, }, - expectedCids: map[cid.Cid][]cid.Cid{ - cid1: tbc1Cids, - }, - expectedStats: map[cid.Cid]types.RetrievalStats{ + expectedCids: map[cid.Cid][]cid.Cid{cid1: tbc1Cids}, + expectedStats: map[cid.Cid]*types.RetrievalStats{ cid1: { RootCid: cid1, StorageProviderId: cid1Cands[0].MinerPeer.ID, Size: sizeOf(tbc1.AllBlocks()), Blocks: 100, - Duration: 40*time.Millisecond + remoteBlockDuration*100, - AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (40*time.Millisecond + remoteBlockDuration*100).Seconds()), - TimeToFirstByte: 40 * time.Millisecond, + Duration: initialPause + 40*time.Millisecond + remoteBlockDuration*100, + AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (initialPause + 40*time.Millisecond + remoteBlockDuration*100).Seconds()), + TimeToFirstByte: initialPause + 40*time.Millisecond, TotalPayment: big.Zero(), AskPrice: big.Zero(), }, @@ -122,19 +122,19 @@ func TestHTTPRetriever(t *testing.T) { }, }, { - AfterStart: 0, + AfterStart: initialPause, ReceivedRetrievals: 
[]peer.ID{cid1Cands[0].MinerPeer.ID}, }, { - AfterStart: time.Millisecond * 40, + AfterStart: initialPause + time.Millisecond*40, ExpectedEvents: []types.RetrievalEvent{ - events.FirstByte(startTime.Add(time.Millisecond*40), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer)), + events.FirstByte(startTime.Add(initialPause+time.Millisecond*40), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer)), }, }, { - AfterStart: time.Millisecond*40 + remoteBlockDuration*100, + AfterStart: initialPause + time.Millisecond*40 + remoteBlockDuration*100, ExpectedEvents: []types.RetrievalEvent{ - events.Success(startTime.Add(time.Millisecond*40+remoteBlockDuration*100), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer), sizeOf(tbc2.AllBlocks()), 100, 40*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0), + events.Success(startTime.Add(initialPause+time.Millisecond*40+remoteBlockDuration*100), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer), sizeOf(tbc1.AllBlocks()), 100, initialPause+40*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0), }, ServedRetrievals: []testutil.RemoteStats{ { @@ -149,7 +149,7 @@ func TestHTTPRetriever(t *testing.T) { }, }, { - name: "two parallel full fetch, small offset, one peer each", + name: "two parallel, one peer each, success", requests: map[cid.Cid]types.RetrievalID{cid1: rid1, cid2: rid2}, remotes: map[cid.Cid][]httpRemote{ cid1: { @@ -169,19 +169,16 @@ func TestHTTPRetriever(t *testing.T) { }, }, }, - expectedCids: map[cid.Cid][]cid.Cid{ - cid1: tbc1Cids, - cid2: tbc2Cids, - }, - expectedStats: map[cid.Cid]types.RetrievalStats{ + expectedCids: map[cid.Cid][]cid.Cid{cid1: tbc1Cids, cid2: tbc2Cids}, + expectedStats: map[cid.Cid]*types.RetrievalStats{ cid1: { RootCid: cid1, StorageProviderId: cid1Cands[0].MinerPeer.ID, Size: sizeOf(tbc1.AllBlocks()), Blocks: 100, - Duration: 40*time.Millisecond + remoteBlockDuration*100, - AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (40*time.Millisecond + remoteBlockDuration*100).Seconds()), - TimeToFirstByte: 40 * time.Millisecond, + Duration: initialPause + 40*time.Millisecond + remoteBlockDuration*100, + AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (initialPause + 40*time.Millisecond + remoteBlockDuration*100).Seconds()), + TimeToFirstByte: initialPause + 40*time.Millisecond, TotalPayment: big.Zero(), AskPrice: big.Zero(), }, @@ -190,9 +187,9 @@ func TestHTTPRetriever(t *testing.T) { StorageProviderId: cid2Cands[0].MinerPeer.ID, Size: sizeOf(tbc2.AllBlocks()), Blocks: 100, - Duration: 10*time.Millisecond + remoteBlockDuration*100, - AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks())) / (10*time.Millisecond + remoteBlockDuration*100).Seconds()), - TimeToFirstByte: 10 * time.Millisecond, + Duration: initialPause + 10*time.Millisecond + remoteBlockDuration*100, + AverageSpeed: uint64(float64(sizeOf(tbc2.AllBlocks())) / (initialPause + 10*time.Millisecond + remoteBlockDuration*100).Seconds()), + TimeToFirstByte: initialPause + 10*time.Millisecond, TotalPayment: big.Zero(), AskPrice: big.Zero(), }, @@ -208,25 +205,25 @@ func TestHTTPRetriever(t *testing.T) { }, }, { - AfterStart: 0, + AfterStart: initialPause, ReceivedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID, cid2Cands[0].MinerPeer.ID}, }, { - AfterStart: time.Millisecond * 10, + AfterStart: initialPause + time.Millisecond*10, ExpectedEvents: []types.RetrievalEvent{ - events.FirstByte(startTime.Add(time.Millisecond*10), rid2, startTime, toCandidate(cid2, cid2Cands[0].MinerPeer)), + 
events.FirstByte(startTime.Add(initialPause+time.Millisecond*10), rid2, startTime, toCandidate(cid2, cid2Cands[0].MinerPeer)), }, }, { - AfterStart: time.Millisecond * 40, + AfterStart: initialPause + time.Millisecond*40, ExpectedEvents: []types.RetrievalEvent{ - events.FirstByte(startTime.Add(time.Millisecond*40), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer)), + events.FirstByte(startTime.Add(initialPause+time.Millisecond*40), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer)), }, }, { - AfterStart: time.Millisecond*10 + remoteBlockDuration*100, + AfterStart: initialPause + time.Millisecond*10 + remoteBlockDuration*100, ExpectedEvents: []types.RetrievalEvent{ - events.Success(startTime.Add(time.Millisecond*10+remoteBlockDuration*100), rid2, startTime, toCandidate(cid2, cid2Cands[0].MinerPeer), sizeOf(tbc2.AllBlocks()), 100, 10*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0), + events.Success(startTime.Add(initialPause+time.Millisecond*10+remoteBlockDuration*100), rid2, startTime, toCandidate(cid2, cid2Cands[0].MinerPeer), sizeOf(tbc2.AllBlocks()), 100, initialPause+10*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0), }, ServedRetrievals: []testutil.RemoteStats{ { @@ -239,9 +236,9 @@ func TestHTTPRetriever(t *testing.T) { CompletedRetrievals: []peer.ID{cid2Cands[0].MinerPeer.ID}, }, { - AfterStart: time.Millisecond*40 + remoteBlockDuration*100, + AfterStart: initialPause + time.Millisecond*40 + remoteBlockDuration*100, ExpectedEvents: []types.RetrievalEvent{ - events.Success(startTime.Add(time.Millisecond*40+remoteBlockDuration*100), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer), sizeOf(tbc2.AllBlocks()), 100, 40*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0), + events.Success(startTime.Add(initialPause+time.Millisecond*40+remoteBlockDuration*100), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer), sizeOf(tbc1.AllBlocks()), 100, initialPause+40*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0), }, ServedRetrievals: []testutil.RemoteStats{ { @@ -255,12 +252,286 @@ func TestHTTPRetriever(t *testing.T) { }, }, }, + { + name: "single, multiple errors", + requests: map[cid.Cid]types.RetrievalID{cid1: rid1}, + remotes: map[cid.Cid][]httpRemote{ + cid1: { + { + peer: cid1Cands[0].MinerPeer, + lsys: makeLsys(nil), + sel: allSelector, + responseDelay: time.Millisecond * 10, + malformed: true, + }, + { + peer: cid1Cands[1].MinerPeer, + lsys: makeLsys(nil), + sel: allSelector, + responseDelay: time.Millisecond * 10, + malformed: true, + }, + { + peer: cid1Cands[2].MinerPeer, + lsys: makeLsys(nil), + sel: allSelector, + responseDelay: time.Millisecond * 10, + malformed: true, + }, + }, + }, + expectedErrors: map[cid.Cid]struct{}{ + cid1: {}, + }, + expectSequence: []testutil.ExpectedActionsAtTime{ + { + AfterStart: 0, + ExpectedEvents: []types.RetrievalEvent{ + events.Started(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)), + events.Started(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer)), + events.Started(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[2].MinerPeer)), + events.Connected(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)), + events.Connected(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer)), + events.Connected(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[2].MinerPeer)), + }, 
+ }, + { + AfterStart: initialPause, + ReceivedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID}, + }, + { + AfterStart: initialPause + time.Millisecond*10, + ExpectedEvents: []types.RetrievalEvent{ + events.Failed(startTime.Add(initialPause+time.Millisecond*10), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer), "unexpected EOF"), + }, + CompletedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID}, + ReceivedRetrievals: []peer.ID{cid1Cands[1].MinerPeer.ID}, + ServedRetrievals: []testutil.RemoteStats{ + { + Peer: cid1Cands[0].MinerPeer.ID, + Root: cid1, + ByteCount: 0, + Blocks: []cid.Cid{}, + }, + }, + }, + { + AfterStart: initialPause + time.Millisecond*20, + ExpectedEvents: []types.RetrievalEvent{ + events.Failed(startTime.Add(initialPause+time.Millisecond*20), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer), "unexpected EOF"), + }, + CompletedRetrievals: []peer.ID{cid1Cands[1].MinerPeer.ID}, + ReceivedRetrievals: []peer.ID{cid1Cands[2].MinerPeer.ID}, + ServedRetrievals: []testutil.RemoteStats{ + { + Peer: cid1Cands[1].MinerPeer.ID, + Root: cid1, + ByteCount: 0, + Blocks: []cid.Cid{}, + }, + }, + }, + { + AfterStart: initialPause + time.Millisecond*30, + ExpectedEvents: []types.RetrievalEvent{ + events.Failed(startTime.Add(initialPause+time.Millisecond*30), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[2].MinerPeer), "unexpected EOF"), + }, + CompletedRetrievals: []peer.ID{cid1Cands[2].MinerPeer.ID}, + ServedRetrievals: []testutil.RemoteStats{ + { + Peer: cid1Cands[2].MinerPeer.ID, + Root: cid1, + ByteCount: 0, + Blocks: []cid.Cid{}, + }, + }, + }, + }, + }, + { + name: "single, multiple errors, one success", + requests: map[cid.Cid]types.RetrievalID{cid1: rid1}, + remotes: map[cid.Cid][]httpRemote{ + cid1: { + { + peer: cid1Cands[0].MinerPeer, + lsys: makeLsys(nil), + sel: allSelector, + responseDelay: time.Millisecond * 10, + malformed: true, + }, + { + peer: cid1Cands[1].MinerPeer, + lsys: makeLsys(nil), + sel: allSelector, + responseDelay: time.Millisecond * 10, + malformed: true, + }, + { + peer: cid1Cands[2].MinerPeer, + lsys: makeLsys(tbc1.AllBlocks()), + sel: allSelector, + responseDelay: time.Millisecond * 10, + }, + }, + }, + expectedCids: map[cid.Cid][]cid.Cid{cid1: tbc1Cids}, + expectedStats: map[cid.Cid]*types.RetrievalStats{ + cid1: { + RootCid: cid1, + StorageProviderId: cid1Cands[2].MinerPeer.ID, + Size: sizeOf(tbc1.AllBlocks()), + Blocks: 100, + Duration: initialPause + 30*time.Millisecond + remoteBlockDuration*100, + AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks())) / (initialPause + 30*time.Millisecond + remoteBlockDuration*100).Seconds()), + TimeToFirstByte: initialPause + 30*time.Millisecond, + TotalPayment: big.Zero(), + AskPrice: big.Zero(), + }, + }, + expectSequence: []testutil.ExpectedActionsAtTime{ + { + AfterStart: 0, + ExpectedEvents: []types.RetrievalEvent{ + events.Started(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)), + events.Started(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer)), + events.Started(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[2].MinerPeer)), + events.Connected(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)), + events.Connected(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer)), + events.Connected(startTime, rid1, startTime, types.RetrievalPhase, 
toCandidate(cid1, cid1Cands[2].MinerPeer)), + }, + }, + { + AfterStart: initialPause, + ReceivedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID}, + }, + { + AfterStart: initialPause + time.Millisecond*10, + ExpectedEvents: []types.RetrievalEvent{ + events.Failed(startTime.Add(initialPause+time.Millisecond*10), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer), "unexpected EOF"), + }, + CompletedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID}, + ReceivedRetrievals: []peer.ID{cid1Cands[1].MinerPeer.ID}, + ServedRetrievals: []testutil.RemoteStats{ + { + Peer: cid1Cands[0].MinerPeer.ID, + Root: cid1, + ByteCount: 0, + Blocks: []cid.Cid{}, + }, + }, + }, + { + AfterStart: initialPause + time.Millisecond*20, + ExpectedEvents: []types.RetrievalEvent{ + events.Failed(startTime.Add(initialPause+time.Millisecond*20), rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[1].MinerPeer), "unexpected EOF"), + }, + CompletedRetrievals: []peer.ID{cid1Cands[1].MinerPeer.ID}, + ReceivedRetrievals: []peer.ID{cid1Cands[2].MinerPeer.ID}, + ServedRetrievals: []testutil.RemoteStats{ + { + Peer: cid1Cands[1].MinerPeer.ID, + Root: cid1, + ByteCount: 0, + Blocks: []cid.Cid{}, + }, + }, + }, + { + AfterStart: initialPause + time.Millisecond*30, + ExpectedEvents: []types.RetrievalEvent{ + events.FirstByte(startTime.Add(initialPause+time.Millisecond*30), rid1, startTime, toCandidate(cid1, cid1Cands[2].MinerPeer)), + }, + }, + { + AfterStart: initialPause + time.Millisecond*30 + remoteBlockDuration*100, + ExpectedEvents: []types.RetrievalEvent{ + events.Success(startTime.Add(initialPause+time.Millisecond*30+remoteBlockDuration*100), rid1, startTime, toCandidate(cid1, cid1Cands[2].MinerPeer), sizeOf(tbc1.AllBlocks()), 100, initialPause+30*time.Millisecond+remoteBlockDuration*100, big.Zero(), 0), + }, + CompletedRetrievals: []peer.ID{cid1Cands[2].MinerPeer.ID}, + ServedRetrievals: []testutil.RemoteStats{ + { + Peer: cid1Cands[2].MinerPeer.ID, + Root: cid1, + ByteCount: sizeOf(tbc1.AllBlocks()), + Blocks: tbc1Cids, + }, + }, + }, + }, + }, + // TODO: this test demonstrates the incompleteness of the HTTP implementation - the retrieval + // is counted as a success; the only "error" is signalled server-side, where the selector + // traversal fails, and none of that carries over to the client. 
+ { + name: "single, one peer, partial served", + requests: map[cid.Cid]types.RetrievalID{cid1: rid1}, + remotes: map[cid.Cid][]httpRemote{ + cid1: { + { + peer: cid1Cands[0].MinerPeer, + lsys: makeLsys(tbc1.AllBlocks()[0:50]), + sel: allSelector, + responseDelay: time.Millisecond * 40, + }, + }, + }, + expectedCids: map[cid.Cid][]cid.Cid{cid1: tbc1Cids[0:50]}, + expectedStats: map[cid.Cid]*types.RetrievalStats{ + cid1: { + RootCid: cid1, + StorageProviderId: cid1Cands[0].MinerPeer.ID, + Size: sizeOf(tbc1.AllBlocks()[0:50]), + Blocks: 50, + Duration: initialPause + 40*time.Millisecond + remoteBlockDuration*50, + AverageSpeed: uint64(float64(sizeOf(tbc1.AllBlocks()[0:50])) / (initialPause + 40*time.Millisecond + remoteBlockDuration*50).Seconds()), + TimeToFirstByte: initialPause + 40*time.Millisecond, + TotalPayment: big.Zero(), + AskPrice: big.Zero(), + }, + }, + expectSequence: []testutil.ExpectedActionsAtTime{ + { + AfterStart: 0, + ExpectedEvents: []types.RetrievalEvent{ + events.Started(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)), + events.Connected(startTime, rid1, startTime, types.RetrievalPhase, toCandidate(cid1, cid1Cands[0].MinerPeer)), + }, + }, + { + AfterStart: initialPause, + ReceivedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID}, + }, + { + AfterStart: initialPause + time.Millisecond*40, + ExpectedEvents: []types.RetrievalEvent{ + events.FirstByte(startTime.Add(initialPause+time.Millisecond*40), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer)), + }, + }, + { + AfterStart: initialPause + time.Millisecond*40 + remoteBlockDuration*50, + ExpectedEvents: []types.RetrievalEvent{ + events.Success(startTime.Add(initialPause+time.Millisecond*40+remoteBlockDuration*50), rid1, startTime, toCandidate(cid1, cid1Cands[0].MinerPeer), sizeOf(tbc2.AllBlocks()[0:50]), 50, initialPause+40*time.Millisecond+remoteBlockDuration*50, big.Zero(), 0), + }, + ServedRetrievals: []testutil.RemoteStats{ + { + Peer: cid1Cands[0].MinerPeer.ID, + Root: cid1, + ByteCount: sizeOf(tbc1.AllBlocks()[0:50]), + Blocks: tbc1Cids[0:50], + Err: struct{}{}, + }, + }, + CompletedRetrievals: []peer.ID{cid1Cands[0].MinerPeer.ID}, + }, + }, + }, } for _, testCase := range testCases { testCase := testCase t.Run(testCase.name, func(t *testing.T) { - // TODO: t.Parallel() + t.Parallel() req := require.New(t) ctx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -285,23 +556,55 @@ func TestHTTPRetriever(t *testing.T) { roundTripper := NewCannedBytesRoundTripper(t, ctx, clock, remoteBlockDuration, testCase.requestPath, testCase.requestScope, getRemote) defer roundTripper.Close() client := &http.Client{Transport: roundTripper} - retriever := retriever.NewHttpRetrieverWithDeps(getTimeout, client, clock, awaitReceivedCandidates) + // customCompare lets us order candidates when they queue, since we currently + // have no other way to deterministically order them for testing. 
+ customCompare := func(a, b retriever.ComparableCandidate, mda, mdb metadata.Protocol) bool { + for _, c := range cid1Cands { + if c.MinerPeer.ID == a.PeerID { + return true + } + if c.MinerPeer.ID == b.PeerID { + return false + } + } + for _, c := range cid2Cands { + if c.MinerPeer.ID == a.PeerID { + return true + } + if c.MinerPeer.ID == b.PeerID { + return false + } + } + return false + } + retriever := retriever.NewHttpRetrieverWithDeps(getTimeout, client, clock, awaitReceivedCandidates, initialPause, customCompare) + blockAccounting := make([]*blockAccounter, 0) + expectedCids := make([][]cid.Cid, 0) retrievals := make([]testutil.RunRetrieval, 0) - expectedStats := make([]types.RetrievalStats, 0) - for cid, rid := range testCase.requests { - cid := cid + expectedStats := make([]*types.RetrievalStats, 0) + expectedErrors := make([]struct{}, 0) + for c, rid := range testCase.requests { + c := c rid := rid - expectedStats = append(expectedStats, testCase.expectedStats[cid]) + ec := testCase.expectedCids[c] + if ec == nil { + ec = []cid.Cid{} + } + expectedCids = append(expectedCids, ec) + expectedStats = append(expectedStats, testCase.expectedStats[c]) + if _, ok := testCase.expectedErrors[c]; ok { expectedErrors = append(expectedErrors, struct{}{}) } + lsys := makeLsys(nil) + blockAccounting = append(blockAccounting, NewBlockAccounter(lsys)) retrievals = append(retrievals, func(eventsCb func(types.RetrievalEvent)) (*types.RetrievalStats, error) { request := types.RetrievalRequest{ RetrievalID: rid, - Cid: cid, - LinkSystem: *makeLsys(nil), - Path: testCase.requestPath[cid], - Scope: testCase.requestScope[cid], + Cid: c, + LinkSystem: *lsys, + Path: testCase.requestPath[c], + Scope: testCase.requestScope[c], } - candidates := toCandidates(cid, testCase.remotes[cid]) + candidates := toCandidates(c, testCase.remotes[c]) return retriever.Retrieve(context.Background(), request, eventsCb). 
RetrieveFromAsyncCandidates(makeAsyncCandidates(t, candidates)) }) @@ -311,15 +614,20 @@ func TestHTTPRetriever(t *testing.T) { ExpectedSequence: testCase.expectSequence, }.RunWithVerification(ctx, t, clock, roundTripper, nil, retrievals) - require.Len(t, results, len(testCase.requests)) - actualStats := make([]types.RetrievalStats, 0) - for _, result := range results { - stats, err := result.Stats, result.Err - require.NoError(t, err) - require.NotNil(t, stats) - actualStats = append(actualStats, *stats) + req.Len(results, len(testCase.requests)) + actualStats := make([]*types.RetrievalStats, len(results)) + actualErrors := make([]struct{}, 0) + actualCids := make([][]cid.Cid, len(results)) + for i, result := range results { + actualStats[i] = result.Stats + if result.Err != nil { + actualErrors = append(actualErrors, struct{}{}) + } + actualCids[i] = blockAccounting[i].cids } - require.ElementsMatch(t, expectedStats, actualStats) + req.ElementsMatch(expectedStats, actualStats) + req.ElementsMatch(expectedErrors, actualErrors) + req.Equal(expectedCids, actualCids) }) } } @@ -336,6 +644,28 @@ func TestHTTPRetriever(t *testing.T) { return types.NewRetrievalCandidate(peer.ID, peer.Addrs, root, &metadata.IpfsGatewayHttp{}) } +type blockAccounter struct { + cids []cid.Cid + bwo linking.BlockWriteOpener +} + +func NewBlockAccounter(lsys *linking.LinkSystem) *blockAccounter { + ba := &blockAccounter{ + cids: make([]cid.Cid, 0), + bwo: lsys.StorageWriteOpener, + } + lsys.StorageWriteOpener = ba.StorageWriteOpener + return ba +} + +func (ba *blockAccounter) StorageWriteOpener(lctx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { + w, wc, err := ba.bwo(lctx) + return w, func(l datamodel.Link) error { + ba.cids = append(ba.cids, l.(cidlink.Link).Cid) + return wc(l) + }, err +} + type cannedBytesRoundTripper struct { StartsCh chan peer.ID StatsCh chan testutil.RemoteStats @@ -406,7 +736,7 @@ func (c *cannedBytesRoundTripper) RoundTrip(req *http.Request) (*http.Response, makeBody := func(root cid.Cid, maddr string) io.ReadCloser { carR, carW := io.Pipe() - statsCh := traverseCar(c.t, c.ctx, remote.peer.ID, c.clock, c.remoteBlockDuration, carW, remote.lsys, root, remote.sel) + statsCh := traverseCar(c.t, c.ctx, remote.peer.ID, c.clock, c.remoteBlockDuration, carW, remote.malformed, remote.lsys, root, remote.sel) go func() { select { case <-c.ctx.Done(): @@ -428,44 +758,46 @@ func (c *cannedBytesRoundTripper) RoundTrip(req *http.Request) (*http.Response, }, nil } -func (c *cannedBytesRoundTripper) VerifyConnectionsReceived(ctx context.Context, t *testing.T, expectedConnections []peer.ID) { - // connection is currently a noop +func (c *cannedBytesRoundTripper) VerifyConnectionsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedConnections []peer.ID) { + if len(expectedConnections) > 0 { + require.FailNowf(t, "unexpected ConnectionsReceived", "@ %s", afterStart) + } } -func (c *cannedBytesRoundTripper) VerifyRetrievalsReceived(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) { +func (c *cannedBytesRoundTripper) VerifyRetrievalsReceived(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID) { retrievals := make([]peer.ID, 0, len(expectedRetrievals)) for i := 0; i < len(expectedRetrievals); i++ { select { case retrieval := <-c.StartsCh: retrievals = append(retrievals, retrieval) case <-ctx.Done(): - require.FailNowf(t, "failed to receive expected retrievals", "expected %d, 
received %d", len(expectedRetrievals), i) + require.FailNowf(t, "failed to receive expected retrievals", "expected %d, received %d @ %s", len(expectedRetrievals), i, afterStart) } } require.ElementsMatch(t, expectedRetrievals, retrievals) } -func (c *cannedBytesRoundTripper) VerifyRetrievalsServed(ctx context.Context, t *testing.T, expectedServed []testutil.RemoteStats) { +func (c *cannedBytesRoundTripper) VerifyRetrievalsServed(ctx context.Context, t *testing.T, afterStart time.Duration, expectedServed []testutil.RemoteStats) { remoteStats := make([]testutil.RemoteStats, 0, len(expectedServed)) for i := 0; i < len(expectedServed); i++ { select { case stats := <-c.StatsCh: remoteStats = append(remoteStats, stats) case <-ctx.Done(): - require.FailNowf(t, "failed to receive expected served", "expected %d, received %d", len(expectedServed), i) + require.FailNowf(t, "failed to receive expected served", "expected %d, received %d @ %s", len(expectedServed), i, afterStart) } } require.ElementsMatch(t, expectedServed, remoteStats) } -func (c *cannedBytesRoundTripper) VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, expectedRetrievals []peer.ID) { +func (c *cannedBytesRoundTripper) VerifyRetrievalsCompleted(ctx context.Context, t *testing.T, afterStart time.Duration, expectedRetrievals []peer.ID) { retrievals := make([]peer.ID, 0, len(expectedRetrievals)) for i := 0; i < len(expectedRetrievals); i++ { select { case retrieval := <-c.EndsCh: retrievals = append(retrievals, retrieval) case <-ctx.Done(): - require.FailNowf(t, "failed to complete expected retrievals", "expected %d, received %d", len(expectedRetrievals), i) + require.FailNowf(t, "failed to complete expected retrievals", "expected %d, received %d @ %s", len(expectedRetrievals), i, afterStart) } } require.ElementsMatch(t, expectedRetrievals, retrievals) @@ -507,12 +839,14 @@ func (d *deferredReader) Close() error { // given a writer (carW), a linkSystem, a root CID and a selector, traverse the graph // and write the blocks in CARv1 format to the writer. Return a channel that will // receive basic stats on what was written _after_ the write is finished. -func traverseCar(t *testing.T, +func traverseCar( + t *testing.T, ctx context.Context, id peer.ID, clock *clock.Mock, blockDuration time.Duration, carW io.WriteCloser, + malformed bool, lsys *linking.LinkSystem, root cid.Cid, selNode ipld.Node, @@ -530,6 +864,16 @@ func traverseCar(t *testing.T, Blocks: make([]cid.Cid, 0), } + defer func() { + statsCh <- stats + req.NoError(carW.Close()) + }() + + if malformed { + carW.Write([]byte("nope, this is not what you're looking for")) + return + } + // instantiating this writes a CARv1 header and waits for more Put()s carWriter, err := storage.NewWritable(carW, []cid.Cid{root}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(false)) req.NoError(err) @@ -539,7 +883,6 @@ func traverseCar(t *testing.T, // to the CARv1 writer. 
originalSRO := lsys.StorageReadOpener lsys.StorageReadOpener = func(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) { - stats.Blocks = append(stats.Blocks, lnk.(cidlink.Link).Cid) r, err := originalSRO(lc, lnk) if err != nil { return nil, err } @@ -550,6 +893,7 @@ } err = carWriter.Put(ctx, lnk.(cidlink.Link).Cid.KeyString(), byts) req.NoError(err) + stats.Blocks = append(stats.Blocks, lnk.(cidlink.Link).Cid) stats.ByteCount += uint64(len(byts)) // only the length of the bytes, not the rest of the CAR infrastructure clock.Sleep(blockDuration) return bytes.NewReader(byts), nil @@ -559,18 +903,21 @@ // the traverser won't load it (we feed the traverser the rood _node_ // not the link) rootNode, err := lsys.Load(linking.LinkContext{}, cidlink.Link{Cid: root}, basicnode.Prototype.Any) - req.NoError(err) - // begin traversal - traversal.Progress{ - Cfg: &traversal.Config{ - Ctx: ctx, - LinkSystem: *lsys, - LinkTargetNodePrototypeChooser: basicnode.Chooser, - }, - }.WalkAdv(rootNode, sel, func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil }) - req.NoError(carW.Close()) - - statsCh <- stats + if err != nil { + stats.Err = true + } else { + // begin traversal + err := traversal.Progress{ + Cfg: &traversal.Config{ + Ctx: ctx, + LinkSystem: *lsys, + LinkTargetNodePrototypeChooser: basicnode.Chooser, + }, + }.WalkAdv(rootNode, sel, func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil }) + if err != nil { + stats.Err = true + } + } }() return statsCh } diff --git a/pkg/retriever/parallelpeerretriever.go b/pkg/retriever/parallelpeerretriever.go index 5bbe3bf1..60173ebc 100644 --- a/pkg/retriever/parallelpeerretriever.go +++ b/pkg/retriever/parallelpeerretriever.go @@ -26,7 +26,7 @@ type GetStorageProviderTimeout func(peer peer.ID) time.Duration type TransportProtocol interface { Code() multicodec.Code GetMergedMetadata(cid cid.Cid, currentMetadata, newMetadata metadata.Protocol) metadata.Protocol - CompareCandidates(a, b connectCandidate, mda, mdb metadata.Protocol) bool + CompareCandidates(a, b ComparableCandidate, mda, mdb metadata.Protocol) bool Connect(ctx context.Context, retrieval *retrieval, candidate types.RetrievalCandidate) error Retrieve( ctx context.Context, @@ -79,14 +79,14 @@ type retrievalResult struct { Err error } -// connectCandidate is used for the prioritywaitqueue -type connectCandidate struct { +// ComparableCandidate is used for the prioritywaitqueue +type ComparableCandidate struct { PeerID peer.ID Duration time.Duration } type retrievalSession struct { - waitQueue prioritywaitqueue.PriorityWaitQueue[connectCandidate] + waitQueue prioritywaitqueue.PriorityWaitQueue[ComparableCandidate] resultChan chan retrievalResult finishChan chan struct{} } @@ -117,9 +117,9 @@ func (cfg *parallelPeerRetriever) Retrieve( func (retrieval *retrieval) RetrieveFromAsyncCandidates(asyncCandidates types.InboundAsyncCandidates) (*types.RetrievalStats, error) { ctx, cancelCtx := context.WithCancel(retrieval.ctx) - pwqOpts := []prioritywaitqueue.Option[connectCandidate]{prioritywaitqueue.WithClock[connectCandidate](retrieval.Clock)} + pwqOpts := []prioritywaitqueue.Option[ComparableCandidate]{prioritywaitqueue.WithClock[ComparableCandidate](retrieval.Clock)} if retrieval.QueueInitialPause > 0 { + pwqOpts = append(pwqOpts, 
prioritywaitqueue.WithInitialPause[ComparableCandidate](retrieval.QueueInitialPause)) } session := &retrievalSession{ @@ -169,11 +169,11 @@ func (retrieval *retrieval) RetrieveFromAsyncCandidates(asyncCandidates types.In return stats, err } -// candidateCompare compares two connectCandidates and returns true if the first is +// candidateCompare compares two ComparableCandidates and returns true if the first is // preferable to the second. This is used for the PriorityWaitQueue that will // prioritise execution of retrievals if two candidates are available to compare // at the same time. -func (retrieval *retrieval) candidateCompare(a, b connectCandidate) bool { +func (retrieval *retrieval) candidateCompare(a, b ComparableCandidate) bool { retrieval.candidateMetdataLk.RLock() defer retrieval.candidateMetdataLk.RUnlock() @@ -295,7 +295,7 @@ func (retrieval *retrieval) runRetrievalCandidate( session.sendEvent(events.Connected(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) // Form a queue and run retrieval in serial - done = session.waitQueue.Wait(connectCandidate{ + done = session.waitQueue.Wait(ComparableCandidate{ PeerID: candidate.MinerPeer.ID, Duration: retrieval.parallelPeerRetriever.Clock.Now().Sub(phaseStartTime), })
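
A note on how the `afterStart` threading above fits together end to end: the verifier advances the mock clock to each `ExpectedActionsAtTime.AfterStart` offset and now stamps every verification failure with that offset. A rough usage sketch follows, assuming the testutil package as modified in this diff; the event lists, mock client, and retrieval closures are placeholders a real test would fill in:

```go
func runSequenceSketch(ctx context.Context, t *testing.T, mockClock *clock.Mock,
	client testutil.VerifierClient, retrievals []testutil.RunRetrieval) []types.RetrievalResult {
	sequence := []testutil.ExpectedActionsAtTime{
		// at t=0 only the initial Started/Connected events are expected
		{AfterStart: 0, ExpectedEvents: []types.RetrievalEvent{ /* ... */ }},
		// after the initial queue pause, the first candidate should begin retrieving
		{AfterStart: 10 * time.Millisecond, ReceivedRetrievals: []peer.ID{ /* ... */ }},
	}
	// every verification failure inside RunWithVerification is reported with
	// the AfterStart offset at which it occurred, which makes mock-clock
	// sequences much easier to debug
	return testutil.RetrievalVerifier{ExpectedSequence: sequence}.
		RunWithVerification(ctx, t, mockClock, client, nil, retrievals)
}
```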
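The test-only `customCompare` in httpretriever_test.go does a linear scan over both candidate lists on every comparison. If the deterministic-ordering hook sticks around, one possible alternative (hypothetical, not part of this change) is to precompute a rank per peer once and compare ranks:

```go
// makeRankedCompare builds a CompareCandidates-shaped function from a fixed
// peer ordering; orderedPeers is a hypothetical pre-flattened candidate order.
func makeRankedCompare(orderedPeers []peer.ID) func(a, b retriever.ComparableCandidate, mda, mdb metadata.Protocol) bool {
	rank := make(map[peer.ID]int, len(orderedPeers))
	for i, p := range orderedPeers {
		rank[p] = i
	}
	return func(a, b retriever.ComparableCandidate, _, _ metadata.Protocol) bool {
		// unknown peers rank as zero; known peers sort by their fixed position
		return rank[a.PeerID] < rank[b.PeerID]
	}
}
```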
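On the expanded `Connect` comment in httpretriever.go: since `ProtocolHttp` is per-protocol and `retrieval` is per-retrieval, neither can currently hold state for a single candidate connection. One possible shape for the per-candidate state that comment alludes to, purely illustrative and not part of this change, is a mutex-guarded map on the retrieval, keyed by peer:

```go
// Hypothetical per-candidate connection state, keyed off the retrieval rather
// than off ProtocolHttp, so parallel candidates don't trample each other.
type httpConnState struct {
	resp *http.Response // response opened during Connect, consumed in Retrieve
}

type retrievalConnState struct {
	lk    sync.Mutex
	conns map[peer.ID]*httpConnState
}

func (r *retrievalConnState) put(p peer.ID, s *httpConnState) {
	r.lk.Lock()
	defer r.lk.Unlock()
	if r.conns == nil {
		r.conns = make(map[peer.ID]*httpConnState)
	}
	r.conns[p] = s
}

// take removes and returns the state so Retrieve owns it exclusively
func (r *retrievalConnState) take(p peer.ID) *httpConnState {
	r.lk.Lock()
	defer r.lk.Unlock()
	s := r.conns[p]
	delete(r.conns, p)
	return s
}
```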
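Finally, on `blockAccounter`: it wraps a LinkSystem's `StorageWriteOpener` so a test can assert the order in which blocks were committed, not just their membership. A rough sketch of direct usage, assuming the test helpers in this diff (`NewBlockAccounter`, `makeAsyncCandidates`):

```go
func assertTraversalOrder(ctx context.Context, t *testing.T, ret types.CandidateRetriever,
	request types.RetrievalRequest, candidates []types.RetrievalCandidate, expectedCids []cid.Cid) {
	acct := NewBlockAccounter(&request.LinkSystem) // wrap before the retrieval runs
	_, err := ret.Retrieve(ctx, request, func(types.RetrievalEvent) {}).
		RetrieveFromAsyncCandidates(makeAsyncCandidates(t, candidates))
	require.NoError(t, err)
	// order-sensitive, unlike ElementsMatch: CIDs must appear in commit order
	require.Equal(t, expectedCids, acct.cids)
}
```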