-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathquery.go
109 lines (98 loc) · 2.32 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package lorm
import (
"database/sql"
"log"
"strings"
"sync"
"time"
"github.com/tada-team/lorm/op"
)
// TxLock2 executes a SELECT over op.PgAdvisoryXactLock2(k1, k2) through
// TxExec, with empty args and no locker, and returns only the error.
//
// NOTE(review): judging by the helper's name this presumably maps to
// PostgreSQL's two-key pg_advisory_xact_lock — confirm against the op package.
func TxLock2(tx *Tx, k1, k2 int) error {
	emptyArgs := op.NewArgs()
	lockQuery := op.Select(op.PgAdvisoryXactLock2(k1, k2))
	if _, err := TxExec(tx, nil, lockQuery, emptyArgs); err != nil {
		return err
	}
	return nil
}
// TxExec renders q to its SQL text once and executes it via doExec, wrapped
// in retry so that an attempt may be repeated when retry decides the error
// is retryable.
//
// The function returned by trackQuery is obtained before the first attempt
// and invoked when TxExec returns. The result and error of the last attempt
// are returned.
func TxExec(tx *Tx, locker sync.Locker, q op.Query, args op.Args) (res sql.Result, err error) {
	sqlText := q.Query()
	finish := trackQuery(tx, sqlText, args)
	defer finish()

	// Each attempt overwrites res, so the caller sees the outcome of the
	// final attempt only.
	attempt := func() error {
		var execErr error
		res, execErr = doExec(tx, locker, sqlText, args)
		return execErr
	}

	err = retry(attempt)
	return res, err
}
// TxQuery renders q to its SQL text once, runs it via doQuery (without a
// locker) and calls each for every row of the result set.
//
// Iteration stops at the first error returned by each; otherwise the error
// reported by rows.Err() is returned. The rows are closed at the end of every
// attempt. The whole attempt is wrapped in retry, so when an attempt is
// repeated the query is issued again and each is invoked again starting from
// the first row.
//
// The function returned by trackQuery is obtained before the first attempt
// and invoked when TxQuery returns.
func TxQuery(tx *Tx, q op.Query, args op.Args, each func(*sql.Rows) error) error {
	sqlText := q.Query()
	finish := trackQuery(tx, sqlText, args)
	defer finish()

	attempt := func() error {
		rows, queryErr := doQuery(tx, nil, sqlText, args)
		if queryErr != nil {
			return queryErr
		}
		// The Close error is deliberately discarded.
		defer func() { _ = rows.Close() }()

		for rows.Next() {
			if rowErr := each(rows); rowErr != nil {
				return rowErr
			}
		}
		return rows.Err()
	}

	return retry(attempt)
}
// TxScan renders q to its SQL text once, fetches a single row via doQueryRow
// and scans it into dest. The fetch-and-scan step is wrapped in retry, so it
// may be repeated when retry decides the error is retryable.
//
// The function returned by trackQuery is obtained before the first attempt
// and invoked when TxScan returns.
func TxScan(tx *Tx, locker sync.Locker, q op.Query, args op.Args, dest ...interface{}) error {
	sqlText := q.Query()
	finish := trackQuery(tx, sqlText, args)
	defer finish()

	attempt := func() error {
		row := doQueryRow(tx, locker, sqlText, args)
		return row.Scan(dest...)
	}

	return retry(attempt)
}
// doExec runs query with args on tx, or on the package's conn when tx is nil.
//
// When locker is non-nil and disableLocks is false, the locker is held for
// the duration of the Exec call and released when doExec returns.
func doExec(tx *Tx, locker sync.Locker, query string, args op.Args) (sql.Result, error) {
	if !disableLocks && locker != nil {
		locker.Lock()
		defer locker.Unlock()
	}

	if tx != nil {
		return tx.Exec(query, args...)
	}
	return conn.Exec(query, args...)
}
// doQuery runs query with args on tx, or on the package's conn when tx is
// nil, and returns the resulting rows.
//
// When locker is non-nil and disableLocks is false, the locker is held only
// while the Query call is in flight: it is released when doQuery returns,
// i.e. before the caller iterates the returned rows.
func doQuery(tx *Tx, locker sync.Locker, query string, args op.Args) (*sql.Rows, error) {
	if !disableLocks && locker != nil {
		locker.Lock()
		defer locker.Unlock()
	}

	if tx != nil {
		return tx.Query(query, args...)
	}
	return conn.Query(query, args...)
}
// doQueryRow runs query with args on tx, or on the package's conn when tx is
// nil, and returns the single-row handle.
//
// When locker is non-nil and disableLocks is false, the locker is held only
// while the QueryRow call is in flight: it is released when doQueryRow
// returns, i.e. before the caller calls Scan on the returned row.
func doQueryRow(tx *Tx, locker sync.Locker, query string, args op.Args) *sql.Row {
	if !disableLocks && locker != nil {
		locker.Lock()
		defer locker.Unlock()
	}

	if tx != nil {
		return tx.QueryRow(query, args...)
	}
	return conn.QueryRow(query, args...)
}
// retry calls fn until it succeeds, fails with an error that nonFatalError
// does not recognise, or the retry budget is used up; the error of the last
// call (nil on success) is returned.
//
// A retry is allowed while the number of retries already made is
// <= MaxAttempts, so for a non-negative MaxAttempts fn can be called up to
// MaxAttempts+2 times in total. Before the k-th retry a warning is logged
// and the goroutine sleeps for k seconds (linearly growing back-off).
func retry(fn func() error) error {
	retries := 0
	for {
		err := fn()

		// Stop on success, on an error that is not retryable, or once the
		// retry budget is exhausted.
		if err == nil || !nonFatalError(err) || retries > MaxAttempts {
			return err
		}

		retries++
		log.Println("lorm: warn:", err, "retry:", retries)
		time.Sleep(time.Duration(retries) * time.Second)
	}
}
// nonFatalError reports whether err's message contains any of the substrings
// listed in NonFatalErrors. It returns false when no entry matches.
//
// err must be non-nil whenever NonFatalErrors has entries, because the
// message is read via err.Error().
func nonFatalError(err error) bool {
	for _, marker := range NonFatalErrors {
		if msg := err.Error(); strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}