Skip to content

Commit

Permalink
Resolve 175 race condition, no change to hook timing (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward authored Jul 22, 2021
1 parent af0285f commit 24d32d0
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 172 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-ipld-cbor v0.0.4 // indirect
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log v1.0.4
github.com/ipfs/go-log/v2 v2.1.1
github.com/ipfs/go-merkledag v0.3.1
github.com/ipfs/go-peertaskqueue v0.2.0
Expand Down
2 changes: 1 addition & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package graphsync
import (
"context"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
ipld "github.com/ipld/go-ipld-prime"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down
2 changes: 1 addition & 1 deletion messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"

gsmsg "github.com/ipfs/go-graphsync/message"
Expand Down
2 changes: 1 addition & 1 deletion network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"io"
"time"

logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down
2 changes: 1 addition & 1 deletion requestmanager/asyncloader/responsecache/responsecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"

blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"

Expand Down
17 changes: 9 additions & 8 deletions requestmanager/requestmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"

"github.com/hannahhoward/go-pubsub"
"golang.org/x/xerrors"
"sync/atomic"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -66,12 +67,12 @@ type AsyncLoader interface {
// RequestManager tracks outgoing requests and processes incoming reponses
// to them.
type RequestManager struct {
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
ctx context.Context
cancel func()
messages chan requestManagerMessage
peerHandler PeerHandler
rc *responseCollector
asyncLoader AsyncLoader
disconnectNotif *pubsub.PubSub
// dont touch out side of run loop
nextRequestID graphsync.RequestID
Expand Down
130 changes: 4 additions & 126 deletions responsemanager/queryexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"strings"
"time"

"github.com/ipfs/go-cid"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
Expand All @@ -26,12 +22,10 @@ var errCancelledByCommand = errors.New("response cancelled by responder")

// TODO: Move this into a seperate module and fully seperate from the ResponseManager
type queryExecutor struct {
requestHooks RequestHooks
blockHooks BlockHooks
updateHooks UpdateHooks
cancelledListeners CancelledListeners
responseAssembler ResponseAssembler
loader ipld.Loader
queryQueue QueryQueue
messages chan responseManagerMessage
ctx context.Context
Expand All @@ -57,9 +51,8 @@ func (qe *queryExecutor) processQueriesWorker() {
}
}
for _, task := range tasks {
key := task.Topic.(responseKey)
select {
case qe.messages <- &responseDataRequest{key, taskDataChan}:
case qe.messages <- &startTaskRequest{task, taskDataChan}:
case <-qe.ctx.Done():
return
}
Expand All @@ -72,132 +65,17 @@ func (qe *queryExecutor) processQueriesWorker() {
log.Info("Empty task on peer request stack")
continue
}
status, err := qe.executeTask(key, taskData)
status, err := qe.executeQuery(pid, taskData.request, taskData.loader, taskData.traverser, taskData.signals, taskData.subscriber)
isCancelled := err != nil && isContextErr(err)
if isCancelled {
qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request)
qe.cancelledListeners.NotifyCancelledListeners(pid, taskData.request)
}
select {
case qe.messages <- &finishTaskRequest{key, status, err}:
case qe.messages <- &finishTaskRequest{task, status, err}:
case <-qe.ctx.Done():
}
}
qe.queryQueue.TasksDone(pid, tasks...)

}

}

func (qe *queryExecutor) executeTask(key responseKey, taskData responseTaskData) (graphsync.ResponseStatusCode, error) {
var err error
loader := taskData.loader
traverser := taskData.traverser
if loader == nil || traverser == nil {
var isPaused bool
loader, traverser, isPaused, err = qe.prepareQuery(taskData.ctx, key.p, taskData.request, taskData.signals, taskData.subscriber)
if err != nil {
return graphsync.RequestFailedUnknown, err
}
select {
case <-qe.ctx.Done():
return graphsync.RequestFailedUnknown, errors.New("context cancelled")
case qe.messages <- &setResponseDataRequest{key, loader, traverser}:
}
if isPaused {
return graphsync.RequestPaused, hooks.ErrPaused{}
}
}
return qe.executeQuery(key.p, taskData.request, loader, traverser, taskData.signals, taskData.subscriber)
}

func (qe *queryExecutor) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
result := qe.requestHooks.ProcessRequestHooks(p, request)
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
err := qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
for _, extension := range result.Extensions {
rb.SendExtensionData(extension)
}
if result.Err != nil || !result.IsValidated {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
transactionError = errors.New("request not valid")
} else if result.IsPaused {
rb.PauseRequest()
isPaused = true
}
return nil
})
if err != nil {
return nil, nil, false, err
}
if transactionError != nil {
return nil, nil, false, transactionError
}
if err := qe.processDedupByKey(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
rootLink := cidlink.Link{Cid: request.Root()}
traverser := ipldutil.TraversalBuilder{
Root: rootLink,
Selector: request.Selector(),
Chooser: result.CustomChooser,
}.Start(ctx)
loader := result.CustomLoader
if loader == nil {
loader = qe.loader
}
return loader, traverser, isPaused, nil
}

