-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.go
146 lines (112 loc) · 4.21 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main
import (
	"fmt"
	"log"
	"net"
	"os"
	"path"
	"sync"
	"time"

	"github.com/klayoracle/klayoracle-monorepo/data-provider/adapter"
	"github.com/klayoracle/klayoracle-monorepo/data-provider/boot"
	"github.com/klayoracle/klayoracle-monorepo/data-provider/config"
	"github.com/klayoracle/klayoracle-monorepo/node/protonode"
	"golang.org/x/net/context"
	"google.golang.org/grpc/metadata"
)
// main boots the data provider process.
//
// It resolves the working directory (WORK_DIR overrides the process cwd),
// loads configuration through boot.Boot, starts the Data Provider service on
// the address in HOST_IP, performs a handshake with the service node, and then
// starts one goroutine per adapter that queues a job on the node at the
// adapter's configured frequency. main blocks until the root context is
// cancelled, then stops the service.
func main() {
	wd, err := os.Getwd()
	if err != nil {
		log.Fatal("cannot determine working directory: ", err)
	}
	if dir := os.Getenv("WORK_DIR"); dir != "" {
		wd = dir
	}

	// Load config, logger, etc.
	boot.Boot(wd, path.Join(wd, "config.yaml"), path.Join(wd, ".env"))
	config.Loaded.Logger.Info("Working directory: ", wd)

	// Read HOST_IP once, after Boot has been given the .env path.
	hostIP := os.Getenv("HOST_IP")

	dp := adapter.NewDataProvider()

	// Start the Data Provider service.
	s, err := dp.NewDataProviderService()
	if err != nil {
		config.Loaded.Logger.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())

	lis, err := net.Listen("tcp", hostIP)
	if err != nil {
		// Pass the error as a plain argument, like the other Fatal calls in
		// this function, rather than through a printf verb on a non-"f" method.
		config.Loaded.Logger.Fatal("failed to listen: ", err)
	}

	// Serve the Data Provider on the HOST_IP address.
	go func() {
		if err := s.Serve(lis); err != nil {
			cancel()
			config.Loaded.Logger.Fatal("failed to serve: ", err)
		}
	}()

	// Create the client and start the handshake with the node service.
	go func() {
		hConn, err := dp.HandShake() // handshake connection
		if err != nil {
			cancel()
			config.Loaded.Logger.Fatal("DP Handshake: ", err)
		}

		// hConnMu guards hConn: every adapter goroutine below may replace the
		// handshake connection after a failed request.
		var hConnMu sync.Mutex

		// NOTE(review): this goroutine returns as soon as the adapter
		// goroutines are started, so this deferred Close runs almost
		// immediately. That matches the previous behaviour; confirm whether
		// the handshake connection is meant to stay open for longer.
		defer func() {
			hConnMu.Lock()
			defer hConnMu.Unlock()
			if hConn == nil {
				return
			}
			if err := hConn.Close(); err != nil {
				config.Loaded.Logger.Warnw("cannot close connection", "error", err)
			}
		}()

		// reHandShake drops the current handshake connection and attempts a
		// new handshake with the node. Calls are serialised by hConnMu.
		reHandShake := func() {
			hConnMu.Lock()
			defer hConnMu.Unlock()

			if hConn != nil {
				// Best effort: the connection is being replaced, so a close
				// error here is not actionable.
				_ = hConn.Close()
			}

			// NOTE(review): the message mentions 60secs but no wait is
			// performed here; the retry happens immediately.
			config.Loaded.Logger.Infow("re attempting handshake to node after 60secs", "node", config.Loaded.ServiceNode)

			var err error
			hConn, err = dp.HandShake() // handshake connection
			if err != nil {
				config.Loaded.Logger.Infow("could not re-established connection with node", "data provider", hostIP, "node", config.Loaded.ServiceNode, "error", err)
				return
			}
			config.Loaded.Logger.Infow("connection re-established with node", "data provider", hostIP, "node", config.Loaded.ServiceNode)
		}

		adapters := adapter.ListAdapters(true)
		config.Loaded.Logger.Infow("send feeds to node", "data provider", hostIP, "node", config.Loaded.ServiceNode, "total", len(adapters))

		for _, adapterCfg := range adapters {
			adapterCfg := adapterCfg // per-iteration copy captured by the goroutine

			go func() {
				ticker := time.NewTicker(time.Duration(adapterCfg.Frequency))
				defer ticker.Stop()

				for {
					select {
					case <-ctx.Done():
						// Root context cancelled: stop scheduling requests.
						return
					case t := <-ticker.C:
						config.Loaded.Logger.Infow("sending adapter request to service node", "timer", t, "data provider", hostIP, "node", config.Loaded.ServiceNode, "adapter", adapterCfg.AdapterId, "name", adapterCfg.Name)

						// Wrapped in a function so the deferred Close fires
						// at the end of every tick, not at goroutine exit.
						func() {
							cfg, conn, err := adapter.NewNodeServiceClient()
							if err != nil {
								// Without a client there is nothing to close
								// or call; skip this tick.
								config.Loaded.Logger.Warnw("error sending adapter request to service node", "data provider", hostIP, "node", config.Loaded.ServiceNode, "error", err)
								return
							}
							defer func() {
								if err := conn.Close(); err != nil {
									config.Loaded.Logger.Warnw("cannot close connection", "error", err)
								}
							}()

							client, ok := cfg.(protonode.NodeServiceClient)
							if !ok {
								config.Loaded.Logger.Warnw("error sending adapter request to service node", "data provider", hostIP, "node", config.Loaded.ServiceNode, "error", "unexpected node service client type")
								return
							}

							// Use a request-local context; the root ctx must
							// not be overwritten because main waits on it.
							md := metadata.Pairs("provider", hostIP)
							reqCtx := metadata.NewOutgoingContext(context.Background(), md)

							nodeAdapter := new(protonode.Adapter)
							if err := adapter.CastBtwDPInfo(adapterCfg, nodeAdapter); err != nil {
								config.Loaded.Logger.Warnw("error sending adapter request to service node", "data provider", hostIP, "node", config.Loaded.ServiceNode, "error", err)
								return
							}

							status, err := client.QueueJob(reqCtx, nodeAdapter)
							if err != nil || status.Status == 1 {
								config.Loaded.Logger.Warnw("error sending adapter request to service node", "data provider", hostIP, "node", config.Loaded.ServiceNode, "error", err)
								// Re-attempt the handshake with the node.
								reHandShake()
							}
						}()
					}
				}
			}()
		}
	}()

	// Block until the root context is cancelled. A plain receive is needed:
	// the Done channel is only ever closed, so ranging over it would never
	// execute its body and the service would not be stopped.
	<-ctx.Done()
	s.Stop() // Don't take chances with resources; be sure the DP service closes.
	fmt.Println("data provider operation... exited")
}