From ce837628e8cec07b8403e5a9ae5ce91ef0e87d7d Mon Sep 17 00:00:00 2001 From: elv-gilles Date: Wed, 24 May 2023 18:57:32 +0200 Subject: [PATCH] sqlite: fix error 'database is locked' * use option '_txlock=immediate' when opening a rw conn in db.DB * add lock around SQLiteConnection.Execute * add retries for sqlite_busy (when using multiple connections) * improve rqlite errors (report caller) --- asynq_end2end_sqlite_test.go | 230 +++++++++++++++++++++++++++++++++ internal/rqlite/errors.go | 43 +++++- internal/sqlite/conn_sqlite.go | 2 +- internal/sqlite3/connection.go | 64 ++++++++- internal/sqlite3/db/db.go | 8 +- 5 files changed, 337 insertions(+), 10 deletions(-) create mode 100644 asynq_end2end_sqlite_test.go diff --git a/asynq_end2end_sqlite_test.go b/asynq_end2end_sqlite_test.go new file mode 100644 index 00000000..2ec44adc --- /dev/null +++ b/asynq_end2end_sqlite_test.go @@ -0,0 +1,230 @@ +package asynq + +import ( + "context" + "encoding/binary" + "fmt" + "os" + "reflect" + "runtime/debug" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func TestLocalLongRun(t *testing.T) { + // To execute this manually uncomment this line + //brokerType = SqliteType + + if testing.Short() || brokerType != SqliteType { + // test takes 1 or 2 minutes + t.Skip("long run (1m) with sqlite") + } + + dir, err := os.MkdirTemp("", "testLocalLongRun") + require.NoError(t, err) + cleanup := func() { + _ = os.RemoveAll(dir) + } + defer cleanup() + + dbPath := fmt.Sprintf("%s/db.sqlite", dir) + fmt.Println("db_path", dbPath) + + rqliteConfig.TablesPrefix = "local_" + rqliteConfig.SqliteDbPath = dbPath + rqliteConfig.Type = "sqlite" + + client := NewClient(getClientConnOpt(t)) + srv := newServer(client.rdb, Config{ + ServerID: "inod1111", + Concurrency: 10, + RetryDelayFunc: func(n int, err error, t *Task) time.Duration { + return time.Millisecond * 200 + }, + LogLevel: InfoLevel, + ProcessorEmptyQSleep: time.Millisecond * 200, + 
ShutdownTimeout: time.Millisecond * 10, + HealthCheckInterval: time.Millisecond * 200, + ForwarderInterval: time.Millisecond * 200, + Queues: &QueuesConfig{ + Queues: map[string]int{ + "low": 1, + "normal": 3, + "high": 5, + }, + }, + }) + + mux := NewServeMux() + _ = srv.Start(mux) + defer srv.Stop() + + inspector := newInspector(srv.broker) + + log := srv.logger + handled := atomic.NewInt64(0) + startCh := make(chan struct{}) + done := atomic.NewBool(false) + doneCh := make(chan struct{}) + + handleError := func(err error) { + if err == nil { + return + } + serr := err.Error() + log.Warnf("error: %v, type: %v", serr, reflect.TypeOf(err)) + if !strings.Contains(serr, "database is locked") { + return + } + log.Warnf("full stack dump - stacktraces %s", "\n"+string(debug.Stack())) + if !done.Load() { + done.Store(true) + close(doneCh) + } + } + + readTask := func(task *Task) error { + id := string(task.Payload()) + queue := task.Type() + _, err := inspector.GetTaskInfo(queue, id) + return err + } + + mux.HandleFunc( + "normal", + func(ctx context.Context, task *Task) error { + c := handled.Inc() + + errr := readTask(task) + handleError(errr) + + time.Sleep(time.Millisecond * 250) + wa := task.ResultWriter() + + bb := binary.LittleEndian.AppendUint64(nil, uint64(c)) + _, errw := wa.Write(bb) + handleError(errw) + + if c%10 == 0 { + log.Infof("handled: %d (task: %s)", c, string(task.Payload())) + } + + return nil + + }) + require.NoError(t, err) + mux.HandleFunc( + "high", + func(ctx context.Context, task *Task) error { + errr := readTask(task) + handleError(errr) + + wa := task.ResultWriter() + _, errw := wa.Write([]byte("b")) + handleError(errw) + + c := handled.Inc() + if c%10 == 0 { + log.Infof("handled count: %d (task: %s)", c, string(task.Payload())) + } + + return nil + }) + require.NoError(t, err) + + w := time.Now() + enqueued := atomic.NewInt64(0) + execDuration := time.Minute * 2 + go func() { + iter := 0 + + for time.Now().Sub(w) < execDuration && 
enqueued.Load() <= 20000 { + if iter == 5 { + close(startCh) + } + + c := handled.Load() + switch c % 3 { + case 0: + case 1: + time.Sleep(time.Millisecond * 300) + case 2: + time.Sleep(time.Millisecond * 600) + } + id := fmt.Sprintf("a-%d", iter) + _, err := client.EnqueueContext( + context.Background(), + NewTask("normal", []byte(id)), + Queue("normal"), + MaxRetry(1), + ReprocessAfter(time.Millisecond*250), + Timeout(time.Hour), + TaskID(id), + Retention(time.Hour)) + handleError(err) + if err != nil { + break + } + enqueued.Inc() + id = fmt.Sprintf("b-%d", iter) + _, err = client.EnqueueContext( + context.Background(), + NewTask("high", []byte(id)), + Queue("high"), + Timeout(time.Millisecond*150), + TaskID(id), + Retention(time.Hour)) + handleError(err) + if err != nil { + break + } + enqueued.Inc() + iter++ + if iter%100 == 0 { + log.Infof("enqueue - iter count: %d", iter) + } + select { + case <-doneCh: + break + default: + } + } + log.Infof("<< finished enqueuing - took %v, iter: %d, enqueued: %d, handled: %d", time.Now().Sub(w), + iter, + enqueued.Load(), + handled.Load()) + if !done.Load() { + done.Store(true) + close(doneCh) + } + }() + + <-startCh + ticker := time.NewTicker(time.Second * 10) +out: + for { + select { + case <-doneCh: + break out + case <-ticker.C: + enq := enqueued.Load() + han := handled.Load() + log.Infof("queue - d: %v, enqueued: %d, handled: %d", + time.Now().Sub(w), + enq, + han) + if enq == han && enq >= 1000 { + break out + } + } + } + + log.Infof("<< queue closing - took: %v, enqueued: %d, handled: %d", + time.Now().Sub(w), + enqueued.Load(), + handled.Load()) +} diff --git a/internal/rqlite/errors.go b/internal/rqlite/errors.go index dc681372..cd24818c 100644 --- a/internal/rqlite/errors.go +++ b/internal/rqlite/errors.go @@ -2,6 +2,8 @@ package rqlite import ( "fmt" + "runtime" + "strings" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/sqlite3" @@ -23,12 +25,18 @@ func (s StatementError) String() 
string { // RqliteError indicates a command sent to rqlite returned error. type RqliteError struct { Op errors.Op + Caller string Err error // outer most error Statements []StatementError // specific error } func (e *RqliteError) Error() string { - ret := fmt.Sprintf("%s - rqlite error: %v ", e.Op, e.Err) + ret := fmt.Sprintf("%s - rqlite error: %s", e.Op, e.Err.Error()) + if e.Caller != "" { + ret += ", at: " + e.Caller + } else { + ret += " " + } if len(e.Statements) == 0 { return ret } @@ -45,6 +53,7 @@ func (e *RqliteError) Unwrap() error { return e.Err } func NewRqliteWsError(op errors.Op, wrs []sqlite3.WriteResult, err error, stmts []*sqlite3.Statement) error { statements := make([]StatementError, 0) + caller := caller(1) for ndx, wr := range wrs { if wr.Err != nil { statements = append(statements, StatementError{Error: wr.Err, Statement: stmts[ndx]}) @@ -52,6 +61,7 @@ func NewRqliteWsError(op errors.Op, wrs []sqlite3.WriteResult, err error, stmts } return &RqliteError{ Op: op, + Caller: caller, Err: err, Statements: statements, } @@ -59,6 +69,7 @@ func NewRqliteWsError(op errors.Op, wrs []sqlite3.WriteResult, err error, stmts func NewRqliteRsError(op errors.Op, qrs []sqlite3.QueryResult, err error, stmts []*sqlite3.Statement) error { statements := make([]StatementError, 0) + caller := caller(1) for ndx, qr := range qrs { if qr.Err() != nil { statements = append(statements, StatementError{Error: qr.Err(), Statement: stmts[ndx]}) @@ -66,6 +77,7 @@ func NewRqliteRsError(op errors.Op, qrs []sqlite3.QueryResult, err error, stmts } return &RqliteError{ Op: op, + Caller: caller, Err: err, Statements: statements, } @@ -113,3 +125,32 @@ func expectOneRowUpdated(op errors.Op, wrs []sqlite3.WriteResult, index int, st } return nil } + +// caller reports information on the caller at the given index in calling +// goroutine's stack. The argument index is the number of stack frames to ascend, +// with 0 identifying the caller of Caller. 
// caller reports information on the caller at the given index in the calling
// goroutine's stack. The argument index is the number of stack frames to
// ascend, with 0 identifying the function that called caller.
//
// It relies on runtime.Caller. The returned string has the form
// "pkg.Func (file.go:42)": the 'simple' name of the package and function (the
// part after the last '/'), followed by the base file name and line number of
// the call site. When the frame cannot be resolved the result is
// "unknown (??:1)".
func caller(index int) string {
	// simpleName strips everything up to and including the last '/'.
	simpleName := func(name string) string {
		if n := strings.LastIndex(name, "/"); n >= 0 {
			name = name[n+1:]
		}
		return name
	}

	pc, file, line, ok := runtime.Caller(index + 1) // +1 accounts for this function
	if !ok {
		return "unknown (??:1)"
	}

	fname := "unknown"
	if f := runtime.FuncForPC(pc); f != nil {
		fname = simpleName(f.Name())
	}
	return fmt.Sprintf("%s (%s:%d)", fname, simpleName(file), line)
}
ret } @@ -43,13 +49,19 @@ func newRequest(stmts []*Statement) *command.Request { }) } return &command.Request{ - Transaction: true, // PENDING(GIL): config + Transaction: true, Statements: statements, } } +func (c *SQLiteConnection) QueryContext(ctx context.Context, req *command.Request, xTime bool) ([]*command.QueryRows, error) { + // no lock: db uses a 'read' connection + return c.db.QueryContext(ctx, req, xTime) +} + func (c *SQLiteConnection) QueryStmt(ctx context.Context, stmts ...*Statement) ([]QueryResult, error) { - qrows, err := c.db.QueryContext(ctx, newRequest(stmts), true) // PENDING(GIL): config + req := newRequest(stmts) + qrows, err := c.QueryContext(ctx, req, true) if err != nil { return nil, err } @@ -66,8 +78,48 @@ func (c *SQLiteConnection) QueryStmt(ctx context.Context, stmts ...*Statement) ( return ret, nil } +func (c *SQLiteConnection) ExecuteContext(ctx context.Context, req *command.Request, xTime bool) ([]*command.ExecuteResult, error) { + c.mu.Lock() + defer c.mu.Unlock() + + attempt := 0 + // uncomment for troubleshooting + //t0 := time.Now() + + // We are protected by taking the lock for concurrent calls using this + // connection, but not against concurrent calls made with another connection. + // Hence, retry write attempts that failed with error: SQLITE_BUSY - 05, with + // message 'database is locked'. + // + // Note that all constructors of db.DB open their internal write connections + // with option '_txlock=immediate' which is expected to provide a failure + // upfront (and not after executing some statements). Also function newRequest + // (in this file) always uses Transaction: true + + for { + attempt++ + wrs, err := c.db.ExecuteContext(ctx, req, xTime) + if sqliteErr, ok := err.(sqlite3.Error); ok && + sqliteErr.Code == sqlite3.ErrBusy && + attempt <= 20 && + c.retryBusy { + // 99% of errors (in a unit-test) were eliminated with a 5ms delay + // but half of them required 5 retries or more. 
+ // Also: using an exponential backoff did not help at all. + time.Sleep(time.Millisecond * 5) + continue + } + // uncomment for troubleshooting + //if attempt > 1 { + // fmt.Println("ExecuteContext", "attempt", attempt, "d", time.Now().Sub(t0), "err", err) + //} + return wrs, err + } +} + func (c *SQLiteConnection) WriteStmt(ctx context.Context, stmts ...*Statement) ([]WriteResult, error) { - wrs, err := c.db.ExecuteContext(ctx, newRequest(stmts), true) // PENDING(GIL): config + req := newRequest(stmts) + wrs, err := c.ExecuteContext(ctx, req, true) if err != nil { return nil, err } diff --git a/internal/sqlite3/db/db.go b/internal/sqlite3/db/db.go index 9dfaeadb..fc1ed231 100644 --- a/internal/sqlite3/db/db.go +++ b/internal/sqlite3/db/db.go @@ -90,7 +90,11 @@ func Open(dbPath string, fkEnabled bool) (*DB, error) { // OpenContext opens a file-based database, creating it if it does not exist. // After this function returns, an actual SQLite file will always exist. func OpenContext(ctx context.Context, dbPath string, fkEnabled bool) (*DB, error) { - rwDSN := fmt.Sprintf("file:%s?_fk=%s", dbPath, strconv.FormatBool(fkEnabled)) + rwOpts := []string{ + "_txlock=immediate", + fmt.Sprintf("_fk=%s", strconv.FormatBool(fkEnabled)), + } + rwDSN := fmt.Sprintf("file:%s?%s", dbPath, strings.Join(rwOpts, "&")) rwDB, err := sql.Open("sqlite3", rwDSN) if err != nil { return nil, err @@ -98,9 +102,9 @@ func OpenContext(ctx context.Context, dbPath string, fkEnabled bool) (*DB, error roOpts := []string{ "mode=ro", + "_txlock=deferred", fmt.Sprintf("_fk=%s", strconv.FormatBool(fkEnabled)), } - roDSN := fmt.Sprintf("file:%s?%s", dbPath, strings.Join(roOpts, "&")) roDB, err := sql.Open("sqlite3", roDSN) if err != nil {