diff --git a/changelog/unreleased/delete-stale-shares.md b/changelog/unreleased/delete-stale-shares.md new file mode 100644 index 0000000000..323867b85f --- /dev/null +++ b/changelog/unreleased/delete-stale-shares.md @@ -0,0 +1,5 @@ +Bugfix: Delete stale shares in the jsoncs3 share manager + +The jsoncs3 share manager now properly deletes all references to removed shares and shares that belong to a space that was deleted + +https://github.com/cs3org/reva/pull/4975 diff --git a/pkg/share/manager/jsoncs3/jsoncs3.go b/pkg/share/manager/jsoncs3/jsoncs3.go index e8e00dd410..12c17075a0 100644 --- a/pkg/share/manager/jsoncs3/jsoncs3.go +++ b/pkg/share/manager/jsoncs3/jsoncs3.go @@ -34,6 +34,7 @@ import ( "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" + "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/v2/pkg/share" "github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache" @@ -114,6 +115,12 @@ func init() { registry.Register("jsoncs3", NewDefault) } +var ( + _registeredEvents = []events.Unmarshaller{ + events.SpaceDeleted{}, + } +) + type config struct { GatewayAddr string `mapstructure:"gateway_addr"` MaxConcurrency int `mapstructure:"max_concurrency"` @@ -188,7 +195,8 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) { // New returns a new manager instance. func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) { ttl := time.Duration(ttlSeconds) * time.Second - return &Manager{ + + m := &Manager{ Cache: providercache.New(s, ttl), CreatedCache: sharecache.New(s, "users", "created.json", ttl), UserReceivedStates: receivedsharecache.New(s, ttl), @@ -197,7 +205,18 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate gatewaySelector: gatewaySelector, eventStream: es, MaxConcurrency: maxconcurrency, - }, nil + } + + // listen for events + if m.eventStream != nil { + ch, err := events.Consume(m.eventStream, "jsoncs3sharemanager", _registeredEvents...) + if err != nil { + appctx.GetLogger(context.Background()).Error().Err(err).Msg("error consuming events") + } + go m.ProcessEvents(ch) + } + + return m, nil } func (m *Manager) initialize(ctx context.Context) error { @@ -248,6 +267,22 @@ func (m *Manager) initialize(ctx context.Context) error { return nil } +func (m *Manager) ProcessEvents(ch <-chan events.Event) { + log := logger.New() + for event := range ch { + ctx := context.Background() + + if err := m.initialize(ctx); err != nil { + log.Error().Err(err).Msg("error initializing manager") + } + + if ev, ok := event.Event.(events.SpaceDeleted); ok { + log.Debug().Msgf("space deleted event: %v", ev) + go func() { m.purgeSpace(ctx, ev.ID) }() + } + } +} + // Share creates a new share func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share") @@ -420,7 +455,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc return nil, err } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublog.Error().Err(err). Msg("failed to unshare expired share") } @@ -485,7 +520,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference return errtypes.NotFound(ref.String()) } - return m.removeShare(ctx, s) + return m.removeShare(ctx, s, false) } // UpdateShare updates the mode of the given share. @@ -622,7 +657,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f resourceID := s.GetResourceId() sublog = sublog.With().Str("storageid", resourceID.GetStorageId()).Str("spaceid", resourceID.GetSpaceId()).Str("opaqueid", resourceID.GetOpaqueId()).Logger() if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublog.Error().Err(err). Msg("failed to unshare expired share") } @@ -740,7 +775,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User, continue } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublog.Error().Err(err). Msg("failed to unshare expired share") } @@ -906,7 +941,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati } sublogr = sublogr.With().Str("shareid", shareID).Logger() if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublogr.Error().Err(err). Msg("failed to unshare expired share") } @@ -1009,7 +1044,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer return nil, errtypes.NotFound(ref.String()) } if share.IsExpired(s) { - if err := m.removeShare(ctx, s); err != nil { + if err := m.removeShare(ctx, s, false); err != nil { sublog.Error().Err(err). Msg("failed to unshare expired share") } @@ -1136,24 +1171,59 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar return nil } -func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error { +func (m *Manager) purgeSpace(ctx context.Context, id *provider.StorageSpaceId) { + log := appctx.GetLogger(ctx) + storageID, spaceID := storagespace.SplitStorageID(id.OpaqueId) + + shares, err := m.Cache.ListSpace(ctx, storageID, spaceID) + if err != nil { + log.Error().Err(err).Msg("error listing shares in space") + return + } + + // iterate over all shares in the space and remove them + for _, share := range shares.Shares { + err := m.removeShare(ctx, share, true) + if err != nil { + log.Error().Err(err).Msg("error removing share") + } + } + + // remove all shares in the space + err = m.Cache.PurgeSpace(ctx, storageID, spaceID) + if err != nil { + log.Error().Err(err).Msg("error purging space") + } +} + +func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipSpaceCache bool) error { ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare") defer span.End() eg, ctx := errgroup.WithContext(ctx) - eg.Go(func() error { - storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) - err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) + if !skipSpaceCache { + eg.Go(func() error { + storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId) + err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId) - return err - }) + return err + }) + } eg.Go(func() error { // remove from created cache return m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId) }) - // TODO remove from grantee cache + eg.Go(func() error { + // remove from user received states + if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { + return m.UserReceivedStates.Remove(ctx, s.GetGrantee().GetUserId().GetOpaqueId(), s.GetResourceId().GetStorageId()+shareid.IDDelimiter+s.GetResourceId().GetSpaceId(), s.Id.OpaqueId) + } else if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { + return m.GroupReceivedCache.Remove(ctx, s.GetGrantee().GetGroupId().GetOpaqueId(), s.Id.OpaqueId) + } + return nil + }) return eg.Wait() } diff --git a/pkg/share/manager/jsoncs3/providercache/providercache.go b/pkg/share/manager/jsoncs3/providercache/providercache.go index 5286be783d..bb30c1d3f6 100644 --- a/pkg/share/manager/jsoncs3/providercache/providercache.go +++ b/pkg/share/manager/jsoncs3/providercache/providercache.go @@ -418,6 +418,31 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error { return nil } +// PurgeSpace removes a space from the cache +func (c *Cache) PurgeSpace(ctx context.Context, storageID, spaceID string) error { + ctx, span := tracer.Start(ctx, "PurgeSpace") + defer span.End() + + unlock := c.LockSpace(spaceID) + defer unlock() + span.AddEvent("got lock") + + if !c.isSpaceCached(storageID, spaceID) { + err := c.syncWithLock(ctx, storageID, spaceID) + if err != nil { + return err + } + } + + spaces, ok := c.Providers.Load(storageID) + if !ok { + return nil + } + spaces.Spaces.Store(spaceID, &Shares{}) + + return c.Persist(ctx, storageID, spaceID) +} + func (c *Cache) syncWithLock(ctx context.Context, storageID, spaceID string) error { ctx, span := tracer.Start(ctx, "syncWithLock") defer span.End() diff --git a/pkg/share/manager/jsoncs3/providercache/providercache_test.go b/pkg/share/manager/jsoncs3/providercache/providercache_test.go index 5d62212aa2..f1e15a470b 100644 --- a/pkg/share/manager/jsoncs3/providercache/providercache_test.go +++ b/pkg/share/manager/jsoncs3/providercache/providercache_test.go @@ -181,5 +181,15 @@ var _ = Describe("Cache", func() { Expect(c.Persist(ctx, storageID, spaceID)).ToNot(Succeed()) }) }) + + Describe("PurgeSpace", func() { + It("removes the entry", func() { + Expect(c.PurgeSpace(ctx, storageID, spaceID)).To(Succeed()) + + s, err := c.Get(ctx, storageID, spaceID, shareID, false) + Expect(err).ToNot(HaveOccurred()) + Expect(s).To(BeNil()) + }) + }) }) }) diff --git a/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go index 8bb6bb6ad0..cc32bcbff5 100644 --- a/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go +++ b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache.go @@ -185,6 +185,74 @@ func (c *Cache) Get(ctx context.Context, userID, spaceID, shareID string) (*Stat return rss.Spaces[spaceID].States[shareID], nil } +// Remove removes an entry from the cache +func (c *Cache) Remove(ctx context.Context, userID, spaceID, shareID string) error { + ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock") + unlock := c.lockUser(userID) + span.End() + span.SetAttributes(attribute.String("cs3.userid", userID)) + defer unlock() + + ctx, span = appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add") + defer span.End() + span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID)) + + persistFunc := func() error { + c.initializeIfNeeded(userID, spaceID) + + rss, _ := c.ReceivedSpaces.Load(userID) + receivedSpace := rss.Spaces[spaceID] + if receivedSpace.States == nil { + receivedSpace.States = map[string]*State{} + } + delete(receivedSpace.States, shareID) + if len(receivedSpace.States) == 0 { + delete(rss.Spaces, spaceID) + } + + return c.persist(ctx, userID) + } + + log := appctx.GetLogger(ctx).With(). + Str("hostname", os.Getenv("HOSTNAME")). + Str("userID", userID). + Str("spaceID", spaceID).Logger() + + var err error + for retries := 100; retries > 0; retries-- { + err = persistFunc() + switch err.(type) { + case nil: + span.SetStatus(codes.Ok, "") + return nil + case errtypes.Aborted: + log.Debug().Msg("aborted when persisting added received share: etag changed. retrying...") + // this is the expected status code from the server when the if-match etag check fails + // continue with sync below + case errtypes.PreconditionFailed: + log.Debug().Msg("precondition failed when persisting added received share: etag changed. retrying...") + // actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side + // continue with sync below + case errtypes.AlreadyExists: + log.Debug().Msg("already exists when persisting added received share. retrying...") + // CS3 uses an already exists error instead of precondition failed when using an If-None-Match=* header / IfExists flag in the InitiateFileUpload call. + // Thas happens when the cache thinks there is no file. + // continue with sync below + default: + span.SetStatus(codes.Error, fmt.Sprintf("persisting added received share failed. giving up: %s", err.Error())) + log.Error().Err(err).Msg("persisting added received share failed") + return err + } + if err := c.syncWithLock(ctx, userID); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + log.Error().Err(err).Msg("persisting added received share failed. giving up.") + return err + } + } + return err +} + // List returns a list of received shares for a given user // The return list is guaranteed to be thread-safe func (c *Cache) List(ctx context.Context, userID string) (map[string]*Space, error) { diff --git a/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache_test.go b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache_test.go index 028dd1d054..1a79a6d864 100644 --- a/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache_test.go +++ b/pkg/share/manager/jsoncs3/receivedsharecache/receivedsharecache_test.go @@ -134,5 +134,26 @@ var _ = Describe("Cache", func() { Expect(s).ToNot(BeNil()) }) }) + + Describe("Remove", func() { + It("removes the entry", func() { + err := c.Remove(ctx, userID, spaceID, shareID) + Expect(err).ToNot(HaveOccurred()) + + s, err := c.Get(ctx, userID, spaceID, shareID) + Expect(err).ToNot(HaveOccurred()) + Expect(s).To(BeNil()) + }) + + It("persists the removal", func() { + err := c.Remove(ctx, userID, spaceID, shareID) + Expect(err).ToNot(HaveOccurred()) + + c = receivedsharecache.New(storage, 0*time.Second) + s, err := c.Get(ctx, userID, spaceID, shareID) + Expect(err).ToNot(HaveOccurred()) + Expect(s).To(BeNil()) + }) + }) }) })