Commit

address PR comments and track last delete failure from sqlDB to prevent duplicate uploads
ganeshvanahalli committed Nov 20, 2024
1 parent 15ba495 commit b865417
Showing 5 changed files with 64 additions and 39 deletions.
4 changes: 3 additions & 1 deletion timeboost/db.go
@@ -154,7 +154,9 @@ func (d *SqliteDatabase) GetBids(maxDbRows int) ([]*SqliteDatabaseBid, uint64, error) {
			return sqlDBbids[:i], sqlDBbids[i].Round, nil
		}
	}
-	return sqlDBbids, 0, nil
+	// If we can't determine a contiguous set of bids, we abort and retry later.
+	// This saves us from cases where we would otherwise sometimes push the same batch data twice.
+	return nil, 0, nil
}

func (d *SqliteDatabase) DeleteBids(round uint64) error {
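For context, the returned sqlDBbids[:i] / sqlDBbids[i].Round pair above is the tail of GetBids' row-cap handling. A hedged reconstruction of that logic as a standalone helper — the helper name and surrounding shape are illustrative, not from the commit; only the cut-at-a-round-boundary behavior matches the diff:

// Illustrative sketch, not the commit's code: GetBids fetches bids sorted by
// Round and, when the row cap is exceeded, cuts the slice at a round boundary
// so one round is never split across two uploads.
func truncateToCompleteRounds(bids []*SqliteDatabaseBid, maxDbRows int) ([]*SqliteDatabaseBid, uint64) {
	if maxDbRows == 0 || len(bids) <= maxDbRows {
		return bids, 0 // everything fits; nothing to cut
	}
	for i := maxDbRows; i > 0; i-- {
		// bids[:i] ends exactly where a new round begins.
		if bids[i].Round != bids[i-1].Round {
			return bids[:i], bids[i].Round // second value: first round left behind
		}
	}
	// A single round exceeds the cap: return nothing and let the caller retry,
	// rather than uploading a partial round that could later be re-uploaded.
	return nil, 0
}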
94 changes: 58 additions & 36 deletions timeboost/s3_storage.go
@@ -12,7 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/util"
"github.com/offchainlabs/nitro/util/gzip"
"github.com/offchainlabs/nitro/util/s3client"
"github.com/offchainlabs/nitro/util/stopwaiter"
"github.com/spf13/pflag"
@@ -45,9 +45,9 @@ func (c *S3StorageServiceConfig) Validate() error {

var DefaultS3StorageServiceConfig = S3StorageServiceConfig{
	Enable:         false,
-	UploadInterval: time.Minute, // is this the right default value?
-	MaxBatchSize:   100000000,   // is this the right default value?
-	MaxDbRows:      0,           // Disabled by default
+	UploadInterval: 15 * time.Minute,
+	MaxBatchSize:   100000000,
+	MaxDbRows:      0, // Disabled by default
}

func S3StorageServiceConfigAddOptions(prefix string, f *pflag.FlagSet) {
@@ -64,11 +64,12 @@ func S3StorageServiceConfigAddOptions(prefix string, f *pflag.FlagSet) {

type S3StorageService struct {
	stopwaiter.StopWaiter
-	config       *S3StorageServiceConfig
-	client       s3client.FullClient
-	sqlDB        *SqliteDatabase
-	bucket       string
-	objectPrefix string
+	config                *S3StorageServiceConfig
+	client                s3client.FullClient
+	sqlDB                 *SqliteDatabase
+	bucket                string
+	objectPrefix          string
+	lastFailedDeleteRound uint64
}

func NewS3StorageService(config *S3StorageServiceConfig, sqlDB *SqliteDatabase) (*S3StorageService, error) {
@@ -91,7 +92,7 @@ func (s *S3StorageService) Start(ctx context.Context) {
}

func (s *S3StorageService) uploadBatch(ctx context.Context, batch []byte, firstRound uint64) error {
-	compressedData, err := util.CompressGzip(batch)
+	compressedData, err := gzip.CompressGzip(batch)
	if err != nil {
		return err
	}
@@ -117,7 +118,7 @@ func (s *S3StorageService) downloadBatch(ctx context.Context, key string) ([]byte, error) {
	}); err != nil {
		return nil, err
	}
-	return util.DecompressGzip(buf.Bytes())
+	return gzip.DecompressGzip(buf.Bytes())
}

func csvRecordSize(record []string) int {
@@ -129,68 +130,89 @@ func csvRecordSize(record []string) int {
}

func (s *S3StorageService) uploadBatches(ctx context.Context) time.Duration {
+	// Before doing anything else, first try to delete previously uploaded bids that were not successfully erased from the sqlDB.
+	if s.lastFailedDeleteRound != 0 {
+		if err := s.sqlDB.DeleteBids(s.lastFailedDeleteRound); err != nil {
+			log.Error("error deleting s3-persisted bids from sql db using lastFailedDeleteRound", "lastFailedDeleteRound", s.lastFailedDeleteRound, "err", err)
+			return 5 * time.Second
+		}
+		s.lastFailedDeleteRound = 0
+	}

	bids, round, err := s.sqlDB.GetBids(s.config.MaxDbRows)
	if err != nil {
		log.Error("Error fetching validated bids from sql DB", "round", round, "err", err)
-		return 0
+		return 5 * time.Second
	}
-	// Nothing to persist, exit early
+	// Nothing to persist, or a contiguous set of bids wasn't found, so exit early
	if len(bids) == 0 {
		return s.config.UploadInterval
	}

	var csvBuffer bytes.Buffer
	var size int
	var firstBidId int
	csvWriter := csv.NewWriter(&csvBuffer)
+	uploadAndDeleteBids := func(firstRound, deleteRound uint64) error {
+		// Flush buffered records so the batch is complete before uploading.
+		csvWriter.Flush()
+		if err := csvWriter.Error(); err != nil {
+			log.Error("Error flushing csv writer", "err", err)
+			return err
+		}
+		if err := s.uploadBatch(ctx, csvBuffer.Bytes(), firstRound); err != nil {
+			log.Error("Error uploading batch to s3", "firstRound", firstRound, "err", err)
+			return err
+		}
+		// After a successful upload, delete the uploaded bids from the DB to prevent duplicate uploads.
+		// If the delete fails, we track deleteRound until a future delete succeeds.
+		if err := s.sqlDB.DeleteBids(deleteRound); err != nil {
+			log.Error("error deleting s3-persisted bids from sql db", "round", deleteRound, "err", err)
+			s.lastFailedDeleteRound = deleteRound
+		} else {
+			// Previously failed deletes don't matter anymore, as the most recent one (larger round number) succeeded.
+			s.lastFailedDeleteRound = 0
+		}
+		return nil
+	}

	header := []string{"ChainID", "Bidder", "ExpressLaneController", "AuctionContractAddress", "Round", "Amount", "Signature"}
	if err := csvWriter.Write(header); err != nil {
		log.Error("Error writing to csv writer", "err", err)
-		return 0
+		return 5 * time.Second
	}
	for index, bid := range bids {
		record := []string{bid.ChainId, bid.Bidder, bid.ExpressLaneController, bid.AuctionContractAddress, fmt.Sprintf("%d", bid.Round), bid.Amount, bid.Signature}
		if err := csvWriter.Write(record); err != nil {
			log.Error("Error writing to csv writer", "err", err)
-			return 0
+			return 5 * time.Second
		}
		if s.config.MaxBatchSize != 0 {
			size += csvRecordSize(record)
			if size >= s.config.MaxBatchSize && index < len(bids)-1 && bid.Round != bids[index+1].Round {
				// End the current batch when size exceeds MaxBatchSize and the current round ends
-				csvWriter.Flush()
-				if err := csvWriter.Error(); err != nil {
-					log.Error("Error flushing csv writer", "err", err)
-					return 0
-				}
-				if err := s.uploadBatch(ctx, csvBuffer.Bytes(), bids[firstBidId].Round); err != nil {
-					log.Error("Error uploading batch to s3", "firstRound", bids[firstBidId].Round, "err", err)
-					return 0
+				if uploadAndDeleteBids(bids[firstBidId].Round, bids[index+1].Round) != nil {
+					return 5 * time.Second
				}
				// Reset csv for next batch
				csvBuffer.Reset()
				if err := csvWriter.Write(header); err != nil {
					log.Error("Error writing to csv writer", "err", err)
-					return 0
+					return 5 * time.Second
				}
				size = 0
				firstBidId = index + 1
			}
		}
	}
	if s.config.MaxBatchSize == 0 || size > 0 {
-		csvWriter.Flush()
-		if err := csvWriter.Error(); err != nil {
-			log.Error("Error flushing csv writer", "err", err)
-			return 0
-		}
-		if err := s.uploadBatch(ctx, csvBuffer.Bytes(), bids[firstBidId].Round); err != nil {
-			log.Error("Error uploading batch to s3", "firstRound", bids[firstBidId].Round, "err", err)
-			return 0
+		if uploadAndDeleteBids(bids[firstBidId].Round, round) != nil {
+			return 5 * time.Second
		}
	}
-	if err := s.sqlDB.DeleteBids(round); err != nil {
-		log.Error("error deleting s3-persisted bids from sql db", "round", round, "err", err)
-		return 0
+
+	if s.lastFailedDeleteRound != 0 {
+		return 5 * time.Second
	}

	return s.config.UploadInterval
}
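The 5-second returns above only make sense in light of how Start schedules this method. A minimal sketch, assuming Start runs uploadBatches through stopwaiter's CallIteratively (Start's body is elided in this diff, so the wiring below is an assumption): the returned duration is the wait before the next invocation, so failure paths retry quickly while a clean pass sleeps a full UploadInterval.

// Assumed wiring, not shown in the commit: CallIteratively re-invokes the
// callback after each returned duration until the StopWaiter is stopped.
func (s *S3StorageService) Start(ctxIn context.Context) {
	s.StopWaiter.Start(ctxIn, s)
	s.CallIteratively(s.uploadBatches) // 5s after an error, UploadInterval after success
}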
1 change: 1 addition & 0 deletions timeboost/schema.go
@@ -19,6 +19,7 @@ CREATE TABLE IF NOT EXISTS Bids (
	Amount TEXT NOT NULL,
	Signature TEXT NOT NULL
);
+CREATE INDEX idx_bids_round ON Bids(Round);
`
schemaList = []string{version1}
)
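Since every successful upload now triggers a delete keyed on Round (and failed deletes are retried each cycle), the new index keeps those deletes from scanning the whole table. A hypothetical sketch of the statement it serves — the actual query lives in timeboost/db.go, and both the sqlDB field name and the WHERE Round < ? shape are assumptions based on how uploadAndDeleteBids passes the first un-uploaded round:

// Hypothetical sketch; the real DeleteBids is not shown in this diff.
func (d *SqliteDatabase) DeleteBids(round uint64) error {
	// idx_bids_round turns this range predicate into an index scan
	// instead of a full table walk on every upload cycle.
	_, err := d.sqlDB.Exec("DELETE FROM Bids WHERE Round < ?", round)
	return err
}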
2 changes: 1 addition & 1 deletion util/gzip_compression.go → util/gzip/gzip_compression.go
@@ -1,4 +1,4 @@
-package util
+package gzip

import (
"bytes"
2 changes: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-package util
+package gzip

import (
"bytes"
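The last two files only change their package clause as part of the move from util to util/gzip; the helper bodies are elided. For reference, a minimal sketch of what CompressGzip and DecompressGzip plausibly look like on top of the standard library (an assumption — the real bodies aren't shown in this diff):

package gzip

import (
	"bytes"
	"compress/gzip"
	"io"
)

// CompressGzip returns data compressed as a complete gzip stream.
func CompressGzip(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	// Close flushes and writes the gzip footer; without it the stream is truncated.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// DecompressGzip inflates a gzip stream produced by CompressGzip.
func DecompressGzip(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}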
