Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DB Schema migration tool #9

Draft
wants to merge 2 commits into
base: fix/tx-sharing
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cmd/migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

import (
"flag"
"fmt"

"github.com/cernbox/reva-plugins/share/sql"
)

func main() {
username := flag.String("username", "cernbox_server", "Database username")
password := flag.String("password", "", "Database password")
host := flag.String("host", "dbod-cboxeos.cern.ch", "Database host")
port := flag.Int("port", 5504, "Database port")
name := flag.String("name", "cernboxngcopy", "Database name")
gatewaysvc := flag.String("gatewaysvc", "localhost:9142", "Gateway service location")
token := flag.String("token", "", "JWT token for gateway svc")
dryRun := flag.Bool("dryrun", true, "Use dry run?")

flag.Parse()

fmt.Printf("Connecting to %s@%s:%d\n", *username, *host, *port)
sql.RunMigration(*username, *password, *host, *name, *gatewaysvc, *token, *port, *dryRun)
}
4 changes: 2 additions & 2 deletions share/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ProtoShare struct {
Instance string
Orphan bool
Description string
Expiration datatypes.Null[datatypes.Date]
Expiration datatypes.NullTime //datatypes.Null[datatypes.Date]
}

type Share struct {
Expand Down Expand Up @@ -122,7 +122,7 @@ func (p *PublicLink) AsCS3PublicShare() *link.PublicShare {
}
var expires *typespb.Timestamp
if p.Expiration.Valid {
exp, err := p.Expiration.V.Value()
exp, err := p.Expiration.Value()
if err == nil {
expiration := exp.(time.Time)
expires = &typespb.Timestamp{
Expand Down
320 changes: 320 additions & 0 deletions share/sql/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
package sql

import (
"context"
"database/sql"
"fmt"
"os"
"time"

model "github.com/cernbox/reva-plugins/share"
providerv1beta1 "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/pkg/errors"
"google.golang.org/grpc/metadata"
"gorm.io/datatypes"
"gorm.io/gorm"
)

type Migrator struct {
NewDb *gorm.DB
OldDb *sql.DB
ShareMgr *mgr
}

type ShareOrLink struct {
IsShare bool
Share *model.Share
Link *model.PublicLink
}

type OldShareEntry struct {
ID int
UIDOwner string
UIDInitiator string
Prefix string
ItemSource string
ItemType string
ShareWith string
Token string
Expiration string
Permissions int
ShareType int
ShareName string
STime int
FileTarget string
State int
Quicklink bool
Description string
NotifyUploads bool
NotifyUploadsExtraRecipients sql.NullString
Orphan bool
}

type OldShareState struct {
id int
recipient string
state int
}

const (
bufferSize = 10
numWorkers = 10
)

func RunMigration(username, password, host, name, gatewaysvc, token string, port int, dryRun bool) {
// Config
config := map[string]interface{}{
"engine": "mysql",
"db_username": username,
"db_password": password,
"db_host": host,
"db_port": port,
"db_name": name,
"gatewaysvc": gatewaysvc,
"dry_run": dryRun,
}
// Authenticate to gateway service
tokenlessCtx, cancel := context.WithCancel(context.Background())
ctx := appctx.ContextSetToken(tokenlessCtx, token)
ctx = metadata.AppendToOutgoingContext(ctx, appctx.TokenHeader, token)
defer cancel()

// Set up migrator
shareManager, err := New(ctx, config)
if err != nil {
fmt.Println("Failed to create shareManager: " + err.Error())
os.Exit(1)
}
sharemgr := shareManager.(*mgr)
oldDb, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", username, password, host, port, name))
if err != nil {
fmt.Println("Failed to create db: " + err.Error())
os.Exit(1)
}
migrator := Migrator{
OldDb: oldDb,
NewDb: sharemgr.db,
ShareMgr: sharemgr,
}

if dryRun {
migrator.NewDb = migrator.NewDb.Debug()
}

migrateShares(ctx, migrator)
fmt.Println("---------------------------------")
migrateShareStatuses(ctx, migrator)

}

func migrateShares(ctx context.Context, migrator Migrator) {
// Check how many shares are to be migrated
count, err := getCount(migrator, "oc_share")
if err != nil {
fmt.Println("Error getting count: " + err.Error())
return
}
fmt.Printf("Migrating %d shares\n", count)

// Get all old shares
query := "select id, coalesce(uid_owner, '') as uid_owner, coalesce(uid_initiator, '') as uid_initiator, lower(coalesce(share_with, '')) as share_with, coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source, coalesce(item_type, '') as item_type, stime, permissions, share_type, orphan FROM oc_share order by id desc" // AND id=?"
params := []interface{}{}

res, err := migrator.OldDb.Query(query, params...)

if err != nil {
fmt.Printf("Fatal error: %s", err.Error())
os.Exit(1)
}

// Create channel for workers
ch := make(chan *OldShareEntry, bufferSize)
defer close(ch)

// Start all workers
for range numWorkers {
go workerShare(ctx, migrator, ch)
}

for res.Next() {
var s OldShareEntry
res.Scan(&s.ID, &s.UIDOwner, &s.UIDInitiator, &s.ShareWith, &s.Prefix, &s.ItemSource, &s.ItemType, &s.STime, &s.Permissions, &s.ShareType, &s.Orphan)
if err == nil {
ch <- &s
} else {
fmt.Printf("Error occured for share %d: %s\n", s.ID, err.Error())
}
}
}

func migrateShareStatuses(ctx context.Context, migrator Migrator) {
// Check how many shares are to be migrated
count, err := getCount(migrator, "oc_share")
if err != nil {
fmt.Println("Error getting count: " + err.Error())
return
}
fmt.Printf("Migrating %d share statuses\n", count)

// Get all old shares
query := "select id, coalesce(recipient, '') as recipient, state FROM oc_share_status order by id desc"
params := []interface{}{}

res, err := migrator.OldDb.Query(query, params...)

if err != nil {
fmt.Printf("Fatal error: %s", err.Error())
os.Exit(1)
}

// Create channel for workers
ch := make(chan *OldShareState, bufferSize)
defer close(ch)

// Start all workers
for range numWorkers {
go workerState(ctx, migrator, ch)
}

for res.Next() {
var s OldShareState
res.Scan(&s.id, &s.recipient, &s.state)
if err == nil {
ch <- &s
} else {
fmt.Printf("Error occured for share status%d: %s\n", s.id, err.Error())
}
}
}

func workerShare(ctx context.Context, migrator Migrator, ch chan *OldShareEntry) {
for share := range ch {
handleSingleShare(ctx, migrator, share)
}
}

func workerState(ctx context.Context, migrator Migrator, ch chan *OldShareState) {
for state := range ch {
handleSingleState(ctx, migrator, state)
}
}

func handleSingleShare(ctx context.Context, migrator Migrator, s *OldShareEntry) {
share, err := oldShareToNewShare(ctx, migrator, s)
if err != nil {
return
}
// TODO error handling
if share.IsShare {
migrator.NewDb.Create(&share.Share)
} else {
migrator.NewDb.Create(&share.Link)
}
}

func handleSingleState(ctx context.Context, migrator Migrator, s *OldShareState) {
// case collaboration.ShareState_SHARE_STATE_REJECTED:
// state = -1
// case collaboration.ShareState_SHARE_STATE_ACCEPTED:
// state = 1

newShareState := &model.ShareState{
ShareID: uint(s.id),
Model: gorm.Model{
ID: uint(s.id),
},
User: s.recipient,
Hidden: s.state == -1, // Hidden if REJECTED
Synced: true, // for now, we always sync? or not? TODO
}
migrator.NewDb.Create(&newShareState)
}

func oldShareToNewShare(ctx context.Context, migrator Migrator, s *OldShareEntry) (*ShareOrLink, error) {
expirationDate, expirationError := time.Parse("2006-01-02 15:04:05", s.Expiration)

protoShare := model.ProtoShare{
Model: gorm.Model{
ID: uint(s.ID),
CreatedAt: time.Unix(int64(s.STime), 0),
UpdatedAt: time.Unix(int64(s.STime), 0),
},
UIDOwner: s.UIDOwner,
UIDInitiator: s.UIDInitiator,
Description: s.Description,
Permissions: uint8(s.Permissions),
Orphan: s.Orphan, // will be re-checked later
Expiration: datatypes.Null[time.Time]{
V: expirationDate,
Valid: expirationError == nil,
},
ItemType: model.ItemType(s.ItemType),
InitialPath: "", // set later
Inode: s.ItemSource,
Instance: s.Prefix,
}

// Getting InitialPath
if !protoShare.Orphan {
path, err := migrator.ShareMgr.getPath(ctx, &providerv1beta1.ResourceId{
StorageId: protoShare.Instance,
OpaqueId: protoShare.Inode,
})
if err == nil {
protoShare.InitialPath = path
} else if errors.Is(err, errtypes.NotFound(protoShare.Inode)) {
protoShare.Orphan = true
} else {
// We do not set, because of a general error
fmt.Printf("An error occured for share %d while statting (%s, %s): %s\n", s.ID, protoShare.Instance, protoShare.Inode, err.Error())
}
}

// ShareTypeUser = 0
// ShareTypeGroup = 1
// ShareTypePublicLink = 3
// ShareTypeFederatedCloudShare = 6
// ShareTypeSpaceMembership = 7
if s.ShareType == 0 || s.ShareType == 1 {
return &ShareOrLink{
IsShare: true,
Share: &model.Share{
ProtoShare: protoShare,
ShareWith: s.ShareWith,
SharedWithIsGroup: s.ShareType == 1,
},
}, nil
} else if s.ShareType == 3 {
notifyUploadsExtraRecipients := ""
if s.NotifyUploadsExtraRecipients.Valid {
notifyUploadsExtraRecipients = s.NotifyUploadsExtraRecipients.String
}
return &ShareOrLink{
IsShare: false,
Link: &model.PublicLink{
ProtoShare: protoShare,
Token: s.Token,
Quicklink: s.Quicklink,
NotifyUploads: s.NotifyUploads,
NotifyUploadsExtraRecipients: notifyUploadsExtraRecipients,
Password: s.ShareWith,
ShareName: s.ShareName,
},
}, nil
} else {
return nil, errors.New("Invalid share type")
}
}

func getCount(migrator Migrator, table string) (int, error) {
res := 0
query := "select count(*) from " + table
params := []interface{}{}

if err := migrator.OldDb.QueryRow(query, params...).Scan(&res); err != nil {
return 0, err
}
return res, nil
}
Loading