func (qe *queryExecutor) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey)
if !has {
return nil
}
key, err := dedupkey.DecodeDedupKey(dedupData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
qe.responseAssembler.DedupKey(p, request.ID(), key)
return nil
}

func (qe *queryExecutor) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
return nil
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
links := make([]ipld.Link, 0, cidSet.Len())
err = cidSet.ForEach(func(c cid.Cid) error {
links = append(links, cidlink.Link{Cid: c})
return nil
})
if err != nil {
return err
}
qe.responseAssembler.IgnoreBlocks(p, request.ID(), links)
return nil
}

func (qe *queryExecutor) executeQuery(
Expand Down
115 changes: 115 additions & 0 deletions responsemanager/querypreparer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package responsemanager

import (
"context"
"errors"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/cidset"
"github.com/ipfs/go-graphsync/dedupkey"
"github.com/ipfs/go-graphsync/ipldutil"
gsmsg "github.com/ipfs/go-graphsync/message"
"github.com/ipfs/go-graphsync/notifications"
"github.com/ipfs/go-graphsync/responsemanager/responseassembler"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"
)

type queryPreparer struct {
requestHooks RequestHooks
responseAssembler ResponseAssembler
loader ipld.Loader
}

func (qe *queryPreparer) prepareQuery(ctx context.Context,
p peer.ID,
request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) {
log.Infof("Processing request hooks for request: %d", request.ID())
result := qe.requestHooks.ProcessRequestHooks(p, request)
var transactionError error
var isPaused bool
failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub}
err := qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
for _, extension := range result.Extensions {
rb.SendExtensionData(extension)
}
if result.Err != nil || !result.IsValidated {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
transactionError = errors.New("request not valid")
} else if result.IsPaused {
rb.PauseRequest()
isPaused = true
}
return nil
})
if err != nil {
return nil, nil, false, err
}
if transactionError != nil {
return nil, nil, false, transactionError
}
if err := qe.processDedupByKey(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
if err := qe.processDoNoSendCids(request, p, failNotifee); err != nil {
return nil, nil, false, err
}
rootLink := cidlink.Link{Cid: request.Root()}
traverser := ipldutil.TraversalBuilder{
Root: rootLink,
Selector: request.Selector(),
Chooser: result.CustomChooser,
}.Start(ctx)
loader := result.CustomLoader
if loader == nil {
loader = qe.loader
}
return loader, traverser, isPaused, nil
}

func (qe *queryPreparer) processDedupByKey(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
dedupData, has := request.Extension(graphsync.ExtensionDeDupByKey)
if !has {
return nil
}
key, err := dedupkey.DecodeDedupKey(dedupData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
qe.responseAssembler.DedupKey(p, request.ID(), key)
return nil
}

func (qe *queryPreparer) processDoNoSendCids(request gsmsg.GraphSyncRequest, p peer.ID, failNotifee notifications.Notifee) error {
doNotSendCidsData, has := request.Extension(graphsync.ExtensionDoNotSendCIDs)
if !has {
return nil
}
cidSet, err := cidset.DecodeCidSet(doNotSendCidsData)
if err != nil {
_ = qe.responseAssembler.Transaction(p, request.ID(), func(rb responseassembler.ResponseBuilder) error {
rb.FinishWithError(graphsync.RequestFailedUnknown)
rb.AddNotifee(failNotifee)
return nil
})
return err
}
links := make([]ipld.Link, 0, cidSet.Len())
err = cidSet.ForEach(func(c cid.Cid) error {
links = append(links, cidlink.Link{Cid: c})
return nil
})
if err != nil {
return err
}
qe.responseAssembler.IgnoreBlocks(p, request.ID(), links)
return nil
}
2 changes: 1 addition & 1 deletion responsemanager/responseassembler/responseBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package responseassembler

import (
blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"

Expand Down
Loading

0 comments on commit 24d32d0

Please sign in to comment.