Skip to content

Commit

Permalink
Centralize all purge API calls
Browse files Browse the repository at this point in the history
  • Loading branch information
turt2live committed Aug 4, 2023
1 parent 902eaa3 commit 4116e78
Show file tree
Hide file tree
Showing 9 changed files with 443 additions and 168 deletions.
184 changes: 100 additions & 84 deletions api/custom/purge.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
package custom

import (
"database/sql"
"net/http"
"strconv"

"github.com/getsentry/sentry-go"
"github.com/turt2live/matrix-media-repo/api/_apimeta"
"github.com/turt2live/matrix-media-repo/api/_responses"
"github.com/turt2live/matrix-media-repo/api/_routers"
"github.com/turt2live/matrix-media-repo/database"
"github.com/turt2live/matrix-media-repo/tasks/task_runner"

"github.com/sirupsen/logrus"
"github.com/turt2live/matrix-media-repo/common"
"github.com/turt2live/matrix-media-repo/common/rcontext"
"github.com/turt2live/matrix-media-repo/controllers/maintenance_controller"
"github.com/turt2live/matrix-media-repo/matrix"
"github.com/turt2live/matrix-media-repo/storage"
"github.com/turt2live/matrix-media-repo/types"
"github.com/turt2live/matrix-media-repo/util"
)

Expand Down Expand Up @@ -51,8 +48,7 @@ func PurgeRemoteMedia(r *http.Request, rctx rcontext.RequestContext, user _apime
}

func PurgeIndividualRecord(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} {
isGlobalAdmin, isLocalAdmin := _apimeta.GetRequestUserAdminStatus(r, rctx, user)
localServerName := r.Host
authCtx, _, _ := getPurgeAuthContext(rctx, r, user)

server := _routers.GetParam("server", r)
mediaId := _routers.GetParam("mediaId", r)
Expand All @@ -66,66 +62,54 @@ func PurgeIndividualRecord(r *http.Request, rctx rcontext.RequestContext, user _
"mediaId": mediaId,
})

// If the user is NOT a global admin, ensure they are speaking to the right server
if !isGlobalAdmin {
if server != localServerName {
_, err := task_runner.PurgeMedia(rctx, authCtx, &task_runner.QuarantineThis{
Single: &task_runner.QuarantineRecord{
Origin: server,
MediaId: mediaId,
},
})
if err != nil {
if err == common.ErrWrongUser {
return _responses.AuthFailed()
}
// If the user is NOT a local admin, ensure they uploaded the content in the first place
if !isLocalAdmin {
db := storage.GetDatabase().GetMediaStore(rctx)
m, err := db.Get(server, mediaId)
if err == sql.ErrNoRows {
return _responses.NotFoundError()
}
if err != nil {
rctx.Log.Error("Error checking ownership of media: ", err)
sentry.CaptureException(err)
return _responses.InternalServerError("error checking media ownership")
}
if m.UserId != user.UserId {
return _responses.AuthFailed()
}
}
}

err := maintenance_controller.PurgeMedia(server, mediaId, rctx)
if err == sql.ErrNoRows || err == common.ErrMediaNotFound {
return _responses.NotFoundError()
}
if err != nil {
rctx.Log.Error("Error purging media: ", err)
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("error purging media")
return _responses.InternalServerError("unexpected error")
}

return &_responses.DoNotCacheResponse{Payload: map[string]interface{}{"purged": true}}
}

func PurgeQuarantined(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} {
isGlobalAdmin, isLocalAdmin := _apimeta.GetRequestUserAdminStatus(r, rctx, user)
localServerName := r.Host
authCtx, isGlobalAdmin, isLocalAdmin := getPurgeAuthContext(rctx, r, user)

var affected []*types.Media
var affected []*database.DbMedia
var err error

mediaDb := database.GetInstance().Media.Prepare(rctx)
if isGlobalAdmin {
affected, err = maintenance_controller.PurgeQuarantined(rctx)
affected, err = mediaDb.GetByQuarantine()
} else if isLocalAdmin {
affected, err = maintenance_controller.PurgeQuarantinedFor(localServerName, rctx)
affected, err = mediaDb.GetByOriginQuarantine(r.Host)
} else {
return _responses.AuthFailed()
}

if err != nil {
rctx.Log.Error("Error purging media: ", err)
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("error purging media")
return _responses.InternalServerError("error fetching media records")
}

mxcs := make([]string, 0)
for _, a := range affected {
mxcs = append(mxcs, a.MxcUri())
mxcs, err := task_runner.PurgeMedia(rctx, authCtx, &task_runner.QuarantineThis{
DbMedia: affected,
})
if err != nil {
if err == common.ErrWrongUser {
return _responses.AuthFailed()
}
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("unexpected error")
}

return &_responses.DoNotCacheResponse{Payload: map[string]interface{}{"purged": true, "affected": mxcs}}
Expand Down Expand Up @@ -156,24 +140,36 @@ func PurgeOldMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta.
"include_local": includeLocal,
})

affected, err := maintenance_controller.PurgeOldMedia(beforeTs, includeLocal, rctx)
domains := make([]string, 0)
if !includeLocal {
domains = util.GetOurDomains()
}

mediaDb := database.GetInstance().Media.Prepare(rctx)
records, err := mediaDb.GetOldExcluding(domains, beforeTs)
if err != nil {
rctx.Log.Error("Error purging media: ", err)
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("error purging media")
return _responses.InternalServerError("error fetching media records")
}

mxcs := make([]string, 0)
for _, a := range affected {
mxcs = append(mxcs, a.MxcUri())
mxcs, err := task_runner.PurgeMedia(rctx, &task_runner.PurgeAuthContext{}, &task_runner.QuarantineThis{
DbMedia: records,
})
if err != nil {
if err == common.ErrWrongUser {
return _responses.AuthFailed()
}
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("unexpected error")
}

return &_responses.DoNotCacheResponse{Payload: map[string]interface{}{"purged": true, "affected": mxcs}}
}

func PurgeUserMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} {
isGlobalAdmin, isLocalAdmin := _apimeta.GetRequestUserAdminStatus(r, rctx, user)
authCtx, isGlobalAdmin, isLocalAdmin := getPurgeAuthContext(rctx, r, user)
if !isGlobalAdmin && !isLocalAdmin {
return _responses.AuthFailed()
}
Expand Down Expand Up @@ -206,24 +202,31 @@ func PurgeUserMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta
return _responses.AuthFailed()
}

