Skip to content

Commit

Permalink
feat: cherry-pick PR #208 from release/v0.5.0 (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
joanestebanr authored Dec 3, 2024
1 parent bfc409b commit e254ec9
Show file tree
Hide file tree
Showing 17 changed files with 148 additions and 42 deletions.
14 changes: 12 additions & 2 deletions .github/workflows/test-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,21 @@ jobs:
- uses: actions/checkout@v4

- name: Checkout kurtosis-cdk repository
uses: actions/checkout@v4
with:
go-version: ${{ matrix.go-version }}
env:
GOARCH: ${{ matrix.goarch }}

- name: Build Docker
run: make build-docker

- name: Checkout kurtosis-cdk
uses: actions/checkout@v4
with:
repository: 0xPolygon/kurtosis-cdk
path: kurtosis-cdk
ref: v0.2.22
ref: v0.2.24

- name: Install Kurtosis CDK tools
uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk
Expand All @@ -64,7 +74,7 @@ jobs:
sudo chmod +x /usr/local/bin/polycli
/usr/local/bin/polycli version
- name: Setup bats
- name: Setup Bats and bats libs
uses: bats-core/[email protected]

- name: Download cdk archive
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-resequence.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
with:
repository: 0xPolygon/kurtosis-cdk
path: kurtosis-cdk
ref: v0.2.22
ref: v0.2.24

