Skip to content

Commit

Permalink
Merge pull request #377 from qianbin/optimize-logdb
Browse files Browse the repository at this point in the history
Optimize logdb
  • Loading branch information
libotony authored Sep 24, 2020
2 parents 7f9bff0 + b299d73 commit 9739d5a
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 28 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/cespare/xxhash v1.1.0 // indirect
github.com/coocood/freecache v1.1.1-0.20191203093230-cf06d5fa0ac1
github.com/davecgh/go-spew v1.1.0
github.com/deckarep/golang-set v0.0.0-20171013212420-1d4478f51bed // indirect
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/elastic/gosigar v0.10.5
github.com/elazarl/go-bindata-assetfs v1.0.0
github.com/ethereum/go-ethereum v1.8.14
Expand All @@ -27,7 +27,7 @@ require (
github.com/mattn/go-colorable v0.0.9 // indirect
github.com/mattn/go-isatty v0.0.3
github.com/mattn/go-runewidth v0.0.4 // indirect
github.com/mattn/go-sqlite3 v1.10.0
github.com/mattn/go-sqlite3 v1.14.1
github.com/mattn/go-tty v0.0.0-20180219170247-931426f7535a
github.com/pborman/uuid v0.0.0-20170612153648-e790cca94e6c
github.com/pkg/errors v0.8.0
Expand Down
13 changes: 9 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/aristanetworks/goarista v0.0.0-20180222005525-c41ed3986faa h1:yCVE1EVBfyjHQn7TAfnD1Q4MMHGW/jdZjVJsXQeuRQw=
github.com/aristanetworks/goarista v0.0.0-20180222005525-c41ed3986faa/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ=
github.com/beevik/ntp v0.2.0 h1:sGsd+kAXzT0bfVfzJfce04g+dSRfrs+tbQW8lweuYgw=
Expand All @@ -14,8 +16,8 @@ github.com/coocood/freecache v1.1.1-0.20191203093230-cf06d5fa0ac1 h1:qQNgRyCSCPX
github.com/coocood/freecache v1.1.1-0.20191203093230-cf06d5fa0ac1/go.mod h1:ePwxCDzOYvARfHdr1pByNct1at3CoKnsipOHwKlNbzI=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set v0.0.0-20171013212420-1d4478f51bed h1:njG8LmGD6JCWJu4bwIKmkOHvch70UOEIqczl5vp7Gok=
github.com/deckarep/golang-set v0.0.0-20171013212420-1d4478f51bed/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ=
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/elastic/gosigar v0.10.5 h1:GzPQ+78RaAb4J63unidA/JavQRKrB6s8IOzN6Ib59jo=
github.com/elastic/gosigar v0.10.5/go.mod h1:cdorVVzy1fhmEqmtgqkoE3bYtCfSCkVyjTyCIo22xvs=
github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk=
Expand Down Expand Up @@ -69,8 +71,8 @@ github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.1 h1:AHx9Ra40wIzl+GelgX2X6AWxmT5tfxhI1PL0523HcSw=
github.com/mattn/go-sqlite3 v1.14.1/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
github.com/mattn/go-tty v0.0.0-20180219170247-931426f7535a h1:8TGB3DFRNl06DB1Q6zBX+I7FDoCUZY2fmMS9WGUIIpw=
github.com/mattn/go-tty v0.0.0-20180219170247-931426f7535a/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
Expand Down Expand Up @@ -101,9 +103,12 @@ github.com/vechain/goleveldb v1.0.1-0.20200918014306-20f0a95f6dd4/go.mod h1:u2MK
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20180218175443-cbe0f9307d01/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc h1:zK/HqS5bZxDptfPJNq8v7vJfXtkU7r9TLIoSr1bXaP4=
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
Expand Down
51 changes: 29 additions & 22 deletions logdb/logdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type LogDB struct {
path string
db *sql.DB
driverVersion string
stmtCache *stmtCache
}

// New create or open log db at given path.
Expand All @@ -48,19 +49,21 @@ func New(path string) (logDB *LogDB, err error) {

driverVer, _, _ := sqlite3.Version()
return &LogDB{
path,
db,
driverVer,
path: path,
db: db,
driverVersion: driverVer,
stmtCache: newStmtCache(db),
}, nil
}

// NewMem create a log db in ram.
func NewMem() (*LogDB, error) {
return New(":memory:")
return New("file::memory:")
}

// Close close the log db.
func (db *LogDB) Close() error {
db.stmtCache.Clear()
return db.db.Close()
}

Expand Down Expand Up @@ -186,8 +189,8 @@ FROM (%v) t
return db.queryTransfers(ctx, fmt.Sprintf(query, subQuery), args...)
}

