Skip to content

Commit

Permalink
logdb: cache query statements
Browse files Browse the repository at this point in the history
  • Loading branch information
qianbin committed Sep 24, 2020
1 parent 219d962 commit b299d73
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 22 deletions.
51 changes: 29 additions & 22 deletions logdb/logdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type LogDB struct {
path string
db *sql.DB
driverVersion string
stmtCache *stmtCache
}

// New create or open log db at given path.
Expand All @@ -48,19 +49,21 @@ func New(path string) (logDB *LogDB, err error) {

driverVer, _, _ := sqlite3.Version()
return &LogDB{
path,
db,
driverVer,
path: path,
db: db,
driverVersion: driverVer,
stmtCache: newStmtCache(db),
}, nil
}

// NewMem create a log db in ram.
func NewMem() (*LogDB, error) {
return New(":memory:")
return New("file::memory:")
}

// Close close the log db.
func (db *LogDB) Close() error {
db.stmtCache.Clear()
return db.db.Close()
}

Expand Down Expand Up @@ -186,8 +189,8 @@ FROM (%v) t
return db.queryTransfers(ctx, fmt.Sprintf(query, subQuery), args...)
}

func (db *LogDB) queryEvents(ctx context.Context, stmt string, args ...interface{}) ([]*Event, error) {
rows, err := db.db.QueryContext(ctx, stmt, args...)
func (db *LogDB) queryEvents(ctx context.Context, query string, args ...interface{}) ([]*Event, error) {
rows, err := db.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -253,8 +256,8 @@ func (db *LogDB) queryEvents(ctx context.Context, stmt string, args ...interface
return events, nil
}

func (db *LogDB) queryTransfers(ctx context.Context, stmt string, args ...interface{}) ([]*Transfer, error) {
rows, err := db.db.QueryContext(ctx, stmt, args...)
func (db *LogDB) queryTransfers(ctx context.Context, query string, args ...interface{}) ([]*Transfer, error) {
rows, err := db.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -313,19 +316,18 @@ func (db *LogDB) queryTransfers(ctx context.Context, stmt string, args ...interf
// NewestBlockID query newest written block id.
func (db *LogDB) NewestBlockID() (thor.Bytes32, error) {
// select from config if any
row := db.db.QueryRow("SELECT value FROM config WHERE key=?", configBlockIDKey)
row := db.stmtCache.MustPrepare("SELECT value FROM config WHERE key=?").QueryRow(configBlockIDKey)
var data []byte
if err := row.Scan(&data); err != nil {
if sql.ErrNoRows != err {
return thor.Bytes32{}, err
}

// no config, query newest block ID from existing records
row = db.db.QueryRow(
`SELECT MAX(data) FROM (
SELECT data FROM ref WHERE id=(SELECT blockId FROM transfer ORDER BY seq DESC LIMIT 1)
UNION
SELECT data FROM ref WHERE id=(SELECT blockId FROM event ORDER BY seq DESC LIMIT 1))`)
row := db.stmtCache.MustPrepare(`SELECT MAX(data) FROM (
SELECT data FROM ref WHERE id=(SELECT blockId FROM transfer ORDER BY seq DESC LIMIT 1)
UNION
SELECT data FROM ref WHERE id=(SELECT blockId FROM event ORDER BY seq DESC LIMIT 1))`).QueryRow()

if err := row.Scan(&data); err != nil {
if sql.ErrNoRows != err {
Expand All @@ -338,12 +340,13 @@ SELECT data FROM ref WHERE id=(SELECT blockId FROM event ORDER BY seq DESC LIMIT

// HasBlockID query whether given block id related logs were written.
func (db *LogDB) HasBlockID(id thor.Bytes32) (bool, error) {
const query = `SELECT COUNT(*) FROM (
SELECT * FROM (SELECT seq FROM transfer WHERE seq=? AND blockID=` + refIDQuery + ` LIMIT 1)
UNION
SELECT * FROM (SELECT seq FROM event WHERE seq=? AND blockID=` + refIDQuery + ` LIMIT 1))`

seq := newSequence(block.Number(id), 0)
row := db.db.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM (
SELECT * FROM (SELECT seq FROM transfer WHERE seq=? AND blockID=%v LIMIT 1)
UNION
SELECT * FROM (SELECT seq FROM event WHERE seq=? AND blockID=%v LIMIT 1))`, refIDQuery, refIDQuery),
seq, id.Bytes(), seq, id.Bytes())
row := db.stmtCache.MustPrepare(query).QueryRow(seq, id.Bytes(), seq, id.Bytes())
var count int
if err := row.Scan(&count); err != nil {
// no need to check ErrNoRows
Expand All @@ -354,7 +357,7 @@ SELECT * FROM (SELECT seq FROM event WHERE seq=? AND blockID=%v LIMIT 1))`, refI

// Log write logs.
func (db *LogDB) Log(f func(*Writer) error) error {
w := &Writer{db: db.db}
w := &Writer{db: db.db, stmtCache: db.stmtCache}
if err := f(w); err != nil {
if w.tx != nil {
_ = w.tx.Rollback()
Expand All @@ -374,6 +377,7 @@ func topicValue(topics []thor.Bytes32, i int) []byte {
// Writer is the transactional log writer.
type Writer struct {
db *sql.DB
stmtCache *stmtCache
tx *sql.Tx
len int
lastBlockID thor.Bytes32
Expand All @@ -393,7 +397,10 @@ func (w *Writer) Write(b *block.Block, receipts tx.Receipts) error {

if num > 0 && w.len == 0 {
seq := newSequence(num, 0)
if err := w.exec("DELETE from event where seq >= ?;DELETE from transfer where seq >= ?", seq, seq); err != nil {
if err := w.exec("DELETE FROM event WHERE seq >= ?", seq); err != nil {
return err
}
if err := w.exec("DELETE FROM transfer WHERE seq >= ?", seq); err != nil {
return err
}
}
Expand Down Expand Up @@ -529,7 +536,7 @@ func (w *Writer) exec(query string, args ...interface{}) error {
w.tx = tx
}

if _, err := w.tx.Exec(query, args...); err != nil {
if _, err := w.tx.Stmt(w.stmtCache.MustPrepare(query)).Exec(args...); err != nil {
return err
}
w.len++
Expand Down
50 changes: 50 additions & 0 deletions logdb/stmt_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2020 The VeChainThor developers

// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying
// file LICENSE or <https://www.gnu.org/licenses/lgpl-3.0.html>

package logdb

import (
"database/sql"
"sync"
)

// to cache prepared sql statement, which maps query string to stmt.
type stmtCache struct {
db *sql.DB
m sync.Map
}

func newStmtCache(db *sql.DB) *stmtCache {
return &stmtCache{db: db}
}

func (sc *stmtCache) Prepare(query string) (*sql.Stmt, error) {
cached, _ := sc.m.Load(query)
if cached == nil {
stmt, err := sc.db.Prepare(query)
if err != nil {
return nil, err
}
sc.m.Store(query, stmt)
cached = stmt
}
return cached.(*sql.Stmt), nil
}

func (sc *stmtCache) MustPrepare(query string) *sql.Stmt {
stmt, err := sc.Prepare(query)
if err != nil {
panic(err)
}
return stmt
}

func (sc *stmtCache) Clear() {
sc.m.Range(func(k, v interface{}) bool {
_ = v.(*sql.Stmt).Close()
sc.m.Delete(k)
return true
})
}

0 comments on commit b299d73

Please sign in to comment.