From 454389245ff25f7611145e62cad34f03e9acd43b Mon Sep 17 00:00:00 2001 From: Dimitrios Psarrou Date: Tue, 11 May 2021 21:45:42 +0300 Subject: [PATCH 1/2] send messages to subscription channels in an non-blocking way --- bitvavo.go | 170 +++++++++++++++++++++++++++++------------------------ 1 file changed, 93 insertions(+), 77 deletions(-) diff --git a/bitvavo.go b/bitvavo.go index 2137ef8..0d9e9a3 100644 --- a/bitvavo.go +++ b/bitvavo.go @@ -1195,83 +1195,7 @@ func (bitvavo Bitvavo) handleMessage(ws *Websocket) { } ws.errChannel <- MyError{CustomError: t} } - if x["event"] == "authenticate" { - ws.authenticated = true - } else if x["event"] == "book" { - var t SubscriptionBookUpdate - err = json.Unmarshal(message, &t) - if handleError(err) { - return - } - market, _ := x["market"].(string) - if ws.subscriptionBookUpdateChannelMap[market] != nil { - ws.subscriptionBookUpdateChannelMap[market] <- t - } - if ws.keepLocalBook { - addToBook(t, ws) - } - } else if x["event"] == "trade" { - var t SubscriptionTrades - err = json.Unmarshal(message, &t) - if handleError(err) { - return - } - market, _ := x["market"].(string) - if ws.subscriptionTradesChannelMap[market] != nil { - ws.subscriptionTradesChannelMap[market] <- t - } - } else if x["event"] == "fill" { - var t SubscriptionAccountFill - err = json.Unmarshal(message, &t) - if handleError(err) { - return - } - market, _ := x["market"].(string) - if ws.subscriptionAccountFillChannelMap[market] != nil { - ws.subscriptionAccountFillChannelMap[market] <- t - } - } else if x["event"] == "order" { - var t SubscriptionAccountOrder - err = json.Unmarshal(message, &t) - if handleError(err) { - return - } - market, _ := x["market"].(string) - if ws.subscriptionAccountOrderChannelMap[market] != nil { - ws.subscriptionAccountOrderChannelMap[market] <- t - } - } else if x["event"] == "ticker" { - var t SubscriptionTicker - err = json.Unmarshal(message, &t) - if handleError(err) { - return - } - market, _ := x["market"].(string) - ws.subscriptionTickerChannelMap[market] <- t - } else if x["event"] == "ticker24h" { - var t SubscriptionTicker24h - err = json.Unmarshal(message, &t) - if handleError(err) { - return - } - for i := 0; i < len(t.Data); i++ { - ws.subscriptionTicker24hChannelMap[t.Data[i].Market] <- t.Data[i] - } - } else if x["event"] == "candle" { - var t PreCandle - err := json.Unmarshal(message, &t) - if err != nil { - return - } - var candles []Candle - for i := 0; i < len(t.Candle); i++ { - entry := reflect.ValueOf(t.Candle[i]) - candles = append(candles, Candle{Timestamp: int(entry.Index(0).Interface().(float64)), Open: entry.Index(1).Interface().(string), High: entry.Index(2).Interface().(string), Low: entry.Index(3).Interface().(string), Close: entry.Index(4).Interface().(string), Volume: entry.Index(5).Interface().(string)}) - } - market, _ := x["market"].(string) - interval, _ := x["interval"].(string) - ws.subscriptionCandlesChannelMap[market][interval] <- SubscriptionCandles{Event: t.Event, Market: t.Market, Interval: t.Interval, Candle: candles} - } + bitvavo.handleEvent(ws, x, message) if x["action"] == "getTime" { var t TimeResponse err = json.Unmarshal(message, &t) @@ -1452,6 +1376,98 @@ func (bitvavo Bitvavo) handleMessage(ws *Websocket) { errorToConsole("HandleMessage has ended, messages will no longer be received, please restart.") } +func (bitvavo Bitvavo) handleEvent(ws *Websocket, x map[string]interface{}, message []byte) { + var market string + if m, ok := x["market"]; ok { + market = m.(string) + } + if x["event"] == "authenticate" { + ws.authenticated = true + } else if x["event"] == "book" && ws.subscriptionBookUpdateChannelMap[market] != nil { + var t SubscriptionBookUpdate + err := json.Unmarshal(message, &t) + if handleError(err) { + return + } + select { + case ws.subscriptionBookUpdateChannelMap[market] <- t: + default: + } + if ws.keepLocalBook { + addToBook(t, ws) + } + } else if x["event"] == "trade" && ws.subscriptionTradesChannelMap[market] != nil { + var t SubscriptionTrades + err := json.Unmarshal(message, &t) + if handleError(err) { + return + } + select { + case ws.subscriptionTradesChannelMap[market] <- t: + default: + } + } else if x["event"] == "fill" && ws.subscriptionAccountFillChannelMap[market] != nil { + var t SubscriptionAccountFill + err := json.Unmarshal(message, &t) + if handleError(err) { + return + } + select { + case ws.subscriptionAccountFillChannelMap[market] <- t: + default: + } + } else if x["event"] == "order" && ws.subscriptionAccountOrderChannelMap[market] != nil { + var t SubscriptionAccountOrder + err := json.Unmarshal(message, &t) + if handleError(err) { + return + } + select { + case ws.subscriptionAccountOrderChannelMap[market] <- t: + default: + } + } else if x["event"] == "ticker" && ws.subscriptionTickerChannelMap[market] != nil { + var t SubscriptionTicker + err := json.Unmarshal(message, &t) + if handleError(err) { + return + } + select { + case ws.subscriptionTickerChannelMap[market] <- t: + default: + } + } else if x["event"] == "ticker24h" { + var t SubscriptionTicker24h + err := json.Unmarshal(message, &t) + if handleError(err) { + return + } + for i := 0; i < len(t.Data); i++ { + select { + case ws.subscriptionTicker24hChannelMap[t.Data[i].Market] <- t.Data[i]: + default: + } + } + } else if x["event"] == "candle" { + var t PreCandle + err := json.Unmarshal(message, &t) + if err != nil { + return + } + var candles []Candle + for i := 0; i < len(t.Candle); i++ { + entry := reflect.ValueOf(t.Candle[i]) + candles = append(candles, Candle{Timestamp: int(entry.Index(0).Interface().(float64)), Open: entry.Index(1).Interface().(string), High: entry.Index(2).Interface().(string), Low: entry.Index(3).Interface().(string), Close: entry.Index(4).Interface().(string), Volume: entry.Index(5).Interface().(string)}) + } + market, _ := x["market"].(string) + interval, _ := x["interval"].(string) + select { + case ws.subscriptionCandlesChannelMap[market][interval] <- SubscriptionCandles{Event: t.Event, Market: t.Market, Interval: t.Interval, Candle: candles}: + default: + } + } +} + func (bitvavo Bitvavo) InitWS() *websocket.Conn { bitvavo.reconnectTimer = 100 uri, _ := url.Parse(bitvavo.WsUrl) From d5246720ff239929989417e746686863f80be129 Mon Sep 17 00:00:00 2001 From: Dimitrios Psarrou Date: Tue, 11 May 2021 21:46:33 +0300 Subject: [PATCH 2/2] add unsubscribe methods --- bitvavo.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/bitvavo.go b/bitvavo.go index 0d9e9a3..2e5303e 100644 --- a/bitvavo.go +++ b/bitvavo.go @@ -1883,3 +1883,52 @@ func (ws *Websocket) SubscriptionBook(market string, options map[string]string) ws.conn.WriteMessage(websocket.TextMessage, []byte(mySecondMessage)) return ws.subscriptionBookChannelMap[market] } + +func (ws *Websocket) UnsubscribeTicker(market string) { + delete(ws.subscriptionTickerOptionsMap, market) + options := SubscriptionTickerObject{Action: "unsubscribe", Channels: []SubscriptionTickAccSubObject{SubscriptionTickAccSubObject{Name: "ticker", Markets: []string{market}}}} + myMessage, _ := json.Marshal(options) + ws.conn.WriteMessage(websocket.TextMessage, []byte(myMessage)) +} + +func (ws *Websocket) UnsubscribeTicker24h(market string) { + delete(ws.subscriptionTicker24hOptionsMap, market) + options := SubscriptionTickerObject{Action: "unsubscribe", Channels: []SubscriptionTickAccSubObject{SubscriptionTickAccSubObject{Name: "ticker24h", Markets: []string{market}}}} + myMessage, _ := json.Marshal(options) + ws.conn.WriteMessage(websocket.TextMessage, []byte(myMessage)) +} + +func (ws *Websocket) UnsubscribeAccount(market string) { + delete(ws.subscriptionAccountOptionsMap, market) + options := SubscriptionTickerObject{Action: "unsubscribe", Channels: []SubscriptionTickAccSubObject{SubscriptionTickAccSubObject{Name: "account", Markets: []string{market}}}} + myMessage, _ := json.Marshal(options) + ws.conn.WriteMessage(websocket.TextMessage, []byte(myMessage)) +} + +func (ws *Websocket) UnsubscribeCandles(market string, interval string) { + delete(ws.subscriptionCandlesOptionsMap, market) + options := SubscriptionCandlesObject{Action: "unsubscribe", Channels: []SubscriptionCandlesSubObject{SubscriptionCandlesSubObject{Name: "candles", Interval: []string{interval}, Markets: []string{market}}}} + myMessage, _ := json.Marshal(options) + ws.conn.WriteMessage(websocket.TextMessage, []byte(myMessage)) +} + +func (ws *Websocket) UnsubscribeTrades(market string) { + delete(ws.subscriptionTradesOptionsMap, market) + options := SubscriptionTradesBookObject{Action: "unsubscribe", Channels: []SubscriptionTradesBookSubObject{SubscriptionTradesBookSubObject{Name: "trades", Markets: []string{market}}}} + myMessage, _ := json.Marshal(options) + ws.conn.WriteMessage(websocket.TextMessage, []byte(myMessage)) +} + +func (ws *Websocket) UnsubscribeBookUpdate(market string) { + delete(ws.subscriptionBookUpdateOptionsMap, market) + options := SubscriptionTradesBookObject{Action: "unsubscribe", Channels: []SubscriptionTradesBookSubObject{SubscriptionTradesBookSubObject{Name: "book", Markets: []string{market}}}} + myMessage, _ := json.Marshal(options) + ws.conn.WriteMessage(websocket.TextMessage, []byte(myMessage)) +} + +func (ws *Websocket) UnsubscribeBook(market string) { + delete(ws.subscriptionBookOptionsSecondMap, market) + options := SubscriptionTradesBookObject{Action: "unsubscribe", Channels: []SubscriptionTradesBookSubObject{SubscriptionTradesBookSubObject{Name: "book", Markets: []string{market}}}} + myMessage, _ := json.Marshal(options) + ws.conn.WriteMessage(websocket.TextMessage, []byte(myMessage)) +} \ No newline at end of file