-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdb_test.go
292 lines (233 loc) · 5.92 KB
/
db_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package sqle
import (
"context"
"database/sql"
"testing"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/stretchr/testify/require"
"github.com/yaitoo/sqle/shardid"
)
// createSQLite3 opens a new SQLite handle for tests using the
// "file::memory:" DSN, so every call yields its own throw-away database.
//
// It panics when sql.Open fails. Returning nil (the previous behaviour) only
// postponed the failure to a nil-pointer dereference at the first use of the
// handle and hid the real cause, e.g. a driver that was never registered.
//
// NOTE(review): the connection pool is left at its default size. Capping it
// with db.SetMaxOpenConns(1) was tried and left disabled; read
// https://github.com/mattn/go-sqlite3/issues/209 before changing that. A
// temp-file backed DSN ("file:<name>?cache=shared") was the other variant
// that had been experimented with here.
func createSQLite3() *sql.DB {
	db, err := sql.Open("sqlite3", "file::memory:")
	if err != nil {
		panic("createSQLite3: open in-memory sqlite3: " + err.Error())
	}

	return db
}
// TestOn checks shard routing by shardid.ID: a row inserted through
// db.On(id) must be readable through the same route, and the routed context
// must report the expected database index.
func TestOn(t *testing.T) {
	// One constant drives both the number of physical databases and the
	// database count handed to the ID generator, so they cannot drift apart.
	const shards = 10

	dbs := make([]*sql.DB, 0, shards)
	for i := 0; i < shards; i++ {
		db3 := createSQLite3()
		// Best-effort teardown; a Close error cannot affect the assertions.
		t.Cleanup(func() { _ = db3.Close() })

		// Fail right here if the schema cannot be created instead of
		// letting the inserts below fail with a less obvious error.
		_, err := db3.Exec("CREATE TABLE `users` (`id` bigint , `status` tinyint,`email` varchar(50),`passwd` varchar(120), `salt` varchar(45), `created` DATETIME, PRIMARY KEY (`id`))")
		require.NoError(t, err)

		dbs = append(dbs, db3)
	}

	db := Open(dbs...)
	gen := shardid.New(shardid.WithDatabase(shards))

	// Insert one user per generated ID, each through the route of its ID.
	ids := make([]shardid.ID, 0, shards)
	for i := 0; i < shards; i++ {
		id := gen.Next()
		b := New().On(id).
			Insert("users").
			Set("id", id.Int64).
			Set("status", 1).
			Set("created", time.Now()).
			End()

		result, err := db.On(id).ExecBuilder(context.TODO(), b)
		require.NoError(t, err)

		rows, err := result.RowsAffected()
		require.NoError(t, err)
		require.Equal(t, int64(1), rows)

		ids = append(ids, id)
	}

	// Read every row back. The assertion on Index expects the i-th generated
	// ID to be routed to database i.
	for i, id := range ids {
		b := New().On(id).Select("users", "id")

		shard := db.On(id)
		require.Equal(t, i, shard.Index)

		var userID int64
		err := shard.QueryRowBuilder(context.TODO(), b).Scan(&userID)
		require.NoError(t, err)
		require.Equal(t, id.Int64, userID)
	}
}
func TestDHT(t *testing.T) {
db := Open(createSQLite3())
// "" is default DHT name
_, err := db.OnDHT("")
require.ErrorIs(t, err, ErrMissingDHT)
// MUST NOT panic even DHT is missing
db.GetDHT("").Add(1)
db.GetDHT("").Done()
db.NewDHT("", 0)
ctx, err := db.OnDHT("")
require.NoError(t, err)
require.Equal(t, 0, ctx.Index)
db.Add(createSQLite3())
db.GetDHT("").Add(1)
db.GetDHT("").Done()
ctx, err = db.OnDHT("")
require.NoError(t, err)
require.NotNil(t, ctx)
db.NewDHT("user", 1)
ctx, err = db.OnDHT("id", "user")
require.NoError(t, err)
require.Equal(t, 1, ctx.Index)
}
// TestOnDHT checks key routing through the default DHT: each key must be
// routed by db.OnDHT to the expected database index, the insert issued on
// that route must succeed, and the row must afterwards be found on exactly
// that database.
func TestOnDHT(t *testing.T) {
	const shards = 10

	dbs := make([]*sql.DB, 0, shards)
	for i := 0; i < shards; i++ {
		db3 := createSQLite3()
		// Best-effort teardown; a Close error cannot affect the assertions.
		t.Cleanup(func() { _ = db3.Close() })

		// Surface schema problems here rather than as a failing insert below.
		_, err := db3.Exec("CREATE TABLE `dht` (`v` varchar(50), PRIMARY KEY (`v`))")
		require.NoError(t, err)

		dbs = append(dbs, db3)
	}

	db := Open(dbs...)

	// Reference layout of the hash ring kept from the original author
	// (NOTE(review): columns look like hash, virtual node, db - confirm):
	//
	// 2 dbs -> 3 dbs -> data
	// -> 2439456 1149
	// 46916880 E0 0 !
	// 63694499 E1 1
	// <- 80472118 E2 2
	// <- 84017712 S2 2
	// -> 111074370 638
	// 117572950 S0 0 !
	// 134350569 S1 1
	// <- 214987260 G2 2
	// 248542498 G0 0 !
	// 265320117 G1 1
	// 316638712 M0 0
	// 333416331 M1 1
	// <- 350193950 M2 2
	// <- 351179688 K2 2
	// 384734926 K0 0 !
	// 401512545 K1 1
	// <- 484709092 O2 2
	// 518264330 O0 0 !
	// 535041949 O1 1
	// <- 2228889920 C2 2
	// 2262445158 C0 0 !
	// 2279222777 C1 1
	// 2330541372 I0 0
	// 2347318991 I1 1
	// <- 2364096610 I2 2
	// 2597703348 A0 0 !
	// 2600263204 Q0 0
	// 2614480967 A1 1
	// 2617040823 Q1 1
	// <- 2631258586 A2 2
	// <- 2633818442 Q2 2
	// -> 4113327457 150
	db.NewDHT("", 1, 2)

	// key -> index of the database OnDHT is expected to select.
	values := map[string]int{
		"1149": 1,
		"S0":   2,
		"I2":   1,
	}

	// Write each key through the DHT route and verify where it landed.
	for v, i := range values {
		b := New().Insert("dht").
			Set("v", v).
			End()

		c, err := db.OnDHT(v)
		require.NoError(t, err)
		require.Equal(t, i, c.Index)

		result, err := c.ExecBuilder(context.TODO(), b)
		require.NoError(t, err)

		rows, err := result.RowsAffected()
		require.NoError(t, err)
		require.Equal(t, int64(1), rows)
	}

	// Read each key back directly from the expected database, bypassing the
	// DHT, to prove the row was stored on that database.
	for v, i := range values {
		b := New().Select("dht", "v").Where("v = {v}").Param("v", v)

		shard := db.dbs[i]

		var val string
		err := shard.QueryRowBuilder(context.TODO(), b).Scan(&val)
		require.NoError(t, err)
		require.Equal(t, v, val)
	}
}
// TestDHTScaling checks key routing while the default DHT grows from
// databases {0, 1} to {0, 1, 2}. It asserts three phases:
//
//  1. before scaling, every key resolves to its "current" index;
//  2. between Add(2) and Done(), keys flagged busy are rejected with
//     shardid.ErrDataItemIsBusy while all others still resolve to "current";
//  3. after Done(), every key resolves to its "next" index.
func TestDHTScaling(t *testing.T) {
	const shards = 10

	dbs := make([]*sql.DB, 0, shards)
	for i := 0; i < shards; i++ {
		db3 := createSQLite3()
		// Best-effort teardown; a Close error cannot affect the assertions.
		t.Cleanup(func() { _ = db3.Close() })

		// Surface schema problems immediately instead of ignoring them.
		_, err := db3.Exec("CREATE TABLE `dht` (`v` varchar(50), PRIMARY KEY (`v`))")
		require.NoError(t, err)

		dbs = append(dbs, db3)
	}

	db := Open(dbs...)

	// Reference layout of the hash ring kept from the original author
	// (NOTE(review): columns look like hash, virtual node, db - confirm):
	//
	// 2 dbs -> 3 dbs -> data
	// -> 2439456 1149
	// 46916880 E0 0 !
	// 63694499 E1 1
	// <- 80472118 E2 2
	// -> 83143427 3850
	// <- 84017712 S2 2
	// -> 111074370 638
	// 117572950 S0 0 !
	// 134350569 S1 1
	// <- 214987260 G2 2
	// 248542498 G0 0 !
	// 265320117 G1 1
	// 316638712 M0 0
	// 333416331 M1 1
	// <- 350193950 M2 2
	// <- 351179688 K2 2
	// 384734926 K0 0 !
	// 401512545 K1 1
	// <- 484709092 O2 2
	// 518264330 O0 0 !
	// 535041949 O1 1
	// <- 2228889920 C2 2
	// 2262445158 C0 0 !
	// 2279222777 C1 1
	// 2330541372 I0 0
	// 2347318991 I1 1
	// <- 2364096610 I2 2
	// 2597703348 A0 0 !
	// 2600263204 Q0 0
	// 2614480967 A1 1
	// 2617040823 Q1 1
	// <- 2631258586 A2 2
	// <- 2633818442 Q2 2
	// -> 4113327457 150
	db.NewDHT("", 0, 1)

	// item is the expectation for a single key across the three phases.
	type item struct {
		current int  // index expected before and, if not busy, during scaling
		busy    bool // whether OnDHT must fail with ErrDataItemIsBusy during scaling
		next    int  // index expected once scaling is done
	}

	values := map[string]item{
		"1149": {current: 0, busy: false, next: 0}, // stays on db 0, never busy
		"E1":   {current: 0, busy: true, next: 2},  // busy while scaling, then db 2
		"3850": {current: 0, busy: true, next: 2},  // busy while scaling, then db 2
		"638":  {current: 0, busy: false, next: 0}, // stays on db 0, never busy
		"150":  {current: 0, busy: false, next: 0}, // stays on db 0, never busy
	}

	// Phase 1: routing before the new database is introduced.
	for v, it := range values {
		shard, err := db.OnDHT(v)
		require.NoError(t, err)
		require.Equal(t, it.current, shard.Index)
	}

	// Phase 2: database 2 has been added but the change is not finished yet.
	db.GetDHT("").Add(2)

	for v, it := range values {
		shard, err := db.OnDHT(v)
		if it.busy {
			require.ErrorIs(t, err, shardid.ErrDataItemIsBusy)
			continue
		}

		require.NoError(t, err)
		require.Equal(t, it.current, shard.Index)
	}

	// Phase 3: scaling is finished; every key must resolve to its final index.
	db.GetDHT("").Done()

	for v, it := range values {
		shard, err := db.OnDHT(v)
		require.NoError(t, err)
		require.Equal(t, it.next, shard.Index)
	}
}