sqlite: fix error 'database is locked' (#19)
* use option '_txlock=immediate' when opening a rw conn in db.DB
* add lock around SQLiteConnection.Execute
* add retries for sqlite_busy (when using multiple connections)
* improve rqlite errors (report caller)
elv-gilles authored Jun 9, 2023
1 parent 29b4faf commit e507721
Showing 5 changed files with 337 additions and 10 deletions.
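
The first commit-message bullet applies to how the read-write connection is opened (db.DB), which sits in one of the changed files not shown on this page. As a hedged sketch of that technique, assuming the mattn/go-sqlite3 driver (the `_txlock` option named in the commit message is that driver's DSN parameter; `_busy_timeout` is another of its parameters, used here purely for illustration), opening the read-write handle with immediate transactions looks roughly like this; the function name and timeout value are illustrative, not taken from the commit:

```go
package dbsketch

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3" // assumed driver; registers "sqlite3" with database/sql
)

// openRW opens a read-write SQLite handle whose write transactions
// start as BEGIN IMMEDIATE. The write lock is then acquired at BEGIN
// time, so a competing writer fails fast with a retryable busy error
// instead of hitting "database is locked" mid-transaction.
func openRW(dbPath string) (*sql.DB, error) {
	dsn := fmt.Sprintf("file:%s?_txlock=immediate&_busy_timeout=5000", dbPath)
	return sql.Open("sqlite3", dsn)
}
```

Starting write transactions with BEGIN IMMEDIATE makes contention surface as a retryable busy error at BEGIN rather than as "database is locked" partway through a transaction, which is what the retry behavior described in the next two bullets relies on; a sketch of that side appears after the conn_sqlite.go diff below.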
asynq_end2end_sqlite_test.go (230 additions, 0 deletions)
@@ -0,0 +1,230 @@
package asynq

import (
"context"
"encoding/binary"
"fmt"
"os"
"reflect"
"runtime/debug"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func TestLocalLongRun(t *testing.T) {
	// To execute this manually uncomment this line
	//brokerType = SqliteType

	if testing.Short() || brokerType != SqliteType {
		// test takes 1 or 2 minutes
		t.Skip("long run (1m) with sqlite")
	}

	dir, err := os.MkdirTemp("", "testLocalLongRun")
	require.NoError(t, err)
	cleanup := func() {
		_ = os.RemoveAll(dir)
	}
	defer cleanup()

	dbPath := fmt.Sprintf("%s/db.sqlite", dir)
	fmt.Println("db_path", dbPath)

	rqliteConfig.TablesPrefix = "local_"
	rqliteConfig.SqliteDbPath = dbPath
	rqliteConfig.Type = "sqlite"

	client := NewClient(getClientConnOpt(t))
	srv := newServer(client.rdb, Config{
		ServerID: "inod1111",
		Concurrency: 10,
		RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
			return time.Millisecond * 200
		},
		LogLevel: InfoLevel,
		ProcessorEmptyQSleep: time.Millisecond * 200,
		ShutdownTimeout: time.Millisecond * 10,
		HealthCheckInterval: time.Millisecond * 200,
		ForwarderInterval: time.Millisecond * 200,
		Queues: &QueuesConfig{
			Queues: map[string]int{
				"low": 1,
				"normal": 3,
				"high": 5,
			},
		},
	})

	mux := NewServeMux()
	_ = srv.Start(mux)
	defer srv.Stop()

	inspector := newInspector(srv.broker)

	log := srv.logger
	handled := atomic.NewInt64(0)
	startCh := make(chan struct{})
	done := atomic.NewBool(false)
	doneCh := make(chan struct{})

	handleError := func(err error) {
		if err == nil {
			return
		}
		serr := err.Error()
		log.Warnf("error: %v, type: %v", serr, reflect.TypeOf(err))
		if !strings.Contains(serr, "database is locked") {
			return
		}
		log.Warnf("full stack dump - stacktraces %s", "\n"+string(debug.Stack()))
		if !done.Load() {
			done.Store(true)
			close(doneCh)
		}
	}

	readTask := func(task *Task) error {
		id := string(task.Payload())
		queue := task.Type()
		_, err := inspector.GetTaskInfo(queue, id)
		return err
	}

	mux.HandleFunc(
		"normal",
		func(ctx context.Context, task *Task) error {
			c := handled.Inc()

			errr := readTask(task)
			handleError(errr)

			time.Sleep(time.Millisecond * 250)
			wa := task.ResultWriter()

			bb := binary.LittleEndian.AppendUint64(nil, uint64(c))
			_, errw := wa.Write(bb)
			handleError(errw)

			if c%10 == 0 {
				log.Infof("handled: %d (task: %s)", c, string(task.Payload()))
			}

			return nil

		})
	require.NoError(t, err)
	mux.HandleFunc(
		"high",
		func(ctx context.Context, task *Task) error {
			errr := readTask(task)
			handleError(errr)

			wa := task.ResultWriter()
			_, errw := wa.Write([]byte("b"))
			handleError(errw)

			c := handled.Inc()
			if c%10 == 0 {
				log.Infof("handled count: %d (task: %s)", c, string(task.Payload()))
			}

			return nil
		})
	require.NoError(t, err)

	w := time.Now()
	enqueued := atomic.NewInt64(0)
	execDuration := time.Minute * 2
	go func() {
		iter := 0

		for time.Now().Sub(w) < execDuration && enqueued.Load() <= 20000 {
			if iter == 5 {
				close(startCh)
			}

			c := handled.Load()
			switch c % 3 {
			case 0:
			case 1:
				time.Sleep(time.Millisecond * 300)
			case 2:
				time.Sleep(time.Millisecond * 600)
			}
			id := fmt.Sprintf("a-%d", iter)
			_, err := client.EnqueueContext(
				context.Background(),
				NewTask("normal", []byte(id)),
				Queue("normal"),
				MaxRetry(1),
				ReprocessAfter(time.Millisecond*250),
				Timeout(time.Hour),
				TaskID(id),
				Retention(time.Hour))
			handleError(err)
			if err != nil {
				break
			}
			enqueued.Inc()
			id = fmt.Sprintf("b-%d", iter)
			_, err = client.EnqueueContext(
				context.Background(),
				NewTask("high", []byte(id)),
				Queue("high"),
				Timeout(time.Millisecond*150),
				TaskID(id),
				Retention(time.Hour))
			handleError(err)
			if err != nil {
				break
			}
			enqueued.Inc()
			iter++
			if iter%100 == 0 {
				log.Infof("enqueue - iter count: %d", iter)
			}
			select {
			case <-doneCh:
				break
			default:
			}
		}
		log.Infof("<< finished enqueuing - took %v, iter: %d, enqueued: %d, handled: %d", time.Now().Sub(w),
			iter,
			enqueued.Load(),
			handled.Load())
		if !done.Load() {
			done.Store(true)
			close(doneCh)
		}
	}()

	<-startCh
	ticker := time.NewTicker(time.Second * 10)
out:
	for {
		select {
		case <-doneCh:
			break out
		case <-ticker.C:
			enq := enqueued.Load()
			han := handled.Load()
			log.Infof("queue - d: %v, enqueued: %d, handled: %d",
				time.Now().Sub(w),
				enq,
				han)
			if enq == han && enq >= 1000 {
				break out
			}
		}
	}

	log.Infof("<< queue closing - took: %v, enqueued: %d, handled: %d",
		time.Now().Sub(w),
		enqueued.Load(),
		handled.Load())
}
internal/rqlite/errors.go (42 additions, 1 deletion)
@@ -2,6 +2,8 @@ package rqlite

import (
	"fmt"
	"runtime"
	"strings"

	"github.com/hibiken/asynq/internal/errors"
	"github.com/hibiken/asynq/internal/sqlite3"
@@ -23,12 +25,18 @@ func (s StatementError) String() string {
// RqliteError indicates a command sent to rqlite returned error.
type RqliteError struct {
	Op errors.Op
	Caller string
	Err error // outer most error
	Statements []StatementError // specific error
}

func (e *RqliteError) Error() string {
	ret := fmt.Sprintf("%s - rqlite error: %v ", e.Op, e.Err)
	ret := fmt.Sprintf("%s - rqlite error: %s", e.Op, e.Err.Error())
	if e.Caller != "" {
		ret += ", at: " + e.Caller
	} else {
		ret += " "
	}
	if len(e.Statements) == 0 {
		return ret
	}
Expand All @@ -45,27 +53,31 @@ func (e *RqliteError) Unwrap() error { return e.Err }

func NewRqliteWsError(op errors.Op, wrs []sqlite3.WriteResult, err error, stmts []*sqlite3.Statement) error {
	statements := make([]StatementError, 0)
	caller := caller(1)
	for ndx, wr := range wrs {
		if wr.Err != nil {
			statements = append(statements, StatementError{Error: wr.Err, Statement: stmts[ndx]})
		}
	}
	return &RqliteError{
		Op: op,
		Caller: caller,
		Err: err,
		Statements: statements,
	}
}

func NewRqliteRsError(op errors.Op, qrs []sqlite3.QueryResult, err error, stmts []*sqlite3.Statement) error {
	statements := make([]StatementError, 0)
	caller := caller(1)
	for ndx, qr := range qrs {
		if qr.Err() != nil {
			statements = append(statements, StatementError{Error: qr.Err(), Statement: stmts[ndx]})
		}
	}
	return &RqliteError{
		Op: op,
		Caller: caller,
		Err: err,
		Statements: statements,
	}
@@ -113,3 +125,32 @@ func expectOneRowUpdated(op errors.Op, wrs []sqlite3.WriteResult, index int, st
	}
	return nil
}

// caller reports information on the caller at the given index in calling
// goroutine's stack. The argument index is the number of stack frames to ascend,
// with 0 identifying the caller of Caller.
// This function uses runtime.Caller internally.
// The returned string contains the 'simple' name of the package and function
// followed by (file-name:line-number) of the caller.
func caller(index int) string {
	simpleName := func(name string) string {
		if n := strings.LastIndex(name, "/"); n > 0 {
			name = name[n+1:]
		}
		return name
	}

	fname := "unknown"
	pc, file, line, ok := runtime.Caller(index + 1) // account for this call
	if !ok {
		file = "??"
		line = 1
	} else {
		file = simpleName(file)
	}
	f := runtime.FuncForPC(pc)
	if f != nil {
		fname = simpleName(f.Name())
	}
	return fmt.Sprintf("%s (%s:%d)", fname, file, line)
}
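
With the new Caller field, errors built by NewRqliteWsError and NewRqliteRsError record where they were constructed; caller(1) skips the constructor's own frame, so the location reported is the original call site. Below is a minimal standalone demonstration of the same pattern; the op name and error text are made up, and errors.go additionally trims the directory and package path from what it reports.

```go
package main

import (
	"fmt"
	"runtime"
)

// where reports the function, file, and line of the frame `skip`
// levels above it, in the spirit of caller() above.
func where(skip int) string {
	pc, file, line, ok := runtime.Caller(skip + 1) // +1 accounts for this call
	if !ok {
		return "unknown"
	}
	fn := "unknown"
	if f := runtime.FuncForPC(pc); f != nil {
		fn = f.Name()
	}
	return fmt.Sprintf("%s (%s:%d)", fn, file, line)
}

func newOpError(op, msg string) error {
	// skip=1: report the caller of newOpError rather than newOpError itself,
	// mirroring the caller(1) calls in NewRqliteWsError/NewRqliteRsError.
	return fmt.Errorf("%s - rqlite error: %s, at: %s", op, msg, where(1))
}

func main() {
	// Prints something like:
	// rqlite.enqueueMessages - rqlite error: database is locked, at: main.main (/path/to/main.go:29)
	fmt.Println(newOpError("rqlite.enqueueMessages", "database is locked"))
}
```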
internal/sqlite/conn_sqlite.go (1 addition, 1 deletion)
@@ -24,5 +24,5 @@ func NewSQLiteConnection(ctx context.Context, dbPath string, inMemory bool) (*sq
	if err != nil {
		return nil, errors.E(op, errors.Internal, err)
	}
	return sqlite3.NewSQLiteConnection(conn), nil
	return sqlite3.NewSQLiteConnection(conn, true), nil
}
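
The second argument passed to sqlite3.NewSQLiteConnection is new in this commit; the wrapper it configures lives in one of the files not shown on this page. Per the commit message, it serializes SQLiteConnection.Execute and retries when SQLite reports the database as busy. The following is a minimal sketch of that pattern under those assumptions; the type name, retry count, and backoff are illustrative, not the actual implementation.

```go
package sqlitesketch

import (
	"database/sql"
	"strings"
	"sync"
	"time"
)

// serializedConn guards a *sql.DB with a mutex so only one Execute runs
// at a time, and retries statements that lose the race against another
// connection holding the SQLite write lock.
type serializedConn struct {
	mu sync.Mutex
	db *sql.DB
}

// isBusy reports whether err looks like SQLITE_BUSY / "database is locked".
func isBusy(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "database is locked") || strings.Contains(msg, "SQLITE_BUSY")
}

// Execute runs the statement under the lock, retrying a few times with
// a growing backoff while the database stays busy.
func (c *serializedConn) Execute(query string, args ...interface{}) (sql.Result, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	var (
		res     sql.Result
		lastErr error
	)
	for attempt := 0; attempt < 5; attempt++ {
		res, lastErr = c.db.Exec(query, args...)
		if lastErr == nil || !isBusy(lastErr) {
			return res, lastErr
		}
		time.Sleep(time.Duration(attempt+1) * 50 * time.Millisecond)
	}
	return nil, lastErr
}
```

The mutex removes contention between goroutines sharing one connection; the retry loop covers the remaining case of several connections competing for the write lock, which is the "multiple connections" case called out in the commit message.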