affected, err := maintenance_controller.PurgeUserMedia(userId, beforeTs, rctx)

mediaDb := database.GetInstance().Media.Prepare(rctx)
records, err := mediaDb.GetOldByUserId(userId, beforeTs)
if err != nil {
rctx.Log.Error("Error purging media: ", err)
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("error purging media")
return _responses.InternalServerError("error fetching media records")
}

mxcs := make([]string, 0)
for _, a := range affected {
mxcs = append(mxcs, a.MxcUri())
mxcs, err := task_runner.PurgeMedia(rctx, authCtx, &task_runner.QuarantineThis{
DbMedia: records,
})
if err != nil {
if err == common.ErrWrongUser {
return _responses.AuthFailed()
}
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("unexpected error")
}

return &_responses.DoNotCacheResponse{Payload: map[string]interface{}{"purged": true, "affected": mxcs}}
}

func PurgeRoomMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} {
isGlobalAdmin, isLocalAdmin := _apimeta.GetRequestUserAdminStatus(r, rctx, user)
authCtx, isGlobalAdmin, isLocalAdmin := getPurgeAuthContext(rctx, r, user)
if !isGlobalAdmin && !isLocalAdmin {
return _responses.AuthFailed()
}
Expand Down Expand Up @@ -276,32 +279,27 @@ func PurgeRoomMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta
mxcs = append(mxcs, mxc)
}
} else {
for _, mxc := range allMedia.LocalMxcs {
mxcs = append(mxcs, mxc)
}
for _, mxc := range allMedia.RemoteMxcs {
mxcs = append(mxcs, mxc)
}
mxcs = append(mxcs, allMedia.LocalMxcs...)
mxcs = append(mxcs, allMedia.RemoteMxcs...)
}

affected, err := maintenance_controller.PurgeRoomMedia(mxcs, beforeTs, rctx)

mxcs2, err := task_runner.PurgeMedia(rctx, authCtx, &task_runner.QuarantineThis{
MxcUris: mxcs,
})
if err != nil {
rctx.Log.Error("Error purging media: ", err)
if err == common.ErrWrongUser {
return _responses.AuthFailed()
}
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("error purging media")
return _responses.InternalServerError("unexpected error")
}

mxcs = make([]string, 0)
for _, a := range affected {
mxcs = append(mxcs, a.MxcUri())
}

return &_responses.DoNotCacheResponse{Payload: map[string]interface{}{"purged": true, "affected": mxcs}}
return &_responses.DoNotCacheResponse{Payload: map[string]interface{}{"purged": true, "affected": mxcs2}}
}

func PurgeDomainMedia(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} {
isGlobalAdmin, isLocalAdmin := _apimeta.GetRequestUserAdminStatus(r, rctx, user)
authCtx, isGlobalAdmin, isLocalAdmin := getPurgeAuthContext(rctx, r, user)
if !isGlobalAdmin && !isLocalAdmin {
return _responses.AuthFailed()
}
Expand Down Expand Up @@ -331,18 +329,36 @@ func PurgeDomainMedia(r *http.Request, rctx rcontext.RequestContext, user _apime
return _responses.AuthFailed()
}

affected, err := maintenance_controller.PurgeDomainMedia(serverName, beforeTs, rctx)

mediaDb := database.GetInstance().Media.Prepare(rctx)
records, err := mediaDb.GetOldByOrigin(serverName, beforeTs)
if err != nil {
rctx.Log.Error("Error purging media: ", err)
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("error purging media")
return _responses.InternalServerError("error fetching media records")
}

mxcs := make([]string, 0)
for _, a := range affected {
mxcs = append(mxcs, a.MxcUri())
mxcs, err := task_runner.PurgeMedia(rctx, authCtx, &task_runner.QuarantineThis{
DbMedia: records,
})
if err != nil {
if err == common.ErrWrongUser {
return _responses.AuthFailed()
}
rctx.Log.Error(err)
sentry.CaptureException(err)
return _responses.InternalServerError("unexpected error")
}

return &_responses.DoNotCacheResponse{Payload: map[string]interface{}{"purged": true, "affected": mxcs}}
}

func getPurgeAuthContext(ctx rcontext.RequestContext, r *http.Request, user _apimeta.UserInfo) (*task_runner.PurgeAuthContext, bool, bool) {
globalAdmin, localAdmin := _apimeta.GetRequestUserAdminStatus(r, ctx, user)
if globalAdmin {
return &task_runner.PurgeAuthContext{}, true, localAdmin
}
if localAdmin {
return &task_runner.PurgeAuthContext{SourceOrigin: r.Host}, false, true
}
return &task_runner.PurgeAuthContext{UploaderUserId: user.UserId}, false, false
}
Loading

0 comments on commit 4116e78

Please sign in to comment.