-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathorder.go
172 lines (149 loc) · 3.44 KB
/
order.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package coinfactory
import (
"sync"
"time"
"github.com/sinisterminister/coinfactory/pkg/binance"
log "github.com/sirupsen/logrus"
)
// Order contains the state of the order
type Order struct {
OrderRequest
orderStatus binance.OrderStatusResponse
orderCreationTime time.Time
orderID int
orderMutex *sync.Mutex
doneChan chan int
stopChan <-chan bool
updateChannels []chan bool
}
type orderBuilder struct {
request OrderRequest
status binance.OrderStatusResponse
creationTime time.Time
id int
stopChan <-chan bool
}
func (ob *orderBuilder) withStatus(status binance.OrderStatusResponse) *orderBuilder {
ob.status = status
return ob
}
func (ob *orderBuilder) withCreationTime(creationTime time.Time) *orderBuilder {
ob.creationTime = creationTime
return ob
}
func (ob *orderBuilder) withID(id int) *orderBuilder {
ob.id = id
return ob
}
func (ob *orderBuilder) build() *Order {
order := &Order{
ob.request,
ob.status,
ob.creationTime,
ob.id,
&sync.Mutex{},
make(chan int),
ob.stopChan,
[]chan bool{},
}
go order.orderStatusHandler(order.stopChan)
return order
}
func newOrderBuilder(request OrderRequest, stopChan <-chan bool) *orderBuilder {
return &orderBuilder{
request: request,
stopChan: stopChan,
}
}
// GetStatus
func (o *Order) GetStatus() binance.OrderStatusResponse {
o.orderMutex.Lock()
defer o.orderMutex.Unlock()
return o.orderStatus
}
// GetAge returns the age of the order
func (o *Order) GetAge() time.Duration {
o.orderMutex.Lock()
defer o.orderMutex.Unlock()
return time.Since(o.orderCreationTime)
}
func (o *Order) GetCreationTime() time.Time {
o.orderMutex.Lock()
defer o.orderMutex.Unlock()
return o.orderCreationTime
}
func (o *Order) GetDoneChan() <-chan int {
return o.doneChan
}
func (o *Order) GetUpdateChan() <-chan bool {
ch := make(chan bool)
// Add the channel to registry
o.orderMutex.Lock()
o.updateChannels = append(o.updateChannels, ch)
o.orderMutex.Unlock()
return ch
}
func (order *Order) orderStatusHandler(stopChan <-chan bool) {
orderStream := getUserDataStreamService().GetOrderUpdateStream(stopChan)
for {
// Bail if stopchan is closed
select {
case <-stopChan:
return
default:
}
select {
case <-stopChan:
return
case data := <-orderStream:
if data.OrderID == order.orderID {
log.Info("Updating order status")
order.orderMutex.Lock()
switch data.CurrentOrderStatus {
case "FILLED":
fallthrough
case "CANCELED":
fallthrough
case "REJECTED":
fallthrough
case "EXPIRED":
select {
default:
close(order.doneChan)
case <-order.doneChan:
}
}
// Build an order status
status := binance.OrderStatusResponse{
order.Symbol,
order.orderID,
data.ClientOrderID,
order.Price,
data.OrderQuantity,
data.FilledQuantity,
data.CurrentOrderStatus,
data.TimeInForce,
data.OrderType,
data.Side,
data.StopPrice.String(),
data.IcebergQuantity,
data.OrderCreationTime,
data.EventTime,
data.IsWorking,
}
order.orderStatus = status
// Let all the channels know
for _, ch := range order.updateChannels {
select {
case ch <- true:
// Let the channel know that the order was updated
default:
// Skip blocked channel
log.Warn("skipping blocked order status channel")
}
}
order.orderMutex.Unlock()
}
}
}
}