This repository has been archived by the owner on Oct 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 52
/
batch_disposition.go
101 lines (90 loc) · 2.45 KB
/
batch_disposition.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package servicebus
import (
"context"
"fmt"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
)
type (
// MessageStatus defines an acceptable Message disposition status.
MessageStatus dispositionStatus
// BatchDispositionIterator provides an iterator over LockTokenIDs
BatchDispositionIterator struct {
LockTokenIDs []*uuid.UUID
Status MessageStatus
cursor int
}
// BatchDispositionError is an error which returns a collection of DispositionError.
BatchDispositionError struct {
Errors []DispositionError
}
// DispositionError is an error associated with a LockTokenID.
DispositionError struct {
LockTokenID *uuid.UUID
err error
}
)
const (
// Complete exposes completedDisposition
Complete MessageStatus = MessageStatus(completedDisposition)
// Abort exposes abandonedDisposition
Abort MessageStatus = MessageStatus(abandonedDisposition)
)
func (bde BatchDispositionError) Error() string {
msg := ""
if len(bde.Errors) != 0 {
msg = fmt.Sprintf("Operation failed, %d error(s) reported.", len(bde.Errors))
}
return msg
}
func (de DispositionError) Error() string {
return de.err.Error()
}
// UnWrap will return the private error.
func (de DispositionError) UnWrap() error {
return de.err
}
// Done communicates whether there are more messages remaining to be iterated over.
func (bdi *BatchDispositionIterator) Done() bool {
return len(bdi.LockTokenIDs) == bdi.cursor
}
// Next iterates to the next LockToken
func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID) {
if done := bdi.Done(); !done {
uuid = bdi.LockTokenIDs[bdi.cursor]
bdi.cursor++
}
return uuid
}
func (bdi *BatchDispositionIterator) doUpdate(ctx context.Context, ec entityConnector) *BatchDispositionError {
var batchError *BatchDispositionError
for !bdi.Done() {
if id := bdi.Next(); id != nil {
m := &Message{
LockToken: id,
}
m.ec = ec
err := m.sendDisposition(ctx, bdi.Status)
if err != nil {
if batchError == nil {
batchError = new(BatchDispositionError)
}
batchError.Errors = append(batchError.Errors, DispositionError{
LockTokenID: id,
err: err,
})
}
}
}
return batchError
}
func (m *Message) sendDisposition(ctx context.Context, dispositionStatus MessageStatus) (err error) {
switch dispositionStatus {
case Complete:
err = m.Complete(ctx)
case Abort:
err = m.Abandon(ctx)
default:
err = fmt.Errorf("unsupported bulk disposition status %q", dispositionStatus)
}
return err
}