Skip to content

Commit

Permalink
Make migrations sequential - 2
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya1702 committed Aug 8, 2024
1 parent 2277d08 commit 78a9e23
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 116 deletions.
78 changes: 47 additions & 31 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,44 +317,60 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
}
ledgerSeqRange = ledgerSeqRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}

err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.FirstLedgerSeq,
ledgerSeqRange.LastLedgerSeq,
func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
for _, migrationFactory := range dataMigrations {
// create guarded migration
guardedMigration, err := db.NewGuardedDataMigration(
readTxMetaCtx,
migrationFactory.MigrationName,
migrationFactory.Logger,
migrationFactory.Factory,
migrationFactory.DB,
)
if err != nil {
d.logger.WithError(err).Fatal("could not create guarded migration for: %s",

Check failure on line 335 in cmd/soroban-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / golangci-lint

printf: (*github.com/stellar/go/support/log.Entry).Fatal call has possible Printf formatting directive %s (govet)

Check failure on line 335 in cmd/soroban-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / Unit tests (ubuntu-20.04)

(*github.com/stellar/go/support/log.Entry).Fatal call has possible Printf formatting directive %s

Check failure on line 335 in cmd/soroban-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / Unit tests (ubuntu-22.04)

(*github.com/stellar/go/support/log.Entry).Fatal call has possible Printf formatting directive %s
migrationFactory.MigrationName)
}

if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.FirstLedgerSeq,
ledgerSeqRange.LastLedgerSeq,
func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}

if applicableRange.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")
if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit data migrations")

if applicableRange.IsLedgerIncluded(currentSeq) {
if err := guardedMigration.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")
}
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
if err := guardedMigration.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit migration for: %s",

Check failure on line 371 in cmd/soroban-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / golangci-lint

printf: (*github.com/stellar/go/support/log.Entry).Fatal call has possible Printf formatting directive %s (govet)

Check failure on line 371 in cmd/soroban-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / Unit tests (ubuntu-20.04)

(*github.com/stellar/go/support/log.Entry).Fatal call has possible Printf formatting directive %s

Check failure on line 371 in cmd/soroban-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / Unit tests (ubuntu-22.04)

(*github.com/stellar/go/support/log.Entry).Fatal call has possible Printf formatting directive %s
migrationFactory.MigrationName)
}
}

if currentSeq != 0 {
Expand Down
10 changes: 3 additions & 7 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,24 +245,20 @@ func (e *eventTableMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta)

func newEventTableMigration(
logger *log.Entry,
retentionWindow uint32,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) {
firstLedgerToMigrate := firstLedger
writer := &eventHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}

migration := eventTableMigration{
firstLedger: firstLedgerToMigrate,
lastLedger: latestLedger,
firstLedger: ledgerSeqRange.FirstLedgerSeq,
lastLedger: ledgerSeqRange.LastLedgerSeq,
writer: writer,
}
return &migration, nil
Expand Down
103 changes: 33 additions & 70 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (

"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
const (
TransactionsMigrationName = "TransactionsTable"
EventsMigrationName = "EventsTable"
)

type LedgerSeqRange struct {
Expand Down Expand Up @@ -63,49 +66,11 @@ type Migration interface {
Rollback(ctx context.Context) error
}

type multiMigration []Migration

func (mm multiMigration) ApplicableRange() *LedgerSeqRange {
var result *LedgerSeqRange
for _, m := range mm {
result = m.ApplicableRange().Merge(result)
}
return result
}

func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error {
var err error
for _, m := range mm {
ledgerSeq := meta.LedgerSequence()
if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) {
// The range of a sub-migration can be smaller than the global range.
continue
}
if localErr := m.Apply(ctx, meta); localErr != nil {
err = errors.Join(err, localErr)
}
}
return err
}

func (mm multiMigration) Commit(ctx context.Context) error {
var err error
for _, m := range mm {
if localErr := m.Commit(ctx); localErr != nil {
err = errors.Join(err, localErr)
}
}
return err
}

func (mm multiMigration) Rollback(ctx context.Context) error {
var err error
for _, m := range mm {
if localErr := m.Rollback(ctx); localErr != nil {
err = errors.Join(err, localErr)
}
}
return err
type MigrationFactory struct {
Factory migrationApplierFactory
DB *DB
Logger *log.Entry
MigrationName string
}

// guardedMigration is a db data migration whose application is guarded by a boolean in the meta table
Expand All @@ -119,7 +84,7 @@ type guardedMigration struct {
applyLogged bool
}

func newGuardedDataMigration(
func NewGuardedDataMigration(
ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB,
) (Migration, error) {
migrationDB := &DB{
Expand Down Expand Up @@ -205,35 +170,33 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32
}, nil
}

func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) {
var migrations []Migration
func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange) ([]MigrationFactory, error) {
var migrations []MigrationFactory

migrationName := "TransactionsTable"
logger = logger.WithField("migration", migrationName)
factory := newTransactionTableMigration(
transactionFactory := newTransactionTableMigration(
ctx,
logger,
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
logger.WithField("migration", TransactionsMigrationName),
networkPassphrase,
ledgerSeqRange,
)
migrations = append(migrations, MigrationFactory{
Factory: transactionFactory,
DB: db,
Logger: logger,
MigrationName: TransactionsMigrationName,
})

m1, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db)
if err != nil {
return nil, fmt.Errorf("could not create guarded transaction migration: %w", err)
}
migrations = append(migrations, m1)

eventMigrationName := "EventsTable"
eventFactory := newEventTableMigration(
logger.WithField("migration", eventMigrationName),
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
logger.WithField("migration", EventsMigrationName),
networkPassphrase,
ledgerSeqRange,
)
m2, err := newGuardedDataMigration(ctx, eventMigrationName, logger, eventFactory, db)
if err != nil {
return nil, fmt.Errorf("could not create guarded event migration: %w", err)
}
migrations = append(migrations, m2)

return multiMigration(migrations), nil
migrations = append(migrations, MigrationFactory{
Factory: eventFactory,
DB: db,
Logger: logger,
MigrationName: EventsMigrationName,
})

return migrations, nil
}
15 changes: 7 additions & 8 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,29 +266,28 @@ func (t *transactionTableMigration) Apply(_ context.Context, meta xdr.LedgerClos
return t.writer.InsertTransactions(meta)
}

func newTransactionTableMigration(ctx context.Context, logger *log.Entry,
retentionWindow uint32, passphrase string,
func newTransactionTableMigration(
ctx context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) {
firstLedgerToMigrate := uint32(2) //nolint:mnd
writer := &transactionHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
// Truncate the table, since it may contain data, causing insert conflicts later on.
// (the migration was shipped after the actual transactions table change)
_, err := db.Exec(ctx, sq.Delete(transactionTableName))
if err != nil {
return nil, fmt.Errorf("couldn't delete table %q: %w", transactionTableName, err)
}
migration := transactionTableMigration{
firstLedger: firstLedgerToMigrate,
lastLedger: latestLedger,
firstLedger: ledgerSeqRange.FirstLedgerSeq,
lastLedger: ledgerSeqRange.LastLedgerSeq,
writer: writer,
}
return &migration, nil
Expand Down

0 comments on commit 78a9e23

Please sign in to comment.