forked from yoavfeld/nds
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransaction.go
192 lines (175 loc) · 6.03 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package nds
import (
"sync"
"golang.org/x/net/context"
"cloud.google.com/go/datastore"
"github.com/bradfitz/gomemcache/memcache"
)
var transactionKey = "used for *transaction"
type transaction struct {
sync.Mutex
tx *datastore.Transaction
lockMemcacheItems []*memcache.Item
}
func transactionFromContext(c context.Context) (*transaction, bool) {
tx, ok := c.Value(&transactionKey).(*transaction)
return tx, ok
}
// NewTransaction starts a new transaction.
//func NewTransaction(ctx context.Context, opts ...TransactionOption) (*Transaction, error) {
// for _, o := range opts {
// if _, ok := o.(maxAttempts); ok {
// return nil, errors.New("datastore: NewTransaction does not accept MaxAttempts option")
// }
// }
// req := &pb.BeginTransactionRequest{
// ProjectId: c.dataset,
// }
// resp, err := c.client.BeginTransaction(ctx, req)
// if err != nil {
// return nil, err
// }
//
// return &Transaction{
// id: resp.Transaction,
// ctx: ctx,
// client: c,
// mutations: nil,
// pending: make(map[int]*PendingKey),
// }, nil
//}
// RunInTransaction runs f in a transaction. f is invoked with a Transaction
// that f should use for all the transaction's datastore operations.
//
// f must not call Commit or Rollback on the provided Transaction.
//
// If f returns nil, RunInTransaction commits the transaction,
// returning the Commit and a nil error if it succeeds. If the commit fails due
// to a conflicting transaction, RunInTransaction retries f with a new
// Transaction. It gives up and returns ErrConcurrentTransaction after three
// failed attempts (or as configured with MaxAttempts).
//
// If f returns non-nil, then the transaction will be rolled back and
// RunInTransaction will return the same error. The function f is not retried.
//
// Note that when f returns, the transaction is not committed. Calling code
// must not assume that any of f's changes have been committed until
// RunInTransaction returns nil.
//
// Since f may be called multiple times, f should usually be idempotent – that
// is, it should have the same result when called multiple times. Note that
// Transaction.Get will append when unmarshalling slice fields, so it is not
// necessarily idempotent.
//func RunInTransaction(ctx context.Context, f func(tx *Transaction) error, opts ...TransactionOption) (*Commit, error) {
// settings := newTransactionSettings(opts)
// for n := 0; n < settings.attempts; n++ {
// tx, err := c.NewTransaction(ctx)
// if err != nil {
// return nil, err
// }
// if err := f(tx); err != nil {
// tx.Rollback()
// return nil, err
// }
// if cmt, err := tx.Commit(); err != ErrConcurrentTransaction {
// return cmt, err
// }
// }
// return nil, ErrConcurrentTransaction
//}
// Commit applies the enqueued operations atomically.
func (t *transaction) Commit() (*datastore.Commit, error) {
t.Lock()
defer t.Unlock()
memcacheSetMulti(nil, t.lockMemcacheItems)
return t.tx.Commit()
}
// Rollback abandons a pending transaction.
func (t *transaction) Rollback() error {
return t.tx.Rollback()
}
// Get is the transaction-specific version of the package function Get.
// All reads performed during the transaction will come from a single consistent
// snapshot. Furthermore, if the transaction is set to a serializable isolation
// level, another transaction cannot concurrently modify the data that is read
// or modified by this transaction.
func (t *transaction) Get(key *datastore.Key, dst interface{}) error {
return Get(nil, key, dst)
}
// GetMulti is a batch version of Get.
func (t *transaction) GetMulti(keys []*datastore.Key, dst interface{}) error {
return GetMulti(nil, keys, dst)
}
// Put is the transaction-specific version of the package function Put.
//
// Put returns a PendingKey which can be resolved into a Key using the
// return value from a successful Commit. If key is an incomplete key, the
// returned pending key will resolve to a unique key generated by the
// datastore.
func (t *transaction) Put(key *datastore.Key, src interface{}) (*datastore.PendingKey, error) {
keys := []*datastore.Key{key}
pendingKeys, err := t.tx.PutMulti(keys, src)
if err != nil {
if me, ok := err.(datastore.MultiError); ok {
return nil, me[0]
}
return nil, err
}
return pendingKeys[0], nil
}
// PutMulti is a batch version of Put. One PendingKey is returned for each
// element of src in the same order.
func (t *transaction) PutMulti(keys []*datastore.Key, src interface{}) ([]*datastore.PendingKey, error) {
lockMemcacheKeys := make([]string, 0, len(keys))
lockMemcacheItems := make([]*memcache.Item, 0, len(keys))
for _, key := range keys {
if !key.Incomplete() {
item := &memcache.Item{
Key: createMemcacheKey(key),
Flags: lockItem,
Value: itemLock(),
Expiration: memcacheLockTime,
}
lockMemcacheItems = append(lockMemcacheItems, item)
lockMemcacheKeys = append(lockMemcacheKeys, item.Key)
}
}
t.Lock()
t.lockMemcacheItems = append(t.lockMemcacheItems,
lockMemcacheItems...)
t.Unlock()
return t.tx.PutMulti(keys, src)
}
// Delete is the transaction-specific version of the package function Delete.
// Delete enqueues the deletion of the entity for the given key, to be
// committed atomically upon calling Commit.
func (t *transaction) Delete(key *datastore.Key) error {
err := t.tx.DeleteMulti([]*datastore.Key{key})
if me, ok := err.(datastore.MultiError); ok {
return me[0]
}
return err
}
// DeleteMulti is a batch version of Delete.
func (t *transaction) DeleteMulti(keys []*datastore.Key) error {
lockMemcacheItems := []*memcache.Item{}
for _, key := range keys {
// Worst case scenario is that we lock the entity for memcacheLockTime.
// datastore.Delete will raise the appropriate error.
if key == nil || key.Incomplete() {
continue
}
item := &memcache.Item{
Key: createMemcacheKey(key),
Flags: lockItem,
Value: itemLock(),
Expiration: memcacheLockTime,
}
lockMemcacheItems = append(lockMemcacheItems, item)
}
t.Lock()
t.lockMemcacheItems = append(t.lockMemcacheItems,
lockMemcacheItems...)
t.Unlock()
return t.tx.DeleteMulti(keys)
}