From 1c8a21781b90a095809ee25696c3f6d12952ce86 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 25 Apr 2023 15:48:58 +0200 Subject: [PATCH] Break notifiers into a map so that only other requests under the same root are notified --- lib/graph_gateway.go | 92 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 71 insertions(+), 21 deletions(-) diff --git a/lib/graph_gateway.go b/lib/graph_gateway.go index 833fbbc..ca87e83 100644 --- a/lib/graph_gateway.go +++ b/lib/graph_gateway.go @@ -22,6 +22,7 @@ import ( "github.com/ipfs/boxo/ipld/car" "github.com/ipfs/boxo/namesys" "github.com/ipfs/boxo/namesys/resolve" + "github.com/ipfs/boxo/path" ipfspath "github.com/ipfs/boxo/path" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -83,6 +84,12 @@ type Notifier interface { NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error } +type notifiersForCid struct { + lk sync.RWMutex + deleted int8 + notifiers []Notifier +} + type GraphGateway struct { fetcher CarFetcher blockFetcher exchange.Fetcher @@ -90,8 +97,7 @@ type GraphGateway struct { namesys namesys.NameSystem bstore blockstore.Blockstore - lk sync.RWMutex - notifiers map[Notifier]struct{} + notifiers sync.Map // cid -> notifiersForCid metrics *GraphGatewayMetrics } @@ -152,7 +158,7 @@ func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts .. routing: vs, namesys: ns, bstore: bs, - notifiers: make(map[Notifier]struct{}), + notifiers: sync.Map{}, metrics: registerGraphGatewayMetrics(), }, nil } @@ -242,6 +248,18 @@ func registerGraphGatewayMetrics() *GraphGatewayMetrics { } } +func (api *GraphGateway) getRootOfPath(reqPath string) string { + pth, err := path.ParsePath(reqPath) + if err != nil { + return reqPath + } + if pth.IsJustAKey() { + return pth.Segments()[0] + } else { + return pth.Segments()[1] + } +} + /* Implementation iteration plan: @@ -252,7 +270,7 @@ Implementation iteration plan: 5. Don't redo the last segment fully if it's part of a UnixFS file and we can do range requests */ -func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx context.Context, path string) (gateway.IPFSBackend, func(), error) { +func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx context.Context, reqPath string) (gateway.IPFSBackend, func(), error) { bstore := api.bstore carFetchingExch := newInboundBlockExchange() doneWithFetcher := make(chan struct{}, 1) @@ -263,15 +281,31 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con metrics: api.metrics, } - api.lk.Lock() - api.notifiers[exch] = struct{}{} - api.lk.Unlock() + notifierKey := api.getRootOfPath(reqPath) + var notifier *notifiersForCid + for { + notifiers, _ := api.notifiers.LoadOrStore(notifierKey, ¬ifiersForCid{notifiers: []Notifier{}}) + if n, ok := notifiers.(*notifiersForCid); ok { + n.lk.Lock() + // could have been deleted after our load. try again. + if n.deleted != 0 { + n.lk.Unlock() + continue + } + notifier = n + n.notifiers = append(n.notifiers, exch) + n.lk.Unlock() + break + } else { + return nil, nil, errors.New("failed to get notifier") + } + } go func(metrics *GraphGatewayMetrics) { defer func() { if r := recover(); r != nil { // TODO: move to Debugw? - graphLog.Errorw("Recovered fetcher error", "path", path, "error", r) + graphLog.Errorw("Recovered fetcher error", "path", reqPath, "error", r) } }() metrics.carFetchAttemptMetric.Inc() @@ -280,7 +314,7 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con metrics.contextAlreadyCancelledMetric.Inc() } - err := api.fetcher.Fetch(ctx, path, func(resource string, reader io.Reader) error { + err := api.fetcher.Fetch(ctx, reqPath, func(resource string, reader io.Reader) error { cr, err := car.NewCarReader(reader) if err != nil { return err @@ -297,11 +331,11 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con return err } metrics.carBlocksFetchedMetric.Inc() - api.notifyAllOngoingRequests(ctx, blk) + api.notifyOngoingRequests(ctx, notifierKey, blk) } }) if err != nil { - graphLog.Debugw("car Fetch failed", "path", path, "error", err) + graphLog.Debugw("car Fetch failed", "path", reqPath, "error", err) } if err := carFetchingExch.Close(); err != nil { graphLog.Errorw("carFetchingExch.Close()", "error", err) @@ -317,21 +351,37 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con } return blkgw, func() { - api.lk.Lock() - delete(api.notifiers, exch) - api.lk.Unlock() + notifier.lk.Lock() + for i, e := range notifier.notifiers { + if e == exch { + notifier.notifiers = append(notifier.notifiers[0:i], notifier.notifiers[i+1:]...) + break + } + } + if len(notifier.notifiers) == 0 { + notifier.deleted = 1 + api.notifiers.Delete(notifierKey) + } + notifier.lk.Unlock() }, nil } -func (api *GraphGateway) notifyAllOngoingRequests(ctx context.Context, blks ...blocks.Block) { - api.lk.RLock() - for n := range api.notifiers { - err := n.NotifyNewBlocks(ctx, blks...) - if err != nil { - graphLog.Errorw("notifyAllOngoingRequests failed", "error", err) +func (api *GraphGateway) notifyOngoingRequests(ctx context.Context, key string, blks ...blocks.Block) { + if notifiers, ok := api.notifiers.Load(key); ok { + notifier, ok := notifiers.(*notifiersForCid) + if !ok { + graphLog.Errorw("notifyOngoingRequests failed", "error", "could not get notifier") + return + } + notifier.lk.RLock() + for _, n := range notifier.notifiers { + err := n.NotifyNewBlocks(ctx, blks...) + if err != nil { + graphLog.Errorw("notifyAllOngoingRequests failed", "error", err) + } } + notifier.lk.RUnlock() } - api.lk.RUnlock() } type fileCloseWrapper struct {