-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #57 from ethpandaops/slot-index
change database schema to avoid expensive joins on search views
- Loading branch information
Showing
33 changed files
with
1,937 additions
and
1,317 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package db | ||
|
||
import ( | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/ethpandaops/dora/dbtypes" | ||
"github.com/jmoiron/sqlx" | ||
) | ||
|
||
func InsertBlob(blob *dbtypes.Blob, tx *sqlx.Tx) error { | ||
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{ | ||
dbtypes.DBEnginePgsql: ` | ||
INSERT INTO blobs ( | ||
commitment, proof, size, blob | ||
) VALUES ($1, $2, $3, $4) | ||
ON CONFLICT (commitment) DO UPDATE SET | ||
size = excluded.size, | ||
blob = excluded.blob`, | ||
dbtypes.DBEngineSqlite: ` | ||
INSERT OR REPLACE INTO blobs ( | ||
commitment, proof, size, blob | ||
) VALUES ($1, $2, $3, $4)`, | ||
}), | ||
blob.Commitment, blob.Proof, blob.Size, blob.Blob) | ||
if err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func InsertBlobAssignment(blobAssignment *dbtypes.BlobAssignment, tx *sqlx.Tx) error { | ||
_, err := tx.Exec(EngineQuery(map[dbtypes.DBEngineType]string{ | ||
dbtypes.DBEnginePgsql: ` | ||
INSERT INTO blob_assignments ( | ||
root, commitment, slot | ||
) VALUES ($1, $2, $3) | ||
ON CONFLICT (root, commitment) DO NOTHING`, | ||
dbtypes.DBEngineSqlite: ` | ||
INSERT OR REPLACE INTO blob_assignments ( | ||
root, commitment, slot | ||
) VALUES ($1, $2, $3)`, | ||
}), | ||
blobAssignment.Root, blobAssignment.Commitment, blobAssignment.Slot) | ||
if err != nil { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func GetBlob(commitment []byte, withData bool) *dbtypes.Blob { | ||
blob := dbtypes.Blob{} | ||
var sql strings.Builder | ||
fmt.Fprintf(&sql, `SELECT commitment, proof, size`) | ||
if withData { | ||
fmt.Fprintf(&sql, `, blob`) | ||
} | ||
fmt.Fprintf(&sql, ` FROM blobs WHERE commitment = $1`) | ||
err := ReaderDb.Get(&blob, sql.String(), commitment) | ||
if err != nil { | ||
return nil | ||
} | ||
return &blob | ||
} | ||
|
||
func GetLatestBlobAssignment(commitment []byte) *dbtypes.BlobAssignment { | ||
blobAssignment := dbtypes.BlobAssignment{} | ||
err := ReaderDb.Get(&blobAssignment, "SELECT root, commitment, slot FROM blob_assignments WHERE commitment = $1 ORDER BY slot DESC LIMIT 1", commitment) | ||
if err != nil { | ||
return nil | ||
} | ||
return &blobAssignment | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,200 @@ | ||
package db | ||
|
||
import ( | ||
"embed" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/jmoiron/sqlx" | ||
_ "github.com/lib/pq" | ||
_ "github.com/mattn/go-sqlite3" | ||
"github.com/pressly/goose/v3" | ||
"github.com/sirupsen/logrus" | ||
|
||
"github.com/ethpandaops/dora/dbtypes" | ||
"github.com/ethpandaops/dora/types" | ||
"github.com/ethpandaops/dora/utils" | ||
|
||
"github.com/jackc/pgx/v4/pgxpool" | ||
_ "github.com/jackc/pgx/v4/stdlib" | ||
) | ||
|
||
//go:embed schema/pgsql/*.sql | ||
var EmbedPgsqlSchema embed.FS | ||
|
||
//go:embed schema/sqlite/*.sql | ||
var EmbedSqliteSchema embed.FS | ||
|
||
var DBPGX *pgxpool.Conn | ||
|
||
// DB is a pointer to the explorer-database | ||
var DbEngine dbtypes.DBEngineType | ||
var WriterDb *sqlx.DB | ||
var ReaderDb *sqlx.DB | ||
|
||
var logger = logrus.StandardLogger().WithField("module", "db") | ||
|
||
func checkDbConn(dbConn *sqlx.DB, dataBaseName string) { | ||
// The golang sql driver does not properly implement PingContext | ||
// therefore we use a timer to catch db connection timeouts | ||
dbConnectionTimeout := time.NewTimer(15 * time.Second) | ||
|
||
go func() { | ||
<-dbConnectionTimeout.C | ||
logger.Fatalf("timeout while connecting to %s", dataBaseName) | ||
}() | ||
|
||
err := dbConn.Ping() | ||
if err != nil { | ||
logger.Fatalf("unable to Ping %s: %s", dataBaseName, err) | ||
} | ||
|
||
dbConnectionTimeout.Stop() | ||
} | ||
|
||
func mustInitSqlite(config *types.SqliteDatabaseConfig) (*sqlx.DB, *sqlx.DB) { | ||
if config.MaxOpenConns == 0 { | ||
config.MaxOpenConns = 50 | ||
} | ||
if config.MaxIdleConns == 0 { | ||
config.MaxIdleConns = 10 | ||
} | ||
if config.MaxOpenConns < config.MaxIdleConns { | ||
config.MaxIdleConns = config.MaxOpenConns | ||
} | ||
|
||
logger.Infof("initializing sqlite connection to %v with %v/%v conn limit", config.File, config.MaxIdleConns, config.MaxOpenConns) | ||
dbConn, err := sqlx.Open("sqlite3", fmt.Sprintf("%s?cache=shared", config.File)) | ||
if err != nil { | ||
utils.LogFatal(err, "error opening sqlite database", 0) | ||
} | ||
|
||
checkDbConn(dbConn, "database") | ||
dbConn.SetConnMaxIdleTime(0) | ||
dbConn.SetConnMaxLifetime(0) | ||
dbConn.SetMaxOpenConns(config.MaxOpenConns) | ||
dbConn.SetMaxIdleConns(config.MaxIdleConns) | ||
|
||
dbConn.MustExec("PRAGMA journal_mode = WAL") | ||
|
||
return dbConn, dbConn | ||
} | ||
|
||
func mustInitPgsql(writer *types.PgsqlDatabaseConfig, reader *types.PgsqlDatabaseConfig) (*sqlx.DB, *sqlx.DB) { | ||
if writer.MaxOpenConns == 0 { | ||
writer.MaxOpenConns = 50 | ||
} | ||
if writer.MaxIdleConns == 0 { | ||
writer.MaxIdleConns = 10 | ||
} | ||
if writer.MaxOpenConns < writer.MaxIdleConns { | ||
writer.MaxIdleConns = writer.MaxOpenConns | ||
} | ||
|
||
if reader.MaxOpenConns == 0 { | ||
reader.MaxOpenConns = 50 | ||
} | ||
if reader.MaxIdleConns == 0 { | ||
reader.MaxIdleConns = 10 | ||
} | ||
if reader.MaxOpenConns < reader.MaxIdleConns { | ||
reader.MaxIdleConns = reader.MaxOpenConns | ||
} | ||
|
||
logger.Infof("initializing pgsql writer connection to %v with %v/%v conn limit", writer.Host, writer.MaxIdleConns, writer.MaxOpenConns) | ||
dbConnWriter, err := sqlx.Open("pgx", fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", writer.Username, writer.Password, writer.Host, writer.Port, writer.Name)) | ||
if err != nil { | ||
utils.LogFatal(err, "error getting pgsql writer database", 0) | ||
} | ||
|
||
checkDbConn(dbConnWriter, "database") | ||
dbConnWriter.SetConnMaxIdleTime(time.Second * 30) | ||
dbConnWriter.SetConnMaxLifetime(time.Second * 60) | ||
dbConnWriter.SetMaxOpenConns(writer.MaxOpenConns) | ||
dbConnWriter.SetMaxIdleConns(writer.MaxIdleConns) | ||
|
||
logger.Infof("initializing pgsql reader connection to %v with %v/%v conn limit", writer.Host, reader.MaxIdleConns, reader.MaxOpenConns) | ||
dbConnReader, err := sqlx.Open("pgx", fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", reader.Username, reader.Password, reader.Host, reader.Port, reader.Name)) | ||
if err != nil { | ||
utils.LogFatal(err, "error getting pgsql reader database", 0) | ||
} | ||
|
||
checkDbConn(dbConnReader, "read replica database") | ||
dbConnReader.SetConnMaxIdleTime(time.Second * 30) | ||
dbConnReader.SetConnMaxLifetime(time.Second * 60) | ||
dbConnReader.SetMaxOpenConns(reader.MaxOpenConns) | ||
dbConnReader.SetMaxIdleConns(reader.MaxIdleConns) | ||
return dbConnWriter, dbConnReader | ||
} | ||
|
||
func MustInitDB() { | ||
if utils.Config.Database.Engine == "sqlite" { | ||
sqliteConfig := (*types.SqliteDatabaseConfig)(&utils.Config.Database.Sqlite) | ||
DbEngine = dbtypes.DBEngineSqlite | ||
WriterDb, ReaderDb = mustInitSqlite(sqliteConfig) | ||
} else if utils.Config.Database.Engine == "pgsql" { | ||
readerConfig := (*types.PgsqlDatabaseConfig)(&utils.Config.Database.Pgsql) | ||
writerConfig := (*types.PgsqlDatabaseConfig)(&utils.Config.Database.PgsqlWriter) | ||
if writerConfig.Host == "" { | ||
writerConfig = readerConfig | ||
} | ||
DbEngine = dbtypes.DBEnginePgsql | ||
WriterDb, ReaderDb = mustInitPgsql(writerConfig, readerConfig) | ||
} else { | ||
logger.Fatalf("unknown database engine type: %s", utils.Config.Database.Engine) | ||
} | ||
} | ||
|
||
func MustCloseDB() { | ||
err := WriterDb.Close() | ||
if err != nil { | ||
logger.Errorf("Error closing writer db connection: %v", err) | ||
} | ||
err = ReaderDb.Close() | ||
if err != nil { | ||
logger.Errorf("Error closing reader db connection: %v", err) | ||
} | ||
} | ||
|
||
func ApplyEmbeddedDbSchema(version int64) error { | ||
var engineDialect string | ||
var schemaDirectory string | ||
switch DbEngine { | ||
case dbtypes.DBEnginePgsql: | ||
goose.SetBaseFS(EmbedPgsqlSchema) | ||
engineDialect = "postgres" | ||
schemaDirectory = "schema/pgsql" | ||
case dbtypes.DBEngineSqlite: | ||
goose.SetBaseFS(EmbedSqliteSchema) | ||
engineDialect = "sqlite3" | ||
schemaDirectory = "schema/sqlite" | ||
default: | ||
logger.Fatalf("unknown database engine") | ||
} | ||
if err := goose.SetDialect(engineDialect); err != nil { | ||
return err | ||
} | ||
|
||
if version == -2 { | ||
if err := goose.Up(WriterDb.DB, schemaDirectory); err != nil { | ||
return err | ||
} | ||
} else if version == -1 { | ||
if err := goose.UpByOne(WriterDb.DB, schemaDirectory); err != nil { | ||
return err | ||
} | ||
} else { | ||
if err := goose.UpTo(WriterDb.DB, schemaDirectory, version); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func EngineQuery(queryMap map[dbtypes.DBEngineType]string) string { | ||
if queryMap[DbEngine] != "" { | ||
return queryMap[DbEngine] | ||
} | ||
return queryMap[dbtypes.DBEngineAny] | ||
} |
Oops, something went wrong.