Skip to content

Commit

Permalink
feat: store retries and discarded certificates on storage (configurable)
Browse files Browse the repository at this point in the history
  • Loading branch information
joanestebanr committed Nov 28, 2024
1 parent 521b968 commit 80d5f0a
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 35 deletions.
9 changes: 8 additions & 1 deletion aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ func New(
l1InfoTreeSyncer *l1infotreesync.L1InfoTreeSync,
l2Syncer types.L2BridgeSyncer,
epochNotifier types.EpochNotifier) (*AggSender, error) {
storage, err := db.NewAggSenderSQLStorage(logger, cfg.StoragePath)
storageConfig := db.AggSenderSQLStorageConfig{
DBPath: cfg.StoragePath,
KeepCertificatesHistory: cfg.KeepCertificatesHistory,
}
storage, err := db.NewAggSenderSQLStorage(logger, storageConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -153,12 +157,14 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif
return nil, err
}
previousToBlock := uint64(0)
retryCount := 0
if lastSentCertificateInfo != nil {
previousToBlock = lastSentCertificateInfo.ToBlock
if lastSentCertificateInfo.Status == agglayer.InError {
// if the last certificate was in error, we need to resend it
// from the block before the error
previousToBlock = lastSentCertificateInfo.FromBlock - 1
retryCount = lastSentCertificateInfo.RetryCount + 1
}
}

Expand Down Expand Up @@ -216,6 +222,7 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif
prevLER := common.BytesToHash(certificate.PrevLocalExitRoot[:])
certInfo := types.CertificateInfo{
Height: certificate.Height,
RetryCount: retryCount,
CertificateID: certificateHash,
NewLocalExitRoot: certificate.NewLocalExitRoot,
PreviousLocalExitRoot: &prevLER,
Expand Down
6 changes: 5 additions & 1 deletion aggsender/aggsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,11 @@ func newAggsenderTestData(t *testing.T, creationFlags testDataFlags) *aggsenderT
pc, _, _, _ := runtime.Caller(1)
part := runtime.FuncForPC(pc)
dbPath := fmt.Sprintf("file:%d?mode=memory&cache=shared", part.Entry())
storage, err = db.NewAggSenderSQLStorage(logger, dbPath)
storageConfig := db.AggSenderSQLStorageConfig{
DBPath: dbPath,
KeepCertificatesHistory: true,
}
storage, err = db.NewAggSenderSQLStorage(logger, storageConfig)
require.NoError(t, err)
}

Expand Down
2 changes: 2 additions & 0 deletions aggsender/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {
// DelayBeetweenRetries is the delay between retries:
// is used on store Certificate and also in initial check
DelayBeetweenRetries types.Duration `mapstructure:"DelayBeetweenRetries"`
// KeepCertificatesHistory is a flag to keep the certificates history on storage
KeepCertificatesHistory bool `mapstructure:"KeepCertificatesHistory"`
}

// String returns a string representation of the Config
Expand Down
55 changes: 39 additions & 16 deletions aggsender/db/aggsender_db_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,33 @@ type AggSenderStorage interface {

var _ AggSenderStorage = (*AggSenderSQLStorage)(nil)

// AggSenderSQLStorageConfig is the configuration for the AggSenderSQLStorage
type AggSenderSQLStorageConfig struct {
DBPath string
KeepCertificatesHistory bool
}

// AggSenderSQLStorage is the struct that implements the AggSenderStorage interface
type AggSenderSQLStorage struct {
logger *log.Logger
db *sql.DB
cfg AggSenderSQLStorageConfig
}

// NewAggSenderSQLStorage creates a new AggSenderSQLStorage
func NewAggSenderSQLStorage(logger *log.Logger, dbPath string) (*AggSenderSQLStorage, error) {
if err := migrations.RunMigrations(dbPath); err != nil {
func NewAggSenderSQLStorage(logger *log.Logger, cfg AggSenderSQLStorageConfig) (*AggSenderSQLStorage, error) {
db, err := db.NewSQLiteDB(cfg.DBPath)
if err != nil {
return nil, err
}

db, err := db.NewSQLiteDB(dbPath)
if err != nil {
if err := migrations.RunMigrations(logger, db); err != nil {
return nil, err
}

return &AggSenderSQLStorage{
db: db,
logger: logger,
cfg: cfg,
}, nil
}

Expand Down Expand Up @@ -93,7 +100,7 @@ func (a *AggSenderSQLStorage) GetCertificateByHeight(height uint64) (*types.Cert
}

// getCertificateByHeight returns a certificate by its height using the provided db
func getCertificateByHeight(db meddler.DB,
func getCertificateByHeight(db db.Querier,
height uint64) (*types.CertificateInfo, error) {
var certificateInfo types.CertificateInfo
if err := meddler.QueryRow(db, &certificateInfo,
Expand All @@ -119,7 +126,7 @@ func (a *AggSenderSQLStorage) GetLastSentCertificate() (*types.CertificateInfo,
func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certificate types.CertificateInfo) error {
tx, err := db.NewTx(ctx, a.db)
if err != nil {
return err
return fmt.Errorf("saveLastSentCertificate NewTx. Err: %w", err)
}
defer func() {
if err != nil {
Expand All @@ -131,14 +138,14 @@ func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certi

cert, err := getCertificateByHeight(tx, certificate.Height)
if err != nil && !errors.Is(err, db.ErrNotFound) {
return err
return fmt.Errorf("saveLastSentCertificate getCertificateByHeight. Err: %w", err)
}

if cert != nil {
// we already have a certificate with this height
// we need to delete it before inserting the new one
if err = deleteCertificate(tx, cert.CertificateID); err != nil {
return err
if err = a.moveCertificateToHistory(tx, cert); err != nil {
return fmt.Errorf("saveLastSentCertificate moveCertificateToHistory Err: %w", err)
}
}

Expand All @@ -147,14 +154,30 @@ func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certi
}

if err = tx.Commit(); err != nil {
return err
return fmt.Errorf("saveLastSentCertificate commit. Err: %w", err)
}

a.logger.Debugf("inserted certificate - Height: %d. Hash: %s", certificate.Height, certificate.CertificateID)

return nil
}

func (a *AggSenderSQLStorage) moveCertificateToHistory(tx db.Querier, certificate *types.CertificateInfo) error {
if a.cfg.KeepCertificatesHistory {
a.logger.Debugf("moving certificate to history - new CertificateID: %s", certificate.ID())
if _, err := tx.Exec(`INSERT INTO certificate_info_history SELECT * FROM certificate_info WHERE height = $1;`,
certificate.Height); err != nil {
return fmt.Errorf("error moving certificate to history: %w", err)
}
}
a.logger.Debugf("deleting certificate - CertificateID: %s", certificate.ID())
if err := deleteCertificate(tx, certificate.CertificateID); err != nil {
return fmt.Errorf("deleteCertificate %s . Error: %w", certificate.ID(), err)
}

return nil
}

// DeleteCertificate deletes a certificate from the storage
func (a *AggSenderSQLStorage) DeleteCertificate(ctx context.Context, certificateID common.Hash) error {
tx, err := db.NewTx(ctx, a.db)
Expand All @@ -169,7 +192,7 @@ func (a *AggSenderSQLStorage) DeleteCertificate(ctx context.Context, certificate
}
}()

if err = deleteCertificate(a.db, certificateID); err != nil {
if err = deleteCertificate(tx, certificateID); err != nil {
return err
}

Expand All @@ -183,8 +206,8 @@ func (a *AggSenderSQLStorage) DeleteCertificate(ctx context.Context, certificate
}

// deleteCertificate deletes a certificate from the storage using the provided db
func deleteCertificate(db meddler.DB, certificateID common.Hash) error {
if _, err := db.Exec(`DELETE FROM certificate_info WHERE certificate_id = $1;`, certificateID.String()); err != nil {
func deleteCertificate(tx db.Querier, certificateID common.Hash) error {
if _, err := tx.Exec(`DELETE FROM certificate_info WHERE certificate_id = $1;`, certificateID.String()); err != nil {
return fmt.Errorf("error deleting certificate info: %w", err)
}

Expand All @@ -205,8 +228,8 @@ func (a *AggSenderSQLStorage) UpdateCertificate(ctx context.Context, certificate
}
}()

if _, err = tx.Exec(`UPDATE certificate_info SET status = $1 WHERE certificate_id = $2;`,
certificate.Status, certificate.CertificateID.String()); err != nil {
if _, err = tx.Exec(`UPDATE certificate_info SET status = $1, updated_at=$2 WHERE certificate_id = $3;`,
certificate.Status, certificate.UpdatedAt, certificate.CertificateID.String()); err != nil {
return fmt.Errorf("error updating certificate info: %w", err)
}
if err = tx.Commit(); err != nil {
Expand Down
26 changes: 19 additions & 7 deletions aggsender/db/aggsender_db_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/0xPolygon/cdk/agglayer"
"github.com/0xPolygon/cdk/aggsender/db/migrations"
"github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/db"
"github.com/0xPolygon/cdk/log"
Expand All @@ -22,9 +21,12 @@ func Test_Storage(t *testing.T) {

path := path.Join(t.TempDir(), "file::memory:?cache=shared")
log.Debugf("sqlite path: %s", path)
require.NoError(t, migrations.RunMigrations(path))
cfg := AggSenderSQLStorageConfig{
DBPath: path,
KeepCertificatesHistory: true,
}

storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), path)
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), cfg)
require.NoError(t, err)

updateTime := time.Now().UTC().UnixMilli()
Expand Down Expand Up @@ -201,6 +203,7 @@ func Test_Storage(t *testing.T) {
// Insert a certificate
certificate := types.CertificateInfo{
Height: 13,
RetryCount: 1234,
CertificateID: common.HexToHash("0xD"),
NewLocalExitRoot: common.HexToHash("0xE"),
FromBlock: 13,
Expand All @@ -213,12 +216,14 @@ func Test_Storage(t *testing.T) {

// Update the status of the certificate
certificate.Status = agglayer.Settled
certificate.UpdatedAt = updateTime + 1
require.NoError(t, storage.UpdateCertificate(ctx, certificate))

// Fetch the certificate and verify the status has been updated
certificateFromDB, err := storage.GetCertificateByHeight(certificate.Height)
require.NoError(t, err)
require.Equal(t, certificate.Status, certificateFromDB.Status)
require.Equal(t, certificate.Status, certificateFromDB.Status, "equal status")
require.Equal(t, certificate.UpdatedAt, certificateFromDB.UpdatedAt, "equal updated at")

require.NoError(t, storage.clean())
})
Expand All @@ -229,9 +234,12 @@ func Test_SaveLastSentCertificate(t *testing.T) {

path := path.Join(t.TempDir(), "file::memory:?cache=shared")
log.Debugf("sqlite path: %s", path)
require.NoError(t, migrations.RunMigrations(path))
cfg := AggSenderSQLStorageConfig{
DBPath: path,
KeepCertificatesHistory: true,
}

storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), path)
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), cfg)
require.NoError(t, err)

updateTime := time.Now().UTC().UnixMilli()
Expand Down Expand Up @@ -372,7 +380,11 @@ func Test_SaveLastSentCertificate(t *testing.T) {
func Test_StoragePreviousLER(t *testing.T) {
ctx := context.TODO()
dbPath := path.Join(t.TempDir(), "Test_StoragePreviousLER.sqlite")
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), dbPath)
cfg := AggSenderSQLStorageConfig{
DBPath: dbPath,
KeepCertificatesHistory: true,
}
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), cfg)
require.NoError(t, err)
require.NotNil(t, storage)

Expand Down
27 changes: 23 additions & 4 deletions aggsender/db/migrations/0001.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
-- +migrate Down
DROP TABLE IF EXISTS certificate_info;
DROP TABLE IF EXISTS certificate_info_history;
DROP TABLE IF EXISTS certificate_info_history;

-- +migrate Up
CREATE TABLE certificate_info (
height INTEGER NOT NULL,
certificate_id VARCHAR NOT NULL PRIMARY KEY,
retry_count INTEGER DEFAULT 0,
certificate_id VARCHAR NOT NULL,
status INTEGER NOT NULL,
previous_local_exit_root VARCHAR ,
previous_local_exit_root VARCHAR,
new_local_exit_root VARCHAR NOT NULL,
from_block INTEGER NOT NULL,
to_block INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
signed_certificate TEXT
);
signed_certificate TEXT,
PRIMARY KEY (height)
);

CREATE TABLE certificate_info_history (
height INTEGER NOT NULL ,
retry_count INTEGER DEFAULT 0,
certificate_id VARCHAR NOT NULL,
status INTEGER NOT NULL,
previous_local_exit_root VARCHAR,
new_local_exit_root VARCHAR NOT NULL,
from_block INTEGER NOT NULL,
to_block INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
signed_certificate TEXT,
PRIMARY KEY (height, retry_count)
);
6 changes: 4 additions & 2 deletions aggsender/db/migrations/migrations.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package migrations

import (
"database/sql"
_ "embed"

"github.com/0xPolygon/cdk/db"
"github.com/0xPolygon/cdk/db/types"
"github.com/0xPolygon/cdk/log"
)

//go:embed 0001.sql
var mig001 string

func RunMigrations(dbPath string) error {
func RunMigrations(logger *log.Logger, database *sql.DB) error {
migrations := []types.Migration{
{
ID: "0001",
SQL: mig001,
},
}

return db.RunMigrations(dbPath, migrations)
return db.RunMigrationsDB(logger, database, migrations)
}
5 changes: 4 additions & 1 deletion aggsender/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Logger interface {

type CertificateInfo struct {
Height uint64 `meddler:"height"`
RetryCount int `meddler:"retry_count"`
CertificateID common.Hash `meddler:"certificate_id,hash"`
// PreviousLocalExitRoot if it's nil means no reported
PreviousLocalExitRoot *common.Hash `meddler:"previous_local_exit_root,hash"`
Expand All @@ -79,6 +80,7 @@ func (c *CertificateInfo) String() string {
}
return fmt.Sprintf(
"Height: %d "+
"RetryCount: %d "+
"CertificateID: %s "+
"PreviousLocalExitRoot: %s "+
"NewLocalExitRoot: %s "+
Expand All @@ -88,6 +90,7 @@ func (c *CertificateInfo) String() string {
"CreatedAt: %s "+
"UpdatedAt: %s",
c.Height,
c.RetryCount,
c.CertificateID.String(),
previousLocalExitRoot,
c.NewLocalExitRoot.String(),
Expand All @@ -104,7 +107,7 @@ func (c *CertificateInfo) ID() string {
if c == nil {
return "nil"
}
return fmt.Sprintf("%d/%s", c.Height, c.CertificateID.String())
return fmt.Sprintf("%d/%s (retry %d)", c.Height, c.CertificateID.String(), c.RetryCount)
}

// IsClosed returns true if the certificate is closed (settled or inError)
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,5 @@ EpochNotificationPercentage = 50
SaveCertificatesToFilesPath = ""
MaxRetriesStoreCertificate = 3
DelayBeetweenRetries = "60s"
KeepCertificatesHistory = true
`
Loading

0 comments on commit 80d5f0a

Please sign in to comment.