diff --git a/go.mod b/go.mod index 78ab6376..65b12b10 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/filecoin-project/go-cbor-util v0.0.1 github.com/filecoin-project/go-commp-utils v0.1.3 github.com/filecoin-project/go-crypto v0.0.1 // indirect - github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220603004528-681bfedccef1 + github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220731093701-aad22878e865 github.com/filecoin-project/go-ds-versioning v0.1.1 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 @@ -31,7 +31,7 @@ require ( github.com/ipfs/go-cidutil v0.1.0 github.com/ipfs/go-datastore v0.5.1 github.com/ipfs/go-filestore v1.1.0 - github.com/ipfs/go-graphsync v0.13.1 + github.com/ipfs/go-graphsync v0.13.3-0.20220625074430-a95496cf1534 github.com/ipfs/go-ipfs-blockstore v1.1.2 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index aaa4dbc8..076d6e9c 100644 --- a/go.sum +++ b/go.sum @@ -240,6 +240,8 @@ github.com/filecoin-project/go-data-transfer v1.14.0 h1:4pnfJk8FYtqcdAg+QRGzaz57 github.com/filecoin-project/go-data-transfer v1.14.0/go.mod h1:wNJKhaLLYBJDM3VFvgvYi4iUjPa69pz/1Q5Q4HzX2wE= github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220603004528-681bfedccef1 h1:OeqAIz4wyLdkmjQbxSEEmj3tUA6fcH4vwmCXLMm50Mg= github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220603004528-681bfedccef1/go.mod h1:CKpMVsGd15In1HPfW24jFK/TIf+98iFPQ0qh1LcxyB8= +github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220731093701-aad22878e865 h1:gFSieS2c4yinh9YB9SasmQXWYpyfoXImEXnZwSWs19c= +github.com/filecoin-project/go-data-transfer/v2 v2.0.0-20220731093701-aad22878e865/go.mod h1:MWO2J4e4mZEa1GvVyAieSQjyWAdg3CsuVhQfkQmDXMY= github.com/filecoin-project/go-ds-versioning v0.0.0-20211206185234-508abd7c2aff/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4= github.com/filecoin-project/go-ds-versioning v0.1.1 h1:JiyBqaQlwC+UM0WhcBtVEeT3XrX59mQhT8U3p7nu86o= github.com/filecoin-project/go-ds-versioning v0.1.1/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4= @@ -580,6 +582,8 @@ github.com/ipfs/go-graphsync v0.11.0/go.mod h1:wC+c8vGVjAHthsVIl8LKr37cUra2GOaMY github.com/ipfs/go-graphsync v0.12.0/go.mod h1:nASYWYETgsnMbQ3+DirNImOHQ8TY0a5AhAqyOY55tUg= github.com/ipfs/go-graphsync v0.13.1 h1:lWiP/WLycoPUYyj3IDEi1GJNP30kFuYOvimcfeuZyQs= github.com/ipfs/go-graphsync v0.13.1/go.mod h1:y8e8G6CmZeL9Srvx1l15CtGiRdf3h5JdQuqPz/iYL0A= +github.com/ipfs/go-graphsync v0.13.3-0.20220625074430-a95496cf1534 h1:sn7viAPyx3qZVhfRpXhW23mPtzl9rjJKtJ/HM/HsyZg= +github.com/ipfs/go-graphsync v0.13.3-0.20220625074430-a95496cf1534/go.mod h1:RKAui2+/HmlIVnuAXJIn0jltvOAXkl7wz3SYysmYnPI= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs v0.11.0/go.mod h1:g68Thu2Ho11AWoHsN34P5fSK7iA6OWWRy3T/g8HLixc= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= diff --git a/retrievalmarket/impl/dtutils/dtutils.go b/retrievalmarket/impl/dtutils/dtutils.go index 8c02c2b3..82e67430 100644 --- a/retrievalmarket/impl/dtutils/dtutils.go +++ b/retrievalmarket/impl/dtutils/dtutils.go @@ -65,11 +65,7 @@ func providerEvent(event datatransfer.Event, channelState datatransfer.ChannelSt // event or moving to error if a data transfer error occurs func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { return func(event datatransfer.Event, channelState datatransfer.ChannelState) { - voucher, err := channelState.Voucher() - if err != nil { - log.Errorf("received bad voucher: %s", err.Error()) - return - } + voucher := channelState.Voucher() if voucher.Voucher == nil { log.Errorf("received empty voucher") return @@ -126,11 +122,7 @@ func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelStat case datatransfer.Cancel: return rm.ClientEventProviderCancelled, nil case datatransfer.NewVoucherResult: - voucher, err := channelState.LastVoucherResult() - if err != nil { - log.Errorf("received bad voucher result: %s", err.Error()) - return noEvent, nil - } + voucher := channelState.LastVoucherResult() response, err := rm.DealResponseFromNode(voucher.Voucher) if err != nil { log.Errorf("unexpected voucher result received: %s", err.Error()) @@ -157,11 +149,8 @@ func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelStat // an event to the appropriate state machine func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { return func(event datatransfer.Event, channelState datatransfer.ChannelState) { - voucher, err := channelState.Voucher() - if err != nil { - log.Errorf("received bad voucher: %s", err.Error()) - return - } + voucher := channelState.Voucher() + dealProposal, err := rm.DealProposalFromNode(voucher.Voucher) // if this event is for a transfer not related to retrieval, ignore if err != nil { diff --git a/retrievalmarket/impl/dtutils/dtutils_test.go b/retrievalmarket/impl/dtutils/dtutils_test.go index 2e6e1389..c59a6342 100644 --- a/retrievalmarket/impl/dtutils/dtutils_test.go +++ b/retrievalmarket/impl/dtutils/dtutils_test.go @@ -9,7 +9,6 @@ import ( ds "github.com/ipfs/go-datastore" bstore "github.com/ipfs/go-ipfs-blockstore" "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/datamodel" peer "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" @@ -411,10 +410,31 @@ type fakeTransport struct{} var _ datatransfer.Transport = (*fakeTransport)(nil) -func (ft *fakeTransport) OpenChannel(ctx context.Context, dataSender peer.ID, channelID datatransfer.ChannelID, root datamodel.Link, stor datamodel.Node, channel datatransfer.ChannelState, msg datatransfer.Message) error { +func (ft *fakeTransport) Capabilities() datatransfer.TransportCapabilities { + return datatransfer.TransportCapabilities{} +} + +func (ft *fakeTransport) ID() datatransfer.TransportID { + return "" +} + +func (ft *fakeTransport) Versions() []datatransfer.Version { + return nil +} + +func (ft *fakeTransport) SendMessage(ctx context.Context, chid datatransfer.ChannelID, msg datatransfer.Message) error { return nil } +func (ft *fakeTransport) ChannelUpdated(ctx context.Context, chid datatransfer.ChannelID, message datatransfer.Message) error { + return nil +} +func (ft *fakeTransport) OpenChannel(context.Context, datatransfer.Channel, datatransfer.Request) error { + return nil +} +func (ft *fakeTransport) RestartChannel(ctx context.Context, channel datatransfer.ChannelState, req datatransfer.Request) error { + return nil +} func (ft *fakeTransport) CloseChannel(ctx context.Context, chid datatransfer.ChannelID) error { return nil } diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index fed3b419..45e7380e 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -118,8 +118,8 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA }) gs1 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host1), testData.LinkSystem1) - dtTransport1 := dtgstransport.NewTransport(testData.Host1.ID(), gs1) - dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.DTNet1, dtTransport1) + dtTransport1 := dtgstransport.NewTransport(gs1, testData.DTNet1) + dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.Host1.ID(), dtTransport1) require.NoError(t, err) tut.StartAndWaitForReadyDT(ctx, t, dt1) require.NoError(t, err) @@ -162,8 +162,8 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA paymentAddress := address.TestAddress2 gs2 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host2), testData.LinkSystem2) - dtTransport2 := dtgstransport.NewTransport(testData.Host2.ID(), gs2) - dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.DTNet2, dtTransport2) + dtTransport2 := dtgstransport.NewTransport(gs2, testData.DTNet2) + dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.Host2.ID(), dtTransport2) require.NoError(t, err) tut.StartAndWaitForReadyDT(ctx, t, dt2) require.NoError(t, err) @@ -643,8 +643,8 @@ func setupClient( }) gs1 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host1), testData.LinkSystem1) - dtTransport1 := dtgstransport.NewTransport(testData.Host1.ID(), gs1) - dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.DTNet1, dtTransport1) + dtTransport1 := dtgstransport.NewTransport(gs1, testData.DTNet1) + dt1, err := dtimpl.NewDataTransfer(testData.DTStore1, testData.Host1.ID(), dtTransport1) require.NoError(t, err) tut.StartAndWaitForReadyDT(ctx, t, dt1) require.NoError(t, err) @@ -682,8 +682,8 @@ func setupProvider( pieceStore.ExpectPiece(expectedPiece, pieceInfo) gs2 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(testData.Host2), testData.LinkSystem2) - dtTransport2 := dtgstransport.NewTransport(testData.Host2.ID(), gs2) - dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.DTNet2, dtTransport2) + dtTransport2 := dtgstransport.NewTransport(gs2, testData.DTNet2) + dt2, err := dtimpl.NewDataTransfer(testData.DTStore2, testData.Host2.ID(), dtTransport2) require.NoError(t, err) tut.StartAndWaitForReadyDT(ctx, t, dt2) require.NoError(t, err) diff --git a/retrievalmarket/impl/providerstates/provider_states.go b/retrievalmarket/impl/providerstates/provider_states.go index 4ad9d430..949db147 100644 --- a/retrievalmarket/impl/providerstates/provider_states.go +++ b/retrievalmarket/impl/providerstates/provider_states.go @@ -159,10 +159,7 @@ func savePayment(ctx fsm.Context, env ProviderDealEnvironment, payment *rm.DealP } func processLastVoucher(ctx fsm.Context, env ProviderDealEnvironment, channelState datatransfer.ChannelState) (abi.TokenAmount, error) { - voucher, err := channelState.LastVoucher() - if err != nil { - return abi.TokenAmount{}, err - } + voucher := channelState.LastVoucher() // read payment and return response if present if payment, err := rm.DealPaymentFromNode(voucher.Voucher); err == nil { diff --git a/retrievalmarket/impl/requestvalidation/requestvalidation.go b/retrievalmarket/impl/requestvalidation/requestvalidation.go index 5c4f6194..85359ab8 100644 --- a/retrievalmarket/impl/requestvalidation/requestvalidation.go +++ b/retrievalmarket/impl/requestvalidation/requestvalidation.go @@ -165,10 +165,8 @@ func (rv *ProviderRequestValidator) validatePull(receiver peer.ID, proposal *rm. // ValidateRestart validates a request on restart, based on its current state func (rv *ProviderRequestValidator) ValidateRestart(channelID datatransfer.ChannelID, channelState datatransfer.ChannelState) (datatransfer.ValidationResult, error) { - voucher, err := channelState.Voucher() - if err != nil { - return datatransfer.ValidationResult{}, err - } + voucher := channelState.Voucher() + proposal, err := rm.DealProposalFromNode(voucher.Voucher) if err != nil { return datatransfer.ValidationResult{}, errors.New("wrong voucher type") diff --git a/retrievalmarket/retrieval_restart_integration_test.go b/retrievalmarket/retrieval_restart_integration_test.go index 52bfc4a2..5dde9f34 100644 --- a/retrievalmarket/retrieval_restart_integration_test.go +++ b/retrievalmarket/retrieval_restart_integration_test.go @@ -7,12 +7,13 @@ import ( "github.com/ipfs/go-datastore" logger "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" datatransfer "github.com/filecoin-project/go-data-transfer/v2" "github.com/filecoin-project/go-data-transfer/v2/channelmonitor" dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl" - dtnet "github.com/filecoin-project/go-data-transfer/v2/network" + dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" @@ -97,8 +98,8 @@ func TestBounceConnectionDealTransferOngoing(t *testing.T) { td := shared_testutil.NewLibp2pTestData(bgCtx, t) td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry) depGen := dependencies.NewDepGenerator() - depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) { - return dtimpl.NewDataTransfer(ds, transferNetwork, transport, restartConf) + depGen.ClientNewDataTransfer = func(ds datastore.Batching, peerID peer.ID, transport datatransfer.Transport) (datatransfer.Manager, error) { + return dtimpl.NewDataTransfer(ds, peerID, transport, restartConf) } deps := depGen.New(t, bgCtx, td, testnodes.NewStorageMarketState(), "", noOpDelay, noOpDelay) providerNode := testnodes2.NewTestRetrievalProviderNode() @@ -227,8 +228,8 @@ func TestBounceConnectionDealTransferUnsealing(t *testing.T) { td := shared_testutil.NewLibp2pTestData(bgCtx, t) td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry) depGen := dependencies.NewDepGenerator() - depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) { - return dtimpl.NewDataTransfer(ds, transferNetwork, transport, restartConf) + depGen.ClientNewDataTransfer = func(ds datastore.Batching, peerID peer.ID, transport datatransfer.Transport) (datatransfer.Manager, error) { + return dtimpl.NewDataTransfer(ds, peerID, transport, restartConf) } deps := depGen.New(t, bgCtx, td, testnodes.NewStorageMarketState(), "", noOpDelay, noOpDelay) providerNode := testnodes2.NewTestRetrievalProviderNode() diff --git a/shared_testutil/mocknet.go b/shared_testutil/mocknet.go index f885dffb..247a2931 100644 --- a/shared_testutil/mocknet.go +++ b/shared_testutil/mocknet.go @@ -24,7 +24,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/net/context" - dtnet "github.com/filecoin-project/go-data-transfer/v2/network" + dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" "github.com/filecoin-project/go-fil-markets/shared_testutil/unixfs" ) diff --git a/shared_testutil/testchannel.go b/shared_testutil/testchannel.go index dd17592b..d20ff48e 100644 --- a/shared_testutil/testchannel.go +++ b/shared_testutil/testchannel.go @@ -112,12 +112,8 @@ func NewTestChannel(params TestChannelParams) datatransfer.ChannelState { return tc } -func (tc *TestChannel) ReceivedCidsLen() int { - return len(tc.receivedCids) -} - -func (tc *TestChannel) ReceivedCidsTotal() int64 { - return int64(len(tc.receivedCids)) +func (tc *TestChannel) ReceivedIndex() datamodel.Node { + return basicnode.NewInt(int64(len(tc.receivedCids))) } // TransferID returns the transfer id for this channel @@ -136,27 +132,22 @@ func (tc *TestChannel) Selector() datamodel.Node { return tc.selector } -// ReceivedCids returns the cids received so far -func (tc *TestChannel) ReceivedCids() []cid.Cid { - return tc.receivedCids -} - // TODO actual implementation of those func (tc *TestChannel) MissingCids() []cid.Cid { return nil } -func (tc *TestChannel) QueuedCidsTotal() int64 { - return 0 +func (tc *TestChannel) QueuedIndex() datamodel.Node { + return basicnode.NewInt(0) } -func (tc *TestChannel) SentCidsTotal() int64 { - return 0 +func (tc *TestChannel) SentIndex() datamodel.Node { + return basicnode.NewInt(0) } // Voucher returns the voucher for this data transfer -func (tc *TestChannel) Voucher() (datatransfer.TypedVoucher, error) { - return tc.vouchers[0], nil +func (tc *TestChannel) Voucher() datatransfer.TypedVoucher { + return tc.vouchers[0] } // Sender returns the peer id for the node that is sending data @@ -209,6 +200,11 @@ func (tc *TestChannel) OtherParty(thisParty peer.ID) peer.ID { return tc.sender } +func (tc *TestChannel) BothPaused() bool { return false } +func (tc *TestChannel) ResponderPaused() bool { return false } +func (tc *TestChannel) InitiatorPaused() bool { return false } +func (tc *TestChannel) SelfPaused() bool { return false } + // Status is the current status of this channel func (tc *TestChannel) Status() datatransfer.Status { return tc.status @@ -235,23 +231,23 @@ func (tc *TestChannel) Message() string { } // Vouchers returns all vouchers sent on this channel -func (tc *TestChannel) Vouchers() ([]datatransfer.TypedVoucher, error) { - return tc.vouchers, nil +func (tc *TestChannel) Vouchers() []datatransfer.TypedVoucher { + return tc.vouchers } // VoucherResults are results of vouchers sent on the channel -func (tc *TestChannel) VoucherResults() ([]datatransfer.TypedVoucher, error) { - return tc.voucherResults, nil +func (tc *TestChannel) VoucherResults() []datatransfer.TypedVoucher { + return tc.voucherResults } // LastVoucher returns the last voucher sent on the channel -func (tc *TestChannel) LastVoucher() (datatransfer.TypedVoucher, error) { - return tc.vouchers[len(tc.vouchers)-1], nil +func (tc *TestChannel) LastVoucher() datatransfer.TypedVoucher { + return tc.vouchers[len(tc.vouchers)-1] } // LastVoucherResult returns the last voucher result sent on the channel -func (tc *TestChannel) LastVoucherResult() (datatransfer.TypedVoucher, error) { - return tc.voucherResults[len(tc.voucherResults)-1], nil +func (tc *TestChannel) LastVoucherResult() datatransfer.TypedVoucher { + return tc.voucherResults[len(tc.voucherResults)-1] } func (tc *TestChannel) Stages() *datatransfer.ChannelStages { diff --git a/storagemarket/impl/dtutils/dtutils.go b/storagemarket/impl/dtutils/dtutils.go index e53a6a84..f7cab3c5 100644 --- a/storagemarket/impl/dtutils/dtutils.go +++ b/storagemarket/impl/dtutils/dtutils.go @@ -32,12 +32,7 @@ type EventReceiver interface { // event or moving to error if a data transfer error occurs func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { return func(event datatransfer.Event, channelState datatransfer.ChannelState) { - node, err := channelState.Voucher() - if err != nil { - log.Errorf("ignoring data-transfer event as the voucher is invalid, event=%s, channelID=%s: %s", datatransfer.Events[event.Code], "channelID", - channelState.ChannelID().String(), err.Error()) - return - } + node := channelState.Voucher() if node.Voucher == nil { log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID", channelState.ChannelID()) @@ -95,12 +90,7 @@ func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { return func(event datatransfer.Event, channelState datatransfer.ChannelState) { // TODO: are these log messages valid for Client? - node, err := channelState.Voucher() - if err != nil { - log.Errorf("ignoring data-transfer event as the voucher is invalid, event=%s, channelID=%s: %s", datatransfer.Events[event.Code], "channelID", - channelState.ChannelID().String(), err.Error()) - return - } + node := channelState.Voucher() if node.Voucher == nil { log.Debugw("ignoring data-transfer event as it's not storage related", "event", datatransfer.Events[event.Code], "channelID", channelState.ChannelID()) @@ -135,9 +125,9 @@ func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber { return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferRestarted, channelState.ChannelID()) case datatransfer.Disconnected: return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferStalled) - case datatransfer.TransferRequestQueued: - return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferQueued, channelState.ChannelID()) case datatransfer.Accept: + return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferQueued, channelState.ChannelID()) + case datatransfer.TransferInitiated: return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferInitiated, channelState.ChannelID()) case datatransfer.Error: return deals.Send(voucher.Proposal, storagemarket.ClientEventDataTransferFailed, fmt.Errorf("deal data transfer failed: %s", event.Message)) diff --git a/storagemarket/impl/dtutils/dtutils_test.go b/storagemarket/impl/dtutils/dtutils_test.go index fa784725..cf994bb7 100644 --- a/storagemarket/impl/dtutils/dtutils_test.go +++ b/storagemarket/impl/dtutils/dtutils_test.go @@ -9,9 +9,7 @@ import ( ds "github.com/ipfs/go-datastore" bs "github.com/ipfs/go-ipfs-blockstore" "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/node/basicnode" - peer "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" datatransfer "github.com/filecoin-project/go-data-transfer/v2" @@ -177,7 +175,16 @@ func TestClientDataTransferSubscriber(t *testing.T) { }, "accept event": { code: datatransfer.Accept, - status: datatransfer.Requested, + status: datatransfer.Queued, + called: true, + voucher: storageDataTransferVoucher(t, expectedProposalCID), + expectedID: expectedProposalCID, + expectedEvent: storagemarket.ClientEventDataTransferQueued, + expectedArgs: []interface{}{datatransfer.ChannelID{Initiator: init, Responder: resp, ID: tid}}, + }, + "transfer initiated event": { + code: datatransfer.TransferInitiated, + status: datatransfer.Ongoing, called: true, voucher: storageDataTransferVoucher(t, expectedProposalCID), expectedID: expectedProposalCID, @@ -314,10 +321,33 @@ func (fsg *fakeStoreGetter) Get(proposalCid cid.Cid) (bs.Blockstore, error) { type fakeTransport struct{} -func (ft *fakeTransport) OpenChannel(ctx context.Context, dataSender peer.ID, channelID datatransfer.ChannelID, root datamodel.Link, stor datamodel.Node, channel datatransfer.ChannelState, msg datatransfer.Message) error { +var _ datatransfer.Transport = (*fakeTransport)(nil) + +func (ft *fakeTransport) Capabilities() datatransfer.TransportCapabilities { + return datatransfer.TransportCapabilities{} +} + +func (ft *fakeTransport) ID() datatransfer.TransportID { + return "" +} + +func (ft *fakeTransport) Versions() []datatransfer.Version { return nil } +func (ft *fakeTransport) SendMessage(ctx context.Context, chid datatransfer.ChannelID, msg datatransfer.Message) error { + return nil +} + +func (ft *fakeTransport) ChannelUpdated(ctx context.Context, chid datatransfer.ChannelID, message datatransfer.Message) error { + return nil +} +func (ft *fakeTransport) OpenChannel(context.Context, datatransfer.Channel, datatransfer.Request) error { + return nil +} +func (ft *fakeTransport) RestartChannel(ctx context.Context, channel datatransfer.ChannelState, req datatransfer.Request) error { + return nil +} func (ft *fakeTransport) CloseChannel(ctx context.Context, chid datatransfer.ChannelID) error { return nil } diff --git a/storagemarket/impl/requestvalidation/unified_request_validator.go b/storagemarket/impl/requestvalidation/unified_request_validator.go index 0c80d9a8..8e0be1cd 100644 --- a/storagemarket/impl/requestvalidation/unified_request_validator.go +++ b/storagemarket/impl/requestvalidation/unified_request_validator.go @@ -79,16 +79,10 @@ func (v *UnifiedRequestValidator) ValidatePull(_ datatransfer.ChannelID, receive func (v *UnifiedRequestValidator) ValidateRestart(chid datatransfer.ChannelID, channelState datatransfer.ChannelState) (datatransfer.ValidationResult, error) { if channelState.IsPull() { - voucher, err := channelState.Voucher() - if err != nil { - return datatransfer.ValidationResult{}, err - } + voucher := channelState.Voucher() return v.ValidatePull(chid, channelState.Recipient(), voucher.Voucher, channelState.BaseCID(), channelState.Selector()) } else { - voucher, err := channelState.Voucher() - if err != nil { - return datatransfer.ValidationResult{}, err - } + voucher := channelState.Voucher() return v.ValidatePush(chid, channelState.Sender(), voucher.Voucher, channelState.BaseCID(), channelState.Selector()) } } diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go index 6a555aef..5f338fb2 100644 --- a/storagemarket/integration_test.go +++ b/storagemarket/integration_test.go @@ -13,13 +13,14 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipld/go-car" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" + "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" datatransfer "github.com/filecoin-project/go-data-transfer/v2" "github.com/filecoin-project/go-data-transfer/v2/channelmonitor" dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl" - dtnet "github.com/filecoin-project/go-data-transfer/v2/network" + dtnet "github.com/filecoin-project/go-data-transfer/v2/transport/helpers/network" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-fil-markets/shared_testutil" @@ -361,8 +362,8 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) { }) smState := testnodes.NewStorageMarketState() depGen := dependencies.NewDepGenerator() - depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) { - return dtimpl.NewDataTransfer(ds, transferNetwork, transport, restartConf) + depGen.ClientNewDataTransfer = func(ds datastore.Batching, peerID peer.ID, transport datatransfer.Transport) (datatransfer.Manager, error) { + return dtimpl.NewDataTransfer(ds, peerID, transport, restartConf) } deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay) h := testharness.NewHarnessWithTestData(t, td, deps, true, false) @@ -399,7 +400,7 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) { var providerState []storagemarket.MinerDeal h.DTClient.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) { if event.Code == datatransfer.Accept { - t.Log("client has accepted data-transfer query, shutting down provider") + t.Log("client has accepted data-transfer query and transfer started, shutting down provider") require.NoError(t, h.TestData.MockNet.UnlinkPeers(host1.ID(), host2.ID())) require.NoError(t, h.TestData.MockNet.DisconnectPeers(host1.ID(), host2.ID())) @@ -424,7 +425,7 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) { cd, err := client.GetLocalDeal(ctx, proposalCid) require.NoError(t, err) t.Logf("client state after stopping is %s", storagemarket.DealStates[cd.State]) - require.True(t, cd.State == storagemarket.StorageDealStartDataTransfer || cd.State == storagemarket.StorageDealTransferring) + require.True(t, cd.State == storagemarket.StorageDealStartDataTransfer || cd.State == storagemarket.StorageDealTransferQueued || cd.State == storagemarket.StorageDealTransferring) // Create new provider (but don't restart yet) newProvider := h.CreateNewProvider(t, ctx, h.TestData) @@ -800,8 +801,8 @@ func TestBounceConnectionDataTransfer(t *testing.T) { }) smState := testnodes.NewStorageMarketState() depGen := dependencies.NewDepGenerator() - depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) { - return dtimpl.NewDataTransfer(ds, transferNetwork, transport, restartConf) + depGen.ClientNewDataTransfer = func(ds datastore.Batching, peerID peer.ID, transport datatransfer.Transport) (datatransfer.Manager, error) { + return dtimpl.NewDataTransfer(ds, peerID, transport, restartConf) } deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay) h := testharness.NewHarnessWithTestData(t, td, deps, true, false) diff --git a/storagemarket/testharness/dependencies/dependencies.go b/storagemarket/testharness/dependencies/dependencies.go index 31bf6881..a35d9ea0 100644 --- a/storagemarket/testharness/dependencies/dependencies.go +++ b/storagemarket/testharness/dependencies/dependencies.go @@ -12,13 +12,13 @@ import ( "github.com/ipfs/go-datastore/namespace" graphsyncimpl "github.com/ipfs/go-graphsync/impl" "github.com/ipfs/go-graphsync/network" + peer "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer/v2" dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl" - network2 "github.com/filecoin-project/go-data-transfer/v2/network" dtgstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-actors/v7/actors/builtin/market" @@ -68,10 +68,10 @@ func NewDependenciesWithTestData(t *testing.T, return NewDepGenerator().New(t, ctx, td, smState, tempPath, cd, pd) } -type NewDataTransfer func(ds datastore.Batching, cidListsDir string, dataTransferNetwork network2.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) +type NewDataTransfer func(ds datastore.Batching, peerID peer.ID, transport datatransfer.Transport) (datatransfer.Manager, error) -func defaultNewDataTransfer(ds datastore.Batching, dir string, transferNetwork network2.DataTransferNetwork, transport datatransfer.Transport) (datatransfer.Manager, error) { - return dtimpl.NewDataTransfer(ds, transferNetwork, transport) +func defaultNewDataTransfer(ds datastore.Batching, peerID peer.ID, transport datatransfer.Transport) (datatransfer.Manager, error) { + return dtimpl.NewDataTransfer(ds, peerID, transport) } type DepGenerator struct { @@ -147,8 +147,8 @@ func (gen *DepGenerator) New( // create provider and client gs1 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(td.Host1), td.LinkSystem1) - dtTransport1 := dtgstransport.NewTransport(td.Host1.ID(), gs1) - dt1, err := gen.ClientNewDataTransfer(td.DTStore1, td.DTTmpDir1, td.DTNet1, dtTransport1) + dtTransport1 := dtgstransport.NewTransport(gs1, td.DTNet1) + dt1, err := gen.ClientNewDataTransfer(td.DTStore1, td.Host1.ID(), dtTransport1) require.NoError(t, err) shared_testutil.StartAndWaitForReadyDT(ctx, t, dt1) @@ -157,8 +157,8 @@ func (gen *DepGenerator) New( shared_testutil.StartAndWaitForReady(ctx, t, discovery) gs2 := graphsyncimpl.New(ctx, network.NewFromLibp2pHost(td.Host2), td.LinkSystem2) - dtTransport2 := dtgstransport.NewTransport(td.Host2.ID(), gs2) - dt2, err := gen.ProviderNewDataTransfer(td.DTStore2, td.DTTmpDir2, td.DTNet2, dtTransport2) + dtTransport2 := dtgstransport.NewTransport(gs2, td.DTNet2) + dt2, err := gen.ProviderNewDataTransfer(td.DTStore2, td.Host2.ID(), dtTransport2) require.NoError(t, err) shared_testutil.StartAndWaitForReadyDT(ctx, t, dt2) diff --git a/storagemarket/testharness/testharness.go b/storagemarket/testharness/testharness.go index a1adff79..5abc7b75 100644 --- a/storagemarket/testharness/testharness.go +++ b/storagemarket/testharness/testharness.go @@ -143,8 +143,8 @@ func NewHarnessWithTestData(t *testing.T, td *shared_testutil.Libp2pTestData, de func (h *StorageHarness) CreateNewProvider(t *testing.T, ctx context.Context, td *shared_testutil.Libp2pTestData) storagemarket.StorageProvider { gs2 := graphsyncimpl.New(ctx, gsnetwork.NewFromLibp2pHost(td.Host2), td.LinkSystem2) - dtTransport2 := dtgstransport.NewTransport(td.Host2.ID(), gs2) - dt2, err := dtimpl.NewDataTransfer(td.DTStore2, td.DTNet2, dtTransport2) + dtTransport2 := dtgstransport.NewTransport(gs2, td.DTNet2) + dt2, err := dtimpl.NewDataTransfer(td.DTStore2, td.Host2.ID(), dtTransport2) require.NoError(t, err) shared_testutil.StartAndWaitForReadyDT(ctx, t, dt2)