Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sqlite: fix error 'database is locked' #19

Merged
merged 1 commit into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 230 additions & 0 deletions asynq_end2end_sqlite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package asynq

import (
"context"
"encoding/binary"
"fmt"
"os"
"reflect"
"runtime/debug"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"
)

func TestLocalLongRun(t *testing.T) {
// To execute this manually uncomment this line
//brokerType = SqliteType

if testing.Short() || brokerType != SqliteType {
// test takes 1 or 2 minutes
t.Skip("long run (1m) with sqlite")
}

dir, err := os.MkdirTemp("", "testLocalLongRun")
require.NoError(t, err)
cleanup := func() {
_ = os.RemoveAll(dir)
}
defer cleanup()

dbPath := fmt.Sprintf("%s/db.sqlite", dir)
fmt.Println("db_path", dbPath)

rqliteConfig.TablesPrefix = "local_"
rqliteConfig.SqliteDbPath = dbPath
rqliteConfig.Type = "sqlite"

client := NewClient(getClientConnOpt(t))
srv := newServer(client.rdb, Config{
ServerID: "inod1111",
Concurrency: 10,
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
return time.Millisecond * 200
},
LogLevel: InfoLevel,
ProcessorEmptyQSleep: time.Millisecond * 200,
ShutdownTimeout: time.Millisecond * 10,
HealthCheckInterval: time.Millisecond * 200,
ForwarderInterval: time.Millisecond * 200,
Queues: &QueuesConfig{
Queues: map[string]int{
"low": 1,
"normal": 3,
"high": 5,
},
},
})

mux := NewServeMux()
_ = srv.Start(mux)
defer srv.Stop()

inspector := newInspector(srv.broker)

log := srv.logger
handled := atomic.NewInt64(0)
startCh := make(chan struct{})
done := atomic.NewBool(false)
doneCh := make(chan struct{})

handleError := func(err error) {
if err == nil {
return
}
serr := err.Error()
log.Warnf("error: %v, type: %v", serr, reflect.TypeOf(err))
if !strings.Contains(serr, "database is locked") {
return
}
log.Warnf("full stack dump - stacktraces %s", "\n"+string(debug.Stack()))
if !done.Load() {
done.Store(true)
close(doneCh)
}
}

readTask := func(task *Task) error {
id := string(task.Payload())
queue := task.Type()
_, err := inspector.GetTaskInfo(queue, id)
return err
}

mux.HandleFunc(
"normal",
func(ctx context.Context, task *Task) error {
c := handled.Inc()

errr := readTask(task)
handleError(errr)

time.Sleep(time.Millisecond * 250)
wa := task.ResultWriter()

bb := binary.LittleEndian.AppendUint64(nil, uint64(c))
_, errw := wa.Write(bb)
handleError(errw)

if c%10 == 0 {
log.Infof("handled: %d (task: %s)", c, string(task.Payload()))
}

return nil

})
require.NoError(t, err)
mux.HandleFunc(
"high",
func(ctx context.Context, task *Task) error {
errr := readTask(task)
handleError(errr)

wa := task.ResultWriter()
_, errw := wa.Write([]byte("b"))
handleError(errw)

c := handled.Inc()
if c%10 == 0 {
log.Infof("handled count: %d (task: %s)", c, string(task.Payload()))
}

return nil
})
require.NoError(t, err)

w := time.Now()
enqueued := atomic.NewInt64(0)
execDuration := time.Minute * 2
go func() {
iter := 0

for time.Now().Sub(w) < execDuration && enqueued.Load() <= 20000 {
if iter == 5 {
close(startCh)
}

c := handled.Load()
switch c % 3 {
case 0:
case 1:
time.Sleep(time.Millisecond * 300)
case 2:
time.Sleep(time.Millisecond * 600)
}
id := fmt.Sprintf("a-%d", iter)
_, err := client.EnqueueContext(
context.Background(),
NewTask("normal", []byte(id)),
Queue("normal"),
MaxRetry(1),
ReprocessAfter(time.Millisecond*250),
Timeout(time.Hour),
TaskID(id),
Retention(time.Hour))
handleError(err)
if err != nil {
break
}
enqueued.Inc()
id = fmt.Sprintf("b-%d", iter)
_, err = client.EnqueueContext(
context.Background(),
NewTask("high", []byte(id)),
Queue("high"),
Timeout(time.Millisecond*150),
TaskID(id),
Retention(time.Hour))
handleError(err)
if err != nil {
break
}
enqueued.Inc()
iter++
if iter%100 == 0 {
log.Infof("enqueue - iter count: %d", iter)
}
select {
case <-doneCh:
break
default:
}
}
log.Infof("<< finished enqueuing - took %v, iter: %d, enqueued: %d, handled: %d", time.Now().Sub(w),
iter,
enqueued.Load(),
handled.Load())
if !done.Load() {
done.Store(true)
close(doneCh)
}
}()