func (db *LogDB) queryEvents(ctx context.Context, stmt string, args ...interface{}) ([]*Event, error) {
rows, err := db.db.QueryContext(ctx, stmt, args...)
func (db *LogDB) queryEvents(ctx context.Context, query string, args ...interface{}) ([]*Event, error) {
rows, err := db.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -253,8 +256,8 @@ func (db *LogDB) queryEvents(ctx context.Context, stmt string, args ...interface
return events, nil
}

func (db *LogDB) queryTransfers(ctx context.Context, stmt string, args ...interface{}) ([]*Transfer, error) {
rows, err := db.db.QueryContext(ctx, stmt, args...)
func (db *LogDB) queryTransfers(ctx context.Context, query string, args ...interface{}) ([]*Transfer, error) {
rows, err := db.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -313,19 +316,18 @@ func (db *LogDB) queryTransfers(ctx context.Context, stmt string, args ...interf
// NewestBlockID query newest written block id.
func (db *LogDB) NewestBlockID() (thor.Bytes32, error) {
// select from config if any
row := db.db.QueryRow("SELECT value FROM config WHERE key=?", configBlockIDKey)
row := db.stmtCache.MustPrepare("SELECT value FROM config WHERE key=?").QueryRow(configBlockIDKey)
var data []byte
if err := row.Scan(&data); err != nil {
if sql.ErrNoRows != err {
return thor.Bytes32{}, err
}

// no config, query newest block ID from existing records
row = db.db.QueryRow(
`SELECT MAX(data) FROM (
SELECT data FROM ref WHERE id=(SELECT blockId FROM transfer ORDER BY seq DESC LIMIT 1)
UNION
SELECT data FROM ref WHERE id=(SELECT blockId FROM event ORDER BY seq DESC LIMIT 1))`)
row := db.stmtCache.MustPrepare(`SELECT MAX(data) FROM (
SELECT data FROM ref WHERE id=(SELECT blockId FROM transfer ORDER BY seq DESC LIMIT 1)
UNION
SELECT data FROM ref WHERE id=(SELECT blockId FROM event ORDER BY seq DESC LIMIT 1))`).QueryRow()

if err := row.Scan(&data); err != nil {
if sql.ErrNoRows != err {
Expand All @@ -338,12 +340,13 @@ SELECT data FROM ref WHERE id=(SELECT blockId FROM event ORDER BY seq DESC LIMIT

// HasBlockID query whether given block id related logs were written.
func (db *LogDB) HasBlockID(id thor.Bytes32) (bool, error) {
const query = `SELECT COUNT(*) FROM (
SELECT * FROM (SELECT seq FROM transfer WHERE seq=? AND blockID=` + refIDQuery + ` LIMIT 1)
UNION
SELECT * FROM (SELECT seq FROM event WHERE seq=? AND blockID=` + refIDQuery + ` LIMIT 1))`

seq := newSequence(block.Number(id), 0)
row := db.db.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM (
SELECT * FROM (SELECT seq FROM transfer WHERE seq=? AND blockID=%v LIMIT 1)
UNION
SELECT * FROM (SELECT seq FROM event WHERE seq=? AND blockID=%v LIMIT 1))`, refIDQuery, refIDQuery),
seq, id.Bytes(), seq, id.Bytes())
row := db.stmtCache.MustPrepare(query).QueryRow(seq, id.Bytes(), seq, id.Bytes())
var count int
if err := row.Scan(&count); err != nil {
// no need to check ErrNoRows
Expand All @@ -354,7 +357,7 @@ SELECT * FROM (SELECT seq FROM event WHERE seq=? AND blockID=%v LIMIT 1))`, refI

// Log write logs.
func (db *LogDB) Log(f func(*Writer) error) error {
w := &Writer{db: db.db}
w := &Writer{db: db.db, stmtCache: db.stmtCache}
if err := f(w); err != nil {
if w.tx != nil {
_ = w.tx.Rollback()
Expand All @@ -374,6 +377,7 @@ func topicValue(topics []thor.Bytes32, i int) []byte {
// Writer is the transactional log writer.
type Writer struct {
db *sql.DB
stmtCache *stmtCache
tx *sql.Tx
len int
lastBlockID thor.Bytes32
Expand All @@ -393,7 +397,10 @@ func (w *Writer) Write(b *block.Block, receipts tx.Receipts) error {

if num > 0 && w.len == 0 {
seq := newSequence(num, 0)
if err := w.exec("DELETE from event where seq >= ?;DELETE from transfer where seq >= ?", seq, seq); err != nil {
if err := w.exec("DELETE FROM event WHERE seq >= ?", seq); err != nil {
return err
}
if err := w.exec("DELETE FROM transfer WHERE seq >= ?", seq); err != nil {
return err
}
}
Expand Down Expand Up @@ -529,7 +536,7 @@ func (w *Writer) exec(query string, args ...interface{}) error {
w.tx = tx
}

if _, err := w.tx.Exec(query, args...); err != nil {
if _, err := w.tx.Stmt(w.stmtCache.MustPrepare(query)).Exec(args...); err != nil {
return err
}
w.len++
Expand Down
50 changes: 50 additions & 0 deletions logdb/stmt_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2020 The VeChainThor developers

// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying
// file LICENSE or <https://www.gnu.org/licenses/lgpl-3.0.html>

package logdb

import (
"database/sql"
"sync"
)

// to cache prepared sql statement, which maps query string to stmt.
type stmtCache struct {
db *sql.DB
m sync.Map
}

func newStmtCache(db *sql.DB) *stmtCache {
return &stmtCache{db: db}
}

func (sc *stmtCache) Prepare(query string) (*sql.Stmt, error) {
cached, _ := sc.m.Load(query)
if cached == nil {
stmt, err := sc.db.Prepare(query)
if err != nil {
return nil, err
}
sc.m.Store(query, stmt)
cached = stmt
}
return cached.(*sql.Stmt), nil
}

func (sc *stmtCache) MustPrepare(query string) *sql.Stmt {
stmt, err := sc.Prepare(query)
if err != nil {
panic(err)
}
return stmt
}

func (sc *stmtCache) Clear() {
sc.m.Range(func(k, v interface{}) bool {
_ = v.(*sql.Stmt).Close()
sc.m.Delete(k)
return true
})
}

0 comments on commit 9739d5a

Please sign in to comment.