Skip to content

Commit

Permalink
cleanup db package
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Apr 27, 2024
1 parent 7183769 commit 0986da8
Show file tree
Hide file tree
Showing 15 changed files with 1,284 additions and 1,218 deletions.
73 changes: 73 additions & 0 deletions db/blobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package db

import (
"fmt"
"strings"

"github.com/ethpandaops/dora/dbtypes"
"github.com/jmoiron/sqlx"
)

func InsertBlob(blob *dbtypes.Blob, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
INSERT INTO blobs (
commitment, proof, size, blob
) VALUES ($1, $2, $3, $4)
ON CONFLICT (commitment) DO UPDATE SET
size = excluded.size,
blob = excluded.blob`,
dbtypes.DBEngineSqlite: `
INSERT OR REPLACE INTO blobs (
commitment, proof, size, blob
) VALUES ($1, $2, $3, $4)`,
}),
blob.Commitment, blob.Proof, blob.Size, blob.Blob)
if err != nil {
return err
}
return nil
}

func InsertBlobAssignment(blobAssignment *dbtypes.BlobAssignment, tx *sqlx.Tx) error {
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: `
INSERT INTO blob_assignments (
root, commitment, slot
) VALUES ($1, $2, $3)
ON CONFLICT (root, commitment) DO NOTHING`,
dbtypes.DBEngineSqlite: `
INSERT OR REPLACE INTO blob_assignments (
root, commitment, slot
) VALUES ($1, $2, $3)`,
}),
blobAssignment.Root, blobAssignment.Commitment, blobAssignment.Slot)
if err != nil {
return err
}
return nil
}

func GetBlob(commitment []byte, withData bool) *dbtypes.Blob {
blob := dbtypes.Blob{}
var sql strings.Builder
fmt.Fprintf(&sql, `SELECT commitment, proof, size`)
if withData {
fmt.Fprintf(&sql, `, blob`)
}
fmt.Fprintf(&sql, ` FROM blobs WHERE commitment = $1`)
err := ReaderDb.Get(&blob, sql.String(), commitment)
if err != nil {
return nil
}
return &blob
}

func GetLatestBlobAssignment(commitment []byte) *dbtypes.BlobAssignment {
blobAssignment := dbtypes.BlobAssignment{}
err := ReaderDb.Get(&blobAssignment, "SELECT root, commitment, slot FROM blob_assignments WHERE commitment = $1 ORDER BY slot DESC LIMIT 1", commitment)
if err != nil {
return nil
}
return &blobAssignment
}
200 changes: 200 additions & 0 deletions db/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package db

import (
"embed"
"fmt"
"time"

"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"github.com/pressly/goose/v3"
"github.com/sirupsen/logrus"

"github.com/ethpandaops/dora/dbtypes"
"github.com/ethpandaops/dora/types"
"github.com/ethpandaops/dora/utils"

"github.com/jackc/pgx/v4/pgxpool"
_ "github.com/jackc/pgx/v4/stdlib"
)

//go:embed schema/pgsql/*.sql
var EmbedPgsqlSchema embed.FS

//go:embed schema/sqlite/*.sql
var EmbedSqliteSchema embed.FS

var DBPGX *pgxpool.Conn

// DB is a pointer to the explorer-database
var DbEngine dbtypes.DBEngineType
var WriterDb *sqlx.DB
var ReaderDb *sqlx.DB

var logger = logrus.StandardLogger().WithField("module", "db")

func checkDbConn(dbConn *sqlx.DB, dataBaseName string) {
// The golang sql driver does not properly implement PingContext
// therefore we use a timer to catch db connection timeouts
dbConnectionTimeout := time.NewTimer(15 * time.Second)

go func() {
<-dbConnectionTimeout.C
logger.Fatalf("timeout while connecting to %s", dataBaseName)
}()

err := dbConn.Ping()
if err != nil {
logger.Fatalf("unable to Ping %s: %s", dataBaseName, err)
}

dbConnectionTimeout.Stop()
}

func mustInitSqlite(config *types.SqliteDatabaseConfig) (*sqlx.DB, *sqlx.DB) {
if config.MaxOpenConns == 0 {
config.MaxOpenConns = 50
}
if config.MaxIdleConns == 0 {
config.MaxIdleConns = 10
}
if config.MaxOpenConns < config.MaxIdleConns {
config.MaxIdleConns = config.MaxOpenConns
}

logger.Infof("initializing sqlite connection to %v with %v/%v conn limit", config.File, config.MaxIdleConns, config.MaxOpenConns)
dbConn, err := sqlx.Open("sqlite3", fmt.Sprintf("%s?cache=shared", config.File))
if err != nil {
utils.LogFatal(err, "error opening sqlite database", 0)
}

checkDbConn(dbConn, "database")
dbConn.SetConnMaxIdleTime(0)
dbConn.SetConnMaxLifetime(0)
dbConn.SetMaxOpenConns(config.MaxOpenConns)
dbConn.SetMaxIdleConns(config.MaxIdleConns)

dbConn.MustExec("PRAGMA journal_mode = WAL")

return dbConn, dbConn
}

func mustInitPgsql(writer *types.PgsqlDatabaseConfig, reader *types.PgsqlDatabaseConfig) (*sqlx.DB, *sqlx.DB) {
if writer.MaxOpenConns == 0 {
writer.MaxOpenConns = 50
}
if writer.MaxIdleConns == 0 {
writer.MaxIdleConns = 10
}
if writer.MaxOpenConns < writer.MaxIdleConns {
writer.MaxIdleConns = writer.MaxOpenConns
}

if reader.MaxOpenConns == 0 {
reader.MaxOpenConns = 50
}
if reader.MaxIdleConns == 0 {
reader.MaxIdleConns = 10
}
if reader.MaxOpenConns < reader.MaxIdleConns {
reader.MaxIdleConns = reader.MaxOpenConns
}

logger.Infof("initializing pgsql writer connection to %v with %v/%v conn limit", writer.Host, writer.MaxIdleConns, writer.MaxOpenConns)
dbConnWriter, err := sqlx.Open("pgx", fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", writer.Username, writer.Password, writer.Host, writer.Port, writer.Name))
if err != nil {
utils.LogFatal(err, "error getting pgsql writer database", 0)
}

checkDbConn(dbConnWriter, "database")
dbConnWriter.SetConnMaxIdleTime(time.Second * 30)
dbConnWriter.SetConnMaxLifetime(time.Second * 60)
dbConnWriter.SetMaxOpenConns(writer.MaxOpenConns)
dbConnWriter.SetMaxIdleConns(writer.MaxIdleConns)

logger.Infof("initializing pgsql reader connection to %v with %v/%v conn limit", writer.Host, reader.MaxIdleConns, reader.MaxOpenConns)
dbConnReader, err := sqlx.Open("pgx", fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", reader.Username, reader.Password, reader.Host, reader.Port, reader.Name))
if err != nil {
utils.LogFatal(err, "error getting pgsql reader database", 0)
}

checkDbConn(dbConnReader, "read replica database")
dbConnReader.SetConnMaxIdleTime(time.Second * 30)
dbConnReader.SetConnMaxLifetime(time.Second * 60)
dbConnReader.SetMaxOpenConns(reader.MaxOpenConns)
dbConnReader.SetMaxIdleConns(reader.MaxIdleConns)
return dbConnWriter, dbConnReader
}

func MustInitDB() {
if utils.Config.Database.Engine == "sqlite" {
sqliteConfig := (*types.SqliteDatabaseConfig)(&utils.Config.Database.Sqlite)
DbEngine = dbtypes.DBEngineSqlite
WriterDb, ReaderDb = mustInitSqlite(sqliteConfig)
} else if utils.Config.Database.Engine == "pgsql" {
readerConfig := (*types.PgsqlDatabaseConfig)(&utils.Config.Database.Pgsql)
writerConfig := (*types.PgsqlDatabaseConfig)(&utils.Config.Database.PgsqlWriter)
if writerConfig.Host == "" {
writerConfig = readerConfig
}
DbEngine = dbtypes.DBEnginePgsql
WriterDb, ReaderDb = mustInitPgsql(writerConfig, readerConfig)
} else {
logger.Fatalf("unknown database engine type: %s", utils.Config.Database.Engine)
}
}

func MustCloseDB() {
err := WriterDb.Close()
if err != nil {
logger.Errorf("Error closing writer db connection: %v", err)
}
err = ReaderDb.Close()
if err != nil {
logger.Errorf("Error closing reader db connection: %v", err)
}
}

func ApplyEmbeddedDbSchema(version int64) error {
var engineDialect string
var schemaDirectory string
switch DbEngine {
case dbtypes.DBEnginePgsql:
goose.SetBaseFS(EmbedPgsqlSchema)
engineDialect = "postgres"
schemaDirectory = "schema/pgsql"
case dbtypes.DBEngineSqlite:
goose.SetBaseFS(EmbedSqliteSchema)
engineDialect = "sqlite3"
schemaDirectory = "schema/sqlite"
default:
logger.Fatalf("unknown database engine")
}
if err := goose.SetDialect(engineDialect); err != nil {
return err
}

if version == -2 {
if err := goose.Up(WriterDb.DB, schemaDirectory); err != nil {
return err
}
} else if version == -1 {
if err := goose.UpByOne(WriterDb.DB, schemaDirectory); err != nil {
return err
}
} else {
if err := goose.UpTo(WriterDb.DB, schemaDirectory, version); err != nil {
return err
}
}

return nil
}

func EngineQuery(queryMap map[dbtypes.DBEngineType]string) string {
if queryMap[DbEngine] != "" {
return queryMap[DbEngine]
}
return queryMap[dbtypes.DBEngineAny]
}
Loading

0 comments on commit 0986da8

Please sign in to comment.