diff --git a/cfg.example.json b/cfg.example.json index 598a25d..1af6104 100644 --- a/cfg.example.json +++ b/cfg.example.json @@ -13,7 +13,8 @@ "hbs": { "servers": ["127.0.0.1:6030"], "timeout": 300, - "interval": 60 + "interval": 60, + "callTimeout": 5 }, "alarm": { "enabled": true, diff --git a/cron/strategy.go b/cron/strategy.go index c5e8b69..dbc5450 100644 --- a/cron/strategy.go +++ b/cron/strategy.go @@ -20,7 +20,7 @@ func SyncStrategies() { func syncStrategies() { var strategiesResponse model.StrategiesResponse - err := g.HbsClient.Call("Hbs.GetStrategies", model.NullRpcRequest{}, &strategiesResponse) + err := g.HbsClient.CallTimeout("Hbs.GetStrategies", model.NullRpcRequest{}, &strategiesResponse, time.Duration(g.Config().Hbs.CallTimeout)*time.Second) if err != nil { log.Println("[ERROR] Hbs.GetStrategies:", err) return @@ -54,7 +54,7 @@ func rebuildStrategyMap(strategiesResponse *model.StrategiesResponse) { func syncExpression() { var expressionResponse model.ExpressionResponse - err := g.HbsClient.Call("Hbs.GetExpressions", model.NullRpcRequest{}, &expressionResponse) + err := g.HbsClient.CallTimeout("Hbs.GetExpressions", model.NullRpcRequest{}, &expressionResponse, time.Duration(g.Config().Hbs.CallTimeout)*time.Second) if err != nil { log.Println("[ERROR] Hbs.GetExpressions:", err) return diff --git a/g/cfg.go b/g/cfg.go index 74f70b4..93ab404 100644 --- a/g/cfg.go +++ b/g/cfg.go @@ -18,9 +18,10 @@ type RpcConfig struct { } type HbsConfig struct { - Servers []string `json:"servers"` - Timeout int64 `json:"timeout"` - Interval int64 `json:"interval"` + Servers []string `json:"servers"` + Timeout int64 `json:"timeout"` + Interval int64 `json:"interval"` + CallTimeout int64 `json:"callTimeout"` } type RedisConfig struct { diff --git a/g/g.go b/g/g.go index c6f4dfd..49d458f 100644 --- a/g/g.go +++ b/g/g.go @@ -8,8 +8,9 @@ import ( // change log // 2.0.1: bugfix HistoryData limit // 2.0.2: clean stale data +// 2.0.3: add timeout to sync strategies and expressions const ( - VERSION = "2.0.2" + VERSION = "2.0.3" ) func init() { diff --git a/g/rpc.go b/g/rpc.go index f37c272..55db9cd 100644 --- a/g/rpc.go +++ b/g/rpc.go @@ -1,17 +1,15 @@ package g import ( - "github.com/toolkits/net" "log" "math" - "net/rpc" "sync" "time" ) type SingleConnRpcClient struct { sync.Mutex - rpcClient *rpc.Client + rpcClient *TimeoutRpcClient RpcServers []string Timeout time.Duration } @@ -37,7 +35,7 @@ func (this *SingleConnRpcClient) insureConn() { } for _, s := range this.RpcServers { - this.rpcClient, err = net.JsonRpcClient("tcp", s, this.Timeout) + this.rpcClient, err = NewTimeoutRpcClient("tcp", s, this.Timeout) if err == nil { return } @@ -69,3 +67,18 @@ func (this *SingleConnRpcClient) Call(method string, args interface{}, reply int return err } + +func (this *SingleConnRpcClient) CallTimeout(method string, args interface{}, reply interface{}, timeout time.Duration) error { + + this.Lock() + defer this.Unlock() + + this.insureConn() + + err := this.rpcClient.CallTimeout(method, args, reply, timeout) + if err != nil { + this.close() + } + + return err +} diff --git a/g/timeout_rpc_client.go b/g/timeout_rpc_client.go new file mode 100644 index 0000000..c300be8 --- /dev/null +++ b/g/timeout_rpc_client.go @@ -0,0 +1,35 @@ +package g + +import ( + "time" + "errors" + "net" + "net/rpc" + "net/rpc/jsonrpc" +) + +type TimeoutRpcClient struct { + *rpc.Client +} + +func NewTimeoutRpcClient(network, address string, timeout time.Duration) (*TimeoutRpcClient, error) { + conn, err := net.DialTimeout(network, address, timeout) + if err != nil { + return nil, err + } + + return &TimeoutRpcClient { + Client: jsonrpc.NewClient(conn), + }, nil +} + +func (this *TimeoutRpcClient) CallTimeout(serviceMethod string, args interface{}, reply interface{}, timeout time.Duration) error { + call := this.Go(serviceMethod, args, reply, make(chan *rpc.Call, 1)) + + select { + case callPtr := <-call.Done: + return callPtr.Error + case <-time.After(timeout): + return errors.New("rpc call timeout") + } +}