- name: Install Kurtosis CDK tools
uses: ./kurtosis-cdk/.github/actions/setup-kurtosis-cdk
Expand Down
9 changes: 8 additions & 1 deletion aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ func New(
l1InfoTreeSyncer *l1infotreesync.L1InfoTreeSync,
l2Syncer types.L2BridgeSyncer,
epochNotifier types.EpochNotifier) (*AggSender, error) {
storage, err := db.NewAggSenderSQLStorage(logger, cfg.StoragePath)
storageConfig := db.AggSenderSQLStorageConfig{
DBPath: cfg.StoragePath,
KeepCertificatesHistory: cfg.KeepCertificatesHistory,
}
storage, err := db.NewAggSenderSQLStorage(logger, storageConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,12 +156,14 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif
return nil, err
}
previousToBlock := uint64(0)
retryCount := 0
if lastSentCertificateInfo != nil {
previousToBlock = lastSentCertificateInfo.ToBlock
if lastSentCertificateInfo.Status == agglayer.InError {
// if the last certificate was in error, we need to resend it
// from the block before the error
previousToBlock = lastSentCertificateInfo.FromBlock - 1
retryCount = lastSentCertificateInfo.RetryCount + 1
}
}

Expand Down Expand Up @@ -215,6 +221,7 @@ func (a *AggSender) sendCertificate(ctx context.Context) (*agglayer.SignedCertif
prevLER := common.BytesToHash(certificate.PrevLocalExitRoot[:])
certInfo := types.CertificateInfo{
Height: certificate.Height,
RetryCount: retryCount,
CertificateID: certificateHash,
NewLocalExitRoot: certificate.NewLocalExitRoot,
PreviousLocalExitRoot: &prevLER,
Expand Down
6 changes: 5 additions & 1 deletion aggsender/aggsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1971,7 +1971,11 @@ func newAggsenderTestData(t *testing.T, creationFlags testDataFlags) *aggsenderT
storage = storageMock
} else {
dbPath := path.Join(t.TempDir(), "newAggsenderTestData.sqlite")
storage, err = db.NewAggSenderSQLStorage(logger, dbPath)
storageConfig := db.AggSenderSQLStorageConfig{
DBPath: dbPath,
KeepCertificatesHistory: true,
}
storage, err = db.NewAggSenderSQLStorage(logger, storageConfig)
require.NoError(t, err)
}

Expand Down
2 changes: 2 additions & 0 deletions aggsender/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {
// DelayBeetweenRetries is the delay between retries:
// is used on store Certificate and also in initial check
DelayBeetweenRetries types.Duration `mapstructure:"DelayBeetweenRetries"`
// KeepCertificatesHistory is a flag to keep the certificates history on storage
KeepCertificatesHistory bool `mapstructure:"KeepCertificatesHistory"`
}

// String returns a string representation of the Config
Expand Down
73 changes: 57 additions & 16 deletions aggsender/db/aggsender_db_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,33 @@ type AggSenderStorage interface {

var _ AggSenderStorage = (*AggSenderSQLStorage)(nil)

// AggSenderSQLStorageConfig is the configuration for the AggSenderSQLStorage
type AggSenderSQLStorageConfig struct {
DBPath string
KeepCertificatesHistory bool
}

// AggSenderSQLStorage is the struct that implements the AggSenderStorage interface
type AggSenderSQLStorage struct {
logger *log.Logger
db *sql.DB
cfg AggSenderSQLStorageConfig
}

// NewAggSenderSQLStorage creates a new AggSenderSQLStorage
func NewAggSenderSQLStorage(logger *log.Logger, dbPath string) (*AggSenderSQLStorage, error) {
if err := migrations.RunMigrations(dbPath); err != nil {
func NewAggSenderSQLStorage(logger *log.Logger, cfg AggSenderSQLStorageConfig) (*AggSenderSQLStorage, error) {
db, err := db.NewSQLiteDB(cfg.DBPath)
if err != nil {
return nil, err
}

db, err := db.NewSQLiteDB(dbPath)
if err != nil {
if err := migrations.RunMigrations(logger, db); err != nil {
return nil, err
}

return &AggSenderSQLStorage{
db: db,
logger: logger,
cfg: cfg,
}, nil
}

Expand Down Expand Up @@ -93,7 +100,8 @@ func (a *AggSenderSQLStorage) GetCertificateByHeight(height uint64) (*types.Cert
}

// getCertificateByHeight returns a certificate by its height using the provided db
func getCertificateByHeight(db meddler.DB, height uint64) (*types.CertificateInfo, error) {
func getCertificateByHeight(db db.Querier,
height uint64) (*types.CertificateInfo, error) {
var certificateInfo types.CertificateInfo
if err := meddler.QueryRow(db, &certificateInfo,
"SELECT * FROM certificate_info WHERE height = $1;", height); err != nil {
Expand All @@ -118,7 +126,7 @@ func (a *AggSenderSQLStorage) GetLastSentCertificate() (*types.CertificateInfo,
func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certificate types.CertificateInfo) error {
tx, err := db.NewTx(ctx, a.db)
if err != nil {
return err
return fmt.Errorf("saveLastSentCertificate NewTx. Err: %w", err)
}
shouldRollback := true
defer func() {
Expand All @@ -131,14 +139,14 @@ func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certi

cert, err := getCertificateByHeight(tx, certificate.Height)
if err != nil && !errors.Is(err, db.ErrNotFound) {
return err
return fmt.Errorf("saveLastSentCertificate getCertificateByHeight. Err: %w", err)
}

if cert != nil {
// we already have a certificate with this height
// we need to delete it before inserting the new one
if err = deleteCertificate(tx, cert.CertificateID); err != nil {
return err
if err = a.moveCertificateToHistoryOrDelete(tx, cert); err != nil {
return fmt.Errorf("saveLastSentCertificate moveCertificateToHistory Err: %w", err)
}
}

Expand All @@ -147,7 +155,7 @@ func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certi
}

if err = tx.Commit(); err != nil {
return err
return fmt.Errorf("saveLastSentCertificate commit. Err: %w", err)
}
shouldRollback = false

Expand All @@ -156,18 +164,51 @@ func (a *AggSenderSQLStorage) SaveLastSentCertificate(ctx context.Context, certi
return nil
}

func (a *AggSenderSQLStorage) moveCertificateToHistoryOrDelete(tx db.Querier,
certificate *types.CertificateInfo) error {
if a.cfg.KeepCertificatesHistory {
a.logger.Debugf("moving certificate to history - new CertificateID: %s", certificate.ID())
if _, err := tx.Exec(`INSERT INTO certificate_info_history SELECT * FROM certificate_info WHERE height = $1;`,
certificate.Height); err != nil {
return fmt.Errorf("error moving certificate to history: %w", err)
}
}
a.logger.Debugf("deleting certificate - CertificateID: %s", certificate.ID())
if err := deleteCertificate(tx, certificate.CertificateID); err != nil {
return fmt.Errorf("deleteCertificate %s . Error: %w", certificate.ID(), err)
}

return nil
}

// DeleteCertificate deletes a certificate from the storage
func (a *AggSenderSQLStorage) DeleteCertificate(ctx context.Context, certificateID common.Hash) error {
if err := deleteCertificate(a.db, certificateID); err != nil {
tx, err := db.NewTx(ctx, a.db)
if err != nil {
return err
}
defer func() {
if err != nil {
if errRllbck := tx.Rollback(); errRllbck != nil {
a.logger.Errorf(errWhileRollbackFormat, errRllbck)
}
}
}()

if err = deleteCertificate(tx, certificateID); err != nil {
return err
}

if err = tx.Commit(); err != nil {
return err
}
a.logger.Debugf("deleted certificate - CertificateID: %s", certificateID)
return nil
}

// deleteCertificate deletes a certificate from the storage using the provided db
func deleteCertificate(db meddler.DB, certificateID common.Hash) error {
if _, err := db.Exec(`DELETE FROM certificate_info WHERE certificate_id = $1;`, certificateID.String()); err != nil {
func deleteCertificate(tx db.Querier, certificateID common.Hash) error {
if _, err := tx.Exec(`DELETE FROM certificate_info WHERE certificate_id = $1;`, certificateID.String()); err != nil {
return fmt.Errorf("error deleting certificate info: %w", err)
}

Expand All @@ -189,8 +230,8 @@ func (a *AggSenderSQLStorage) UpdateCertificate(ctx context.Context, certificate
}
}()

if _, err = tx.Exec(`UPDATE certificate_info SET status = $1 WHERE certificate_id = $2;`,
certificate.Status, certificate.CertificateID.String()); err != nil {
if _, err = tx.Exec(`UPDATE certificate_info SET status = $1, updated_at = $2 WHERE certificate_id = $3;`,
certificate.Status, certificate.UpdatedAt, certificate.CertificateID.String()); err != nil {
return fmt.Errorf("error updating certificate info: %w", err)
}
if err = tx.Commit(); err != nil {
Expand Down
26 changes: 19 additions & 7 deletions aggsender/db/aggsender_db_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/0xPolygon/cdk/agglayer"
"github.com/0xPolygon/cdk/aggsender/db/migrations"
"github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/db"
"github.com/0xPolygon/cdk/log"
Expand All @@ -22,9 +21,12 @@ func Test_Storage(t *testing.T) {

path := path.Join(t.TempDir(), "aggsenderTest_Storage.sqlite")
log.Debugf("sqlite path: %s", path)
require.NoError(t, migrations.RunMigrations(path))
cfg := AggSenderSQLStorageConfig{
DBPath: path,
KeepCertificatesHistory: true,
}

storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), path)
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), cfg)
require.NoError(t, err)

updateTime := time.Now().UTC().UnixMilli()
Expand Down Expand Up @@ -201,6 +203,7 @@ func Test_Storage(t *testing.T) {
// Insert a certificate
certificate := types.CertificateInfo{
Height: 13,
RetryCount: 1234,
CertificateID: common.HexToHash("0xD"),
NewLocalExitRoot: common.HexToHash("0xE"),
FromBlock: 13,
Expand All @@ -213,12 +216,14 @@ func Test_Storage(t *testing.T) {

// Update the status of the certificate
certificate.Status = agglayer.Settled
certificate.UpdatedAt = updateTime + 1
require.NoError(t, storage.UpdateCertificate(ctx, certificate))

// Fetch the certificate and verify the status has been updated
certificateFromDB, err := storage.GetCertificateByHeight(certificate.Height)
require.NoError(t, err)
require.Equal(t, certificate.Status, certificateFromDB.Status)
require.Equal(t, certificate.Status, certificateFromDB.Status, "equal status")
require.Equal(t, certificate.UpdatedAt, certificateFromDB.UpdatedAt, "equal updated at")

require.NoError(t, storage.clean())
})
Expand All @@ -229,9 +234,12 @@ func Test_SaveLastSentCertificate(t *testing.T) {

path := path.Join(t.TempDir(), "aggsenderTest_SaveLastSentCertificate.sqlite")
log.Debugf("sqlite path: %s", path)
require.NoError(t, migrations.RunMigrations(path))
cfg := AggSenderSQLStorageConfig{
DBPath: path,
KeepCertificatesHistory: true,
}

storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), path)
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), cfg)
require.NoError(t, err)

updateTime := time.Now().UTC().UnixMilli()
Expand Down Expand Up @@ -380,7 +388,11 @@ func (a *AggSenderSQLStorage) clean() error {
func Test_StoragePreviousLER(t *testing.T) {
ctx := context.TODO()
dbPath := path.Join(t.TempDir(), "Test_StoragePreviousLER.sqlite")
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), dbPath)
cfg := AggSenderSQLStorageConfig{
DBPath: dbPath,
KeepCertificatesHistory: true,
}
storage, err := NewAggSenderSQLStorage(log.WithFields("aggsender-db"), cfg)
require.NoError(t, err)
require.NotNil(t, storage)

Expand Down
25 changes: 22 additions & 3 deletions aggsender/db/migrations/0001.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
-- +migrate Down
DROP TABLE IF EXISTS certificate_info;
DROP TABLE IF EXISTS certificate_info_history;
DROP TABLE IF EXISTS certificate_info_history;

-- +migrate Up
CREATE TABLE certificate_info (
height INTEGER NOT NULL,
certificate_id VARCHAR NOT NULL PRIMARY KEY,
retry_count INTEGER DEFAULT 0,
certificate_id VARCHAR NOT NULL,
status INTEGER NOT NULL,
previous_local_exit_root VARCHAR,
new_local_exit_root VARCHAR NOT NULL,
from_block INTEGER NOT NULL,
to_block INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
signed_certificate TEXT
);
signed_certificate TEXT,
PRIMARY KEY (height)
);

CREATE TABLE certificate_info_history (
height INTEGER NOT NULL ,
retry_count INTEGER DEFAULT 0,
certificate_id VARCHAR NOT NULL,
status INTEGER NOT NULL,
previous_local_exit_root VARCHAR,
new_local_exit_root VARCHAR NOT NULL,
from_block INTEGER NOT NULL,
to_block INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
signed_certificate TEXT,
PRIMARY KEY (height, retry_count)
);
6 changes: 4 additions & 2 deletions aggsender/db/migrations/migrations.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
package migrations

import (
"database/sql"
_ "embed"

"github.com/0xPolygon/cdk/db"
"github.com/0xPolygon/cdk/db/types"
"github.com/0xPolygon/cdk/log"
)

//go:embed 0001.sql
var mig001 string

func RunMigrations(dbPath string) error {
func RunMigrations(logger *log.Logger, database *sql.DB) error {
migrations := []types.Migration{
{
ID: "0001",
SQL: mig001,
},
}

return db.RunMigrations(dbPath, migrations)
return db.RunMigrationsDB(logger, database, migrations)
}
Loading

0 comments on commit e254ec9

Please sign in to comment.