<-startCh
ticker := time.NewTicker(time.Second * 10)
out:
for {
select {
case <-doneCh:
break out
case <-ticker.C:
enq := enqueued.Load()
han := handled.Load()
log.Infof("queue - d: %v, enqueued: %d, handled: %d",
time.Now().Sub(w),
enq,
han)
if enq == han && enq >= 1000 {
break out
}
}
}

log.Infof("<< queue closing - took: %v, enqueued: %d, handled: %d",
time.Now().Sub(w),
enqueued.Load(),
handled.Load())
}
43 changes: 42 additions & 1 deletion internal/rqlite/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package rqlite

import (
"fmt"
"runtime"
"strings"

"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/sqlite3"
Expand All @@ -23,12 +25,18 @@ func (s StatementError) String() string {
// RqliteError indicates a command sent to rqlite returned error.
type RqliteError struct {
Op errors.Op
Caller string
Err error // outer most error
Statements []StatementError // specific error
}

func (e *RqliteError) Error() string {
ret := fmt.Sprintf("%s - rqlite error: %v ", e.Op, e.Err)
ret := fmt.Sprintf("%s - rqlite error: %s", e.Op, e.Err.Error())
if e.Caller != "" {
ret += ", at: " + e.Caller
} else {
ret += " "
}
if len(e.Statements) == 0 {
return ret
}
Expand All @@ -45,27 +53,31 @@ func (e *RqliteError) Unwrap() error { return e.Err }

func NewRqliteWsError(op errors.Op, wrs []sqlite3.WriteResult, err error, stmts []*sqlite3.Statement) error {
statements := make([]StatementError, 0)
caller := caller(1)
for ndx, wr := range wrs {
if wr.Err != nil {
statements = append(statements, StatementError{Error: wr.Err, Statement: stmts[ndx]})
}
}
return &RqliteError{
Op: op,
Caller: caller,
Err: err,
Statements: statements,
}
}

func NewRqliteRsError(op errors.Op, qrs []sqlite3.QueryResult, err error, stmts []*sqlite3.Statement) error {
statements := make([]StatementError, 0)
caller := caller(1)
for ndx, qr := range qrs {
if qr.Err() != nil {
statements = append(statements, StatementError{Error: qr.Err(), Statement: stmts[ndx]})
}
}
return &RqliteError{
Op: op,
Caller: caller,
Err: err,
Statements: statements,
}
Expand Down Expand Up @@ -113,3 +125,32 @@ func expectOneRowUpdated(op errors.Op, wrs []sqlite3.WriteResult, index int, st
}
return nil
}

// caller reports information on the caller at the given index in calling
// goroutine's stack. The argument index is the number of stack frames to ascend,
// with 0 identifying the caller of Caller.
// This function uses internally runtime.Caller
// The returned string contains the 'simple' name of the package and function
// followed by (file-name:line-number) of the caller.
func caller(index int) string {
simpleName := func(name string) string {
if n := strings.LastIndex(name, "/"); n > 0 {
name = name[n+1:]
}
return name
}

fname := "unknown"
pc, file, line, ok := runtime.Caller(index + 1) // account for this call
if !ok {
file = "??"
line = 1
} else {
file = simpleName(file)
}
f := runtime.FuncForPC(pc)
if f != nil {
fname = simpleName(f.Name())
}
return fmt.Sprintf("%s (%s:%d)", fname, file, line)
}
2 changes: 1 addition & 1 deletion internal/sqlite/conn_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ func NewSQLiteConnection(ctx context.Context, dbPath string, inMemory bool) (*sq
if err != nil {
return nil, errors.E(op, errors.Internal, err)
}
return sqlite3.NewSQLiteConnection(conn), nil
return sqlite3.NewSQLiteConnection(conn, true), nil
}
Loading