Skip to content

Commit

Permalink
Enable etcd plugin
Browse files Browse the repository at this point in the history
This patch includes the following changes:
1. Add bulk interface for etcd storage plugin
2. Enhance handshake protocol for goconserver console connection
3. Use lease, lock, watch from etcd to support goconserver cluster
4. Console session redirection among goconserver cluster
5. A dispatcher to select the goconserver host and could help
   balance the workload when enrolling node(s) in goconserver service.
  • Loading branch information
chenglch committed May 17, 2018
1 parent f3dba5e commit bf2e350
Show file tree
Hide file tree
Showing 19 changed files with 1,444 additions and 658 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ source /etc/profile.d/congo.sh
congo list
```

### Start goconserver with xcat

Currently `xcat` and `goconserver` are integrated with `file` storage type.
Some reference doc could be found at

- [rcons](http://xcat-docs.readthedocs.io/en/latest/guides/admin-guides/manage_clusters/ppc64le/management/basic/rcons.html)
- [gocons](http://xcat-docs.readthedocs.io/en/latest/advanced/goconserver/index.html)

## Development

### Requirement
Expand All @@ -100,7 +108,7 @@ make install

Please refer to [ssl](/scripts/ssl/)

### Web Interface (ongoing)
### Web Interface

Setup nodejs(9.0+) and npm(5.6.0+) toolkit at first. An example steps could be
found at [node env](/frontend/). Then follow the steps below:
Expand Down
24 changes: 20 additions & 4 deletions api/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ func (api *NodeApi) put(w http.ResponseWriter, req *http.Request) {
nodes := make([]string, 0, 1)
nodes = append(nodes, vars["node"])
plog.InfoNode(vars["node"], fmt.Sprintf("The console state has been changed to %s.", state))
result := nodeManager.SetConsoleState(nodes, state)
result, err := nodeManager.SetConsoleState(nodes, state)
if err != nil {
plog.HandleHttp(w, req, http.StatusInternalServerError, err.Error())
return
}
if resp, err = json.Marshal(result); err != nil {
plog.HandleHttp(w, req, http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -131,7 +135,11 @@ func (api *NodeApi) bulkPut(w http.ResponseWriter, req *http.Request) {
for _, v := range storNodes["nodes"] {
nodes = append(nodes, v.Name)
}
result := nodeManager.SetConsoleState(nodes, state)
result, err := nodeManager.SetConsoleState(nodes, state)
if err != nil {
plog.HandleHttp(w, req, http.StatusInternalServerError, err.Error())
return
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
if resp, err = json.Marshal(result); err != nil {
plog.HandleHttp(w, req, http.StatusInternalServerError, err.Error())
Expand Down Expand Up @@ -191,7 +199,11 @@ func (api *NodeApi) bulkPost(w http.ResponseWriter, req *http.Request) {
plog.HandleHttp(w, req, http.StatusUnprocessableEntity, err.Error())
return
}
result := nodeManager.PostNodes(nodes)
result, err := nodeManager.PostNodes(nodes)
if err != nil {
plog.HandleHttp(w, req, http.StatusBadRequest, err.Error())
return
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
if resp, err = json.Marshal(result); err != nil {
plog.HandleHttp(w, req, http.StatusInternalServerError, err.Error())
Expand Down Expand Up @@ -234,7 +246,11 @@ func (api *NodeApi) bulkDelete(w http.ResponseWriter, req *http.Request) {
for _, node := range nodes["nodes"] {
names = append(names, node.Name)
}
result := nodeManager.DeleteNodes(names)
result, err := nodeManager.DeleteNodes(names)
if err != nil {
plog.HandleHttp(w, req, http.StatusBadRequest, err.Error())
return
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
if resp, err = json.Marshal(result); err != nil {
plog.HandleHttp(w, req, http.StatusInternalServerError, err.Error())
Expand Down
139 changes: 95 additions & 44 deletions benchmark/api.py → benchmark/benchapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import time
import requests
import eventlet
import argparse
import traceback
import inspect

eventlet.monkey_patch(os=False)
import futurist
Expand All @@ -28,18 +31,32 @@ def wrap(*args, **kwargs):


class APItest(object):
def __init__(self, count):
def __init__(self, url, method, count):
self._executor = futurist.GreenThreadPoolExecutor(max_workers=1000)
self.url = url
self.method = method
self.count = count
if 'consoleserver_host' in os.environ:
self.host = os.environ["consoleserver_host"]

def run(self):
if self.method == 'all':
for func in inspect.getmembers(self):
if func[0].startswith('test_'):
getattr(self, func[0])()
elif hasattr(self, 'test_%s' % self.method):
getattr(self, 'test_%s' % self.method)()
else:
self.host = 'http://localhost:12429'
print('Could not find method test_%s' % self.method)

def list(self):
print("Test function list:\n")
for func in inspect.getmembers(self):
if func[0].startswith('test_'):
print(func[0])

@elaspe_run(desc="post")
def test_post(self):
def _test_post(data):
requests.post("%s/nodes" % self.host, data=json.dumps(data))
requests.post("%s/nodes" % self.url, data=json.dumps(data))

futures = []
for i in range(self.count):
Expand All @@ -62,12 +79,12 @@ def test_bulk_post(self):
'port': '22',
'private_key': '/Users/longcheng/.ssh/id_rsa'}}
data['nodes'].append(item)
requests.post("%s/bulk/nodes" % self.host, data=json.dumps(data))
requests.post("%s/bulk/nodes" % self.url, data=json.dumps(data))

@elaspe_run(desc="delete")
def test_delete(self):
def _test_delete(i):
requests.delete('%s/nodes/fakenode%d' % (self.host, i))
requests.delete('%s/nodes/fakenode%d' % (self.url, i))

futures = []
for i in range(self.count):
Expand All @@ -81,16 +98,16 @@ def test_bulk_delete(self):
for i in range(self.count):
item = {'name': 'fakenode%d' % i}
data['nodes'].append(item)
requests.delete('%s/bulk/nodes' % self.host, data=json.dumps(data))
requests.delete('%s/bulk/nodes' % self.url, data=json.dumps(data))

@elaspe_run(desc="list")
def test_list(self):
requests.get('%s/nodes' % self.host)
requests.get('%s/nodes' % self.url)

@elaspe_run(desc="put_on")
def test_put_on(self):
def _test_put(i):
requests.delete('%s/nodes/fakenode%d?state=on' % (self.host, i))
requests.delete('%s/nodes/fakenode%d?state=on' % (self.url, i))

futures = []
for i in range(self.count):
Expand All @@ -104,13 +121,13 @@ def test_bulk_put_on(self):
for i in range(self.count):
item = {'name': 'fakenode%d' % i}
data['nodes'].append(item)
requests.delete('%s/bulk/nodes?state=on' % self.host,
requests.delete('%s/bulk/nodes?state=on' % self.url,
data=json.dumps(data))

@elaspe_run(desc="put_off")
def test_put_off(self):
def _test_put(i):
requests.delete('%s/nodes/fakenode%d?state=off' % (self.host, i))
requests.delete('%s/nodes/fakenode%d?state=off' % (self.url, i))

futures = []
for i in range(self.count):
Expand All @@ -124,13 +141,13 @@ def test_bulk_put_off(self):
for i in range(self.count):
item = {'name': 'fakenode%d' % i}
data['nodes'].append(item)
requests.delete('%s/bulk/nodes?state=off' % self.host,
requests.delete('%s/bulk/nodes?state=off' % self.url,
data=json.dumps(data))

@elaspe_run(desc="show")
def test_show(self):
def _test_get(i):
requests.get('%s/nodes/fakenode%d' % (self.host, i))
requests.get('%s/nodes/fakenode%d' % (self.url, i))

futures = []
for i in range(self.count):
Expand All @@ -139,34 +156,68 @@ def _test_get(i):
waiters.wait_for_all(futures, 3600)


class BenchmarkAPI(object):
def get_base_parser(self):
parser = argparse.ArgumentParser(
prog='benchapi',
epilog='See "benchapi help COMMAND" '
'for help on a specific command.',
add_help=False,
formatter_class=HelpFormatter,
)
parser.add_argument('-h', '--help',
action='store_true',
help=argparse.SUPPRESS)
parser.add_argument('-m', '--method',
help="The rest api method, could be post, delete, "
"bulk_post, bulk_delete, all",
default='all',
type=str)
parser.add_argument('-c', '--count',
help="The number of nodes in the test case",
type=int,
default=1)
parser.add_argument('--url',
help="http url of goconserver master",
type=str,
default='http://localhost:12429')
parser.add_argument('-l', '--list',
help="List the test functions for benchmark api",
action='store_true')
return parser

def do_help(self, args):
self.parser.print_help()

def main(self, argv):
self.parser = self.get_base_parser()
(options, args) = self.parser.parse_known_args(argv)

if options.help:
self.do_help(options)
return 0

api = APItest(options.url, options.method, options.count)
if options.list:
api.list()
return 0

api.run()


class HelpFormatter(argparse.HelpFormatter):
def start_section(self, heading):
# Title-case the headings
heading = '%s%s' % (heading[0].upper(), heading[1:])
super(HelpFormatter, self).start_section(heading)


if __name__ == "__main__":
if len(sys.argv) == 3:
method = str(sys.argv[1])
count = int(sys.argv[2])
api = APItest(count)

if method == 'post':
api.test_post()
elif method == 'delete':
api.test_delete()
elif method == 'bulk_post':
api.test_bulk_post()
elif method == 'bulk_delete':
api.test_bulk_delete()
elif method == 'list':
api.test_list()
else:
print("Unsupport sub command %s." % method)
else:
count = 1000
api = APItest(count)
api.test_bulk_post()
api.test_list()
api.test_show()
api.test_put_on()
api.test_put_off()
api.test_bulk_put_on()
api.test_bulk_put_off()
api.test_bulk_delete()
api.test_post()
api.test_delete()
try:
BenchmarkAPI().main(sys.argv[1:])
except KeyboardInterrupt:
print("... terminating benchmark testing", file=sys.stderr)
sys.exit(130)
except Exception as e:
print(traceback.format_exc())
sys.exit(1)
27 changes: 18 additions & 9 deletions common/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ type LoggerCfg struct {
UDP []UDPCfg `yaml:"udp"`
}

type EtcdCfg struct {
DailTimeout int `yaml:"dail_timeout"`
RequestTimeout int `yaml:"request_timeout"`
Endpoints string `yaml:"endpoints"`
ServiceHeartbeat int64 `yaml:"service_heartbeat"`
Prefix string `yaml: prefix`
SSLKeyFile string `yaml:"ssl_key_file"`
SSLCertFile string `yaml:"ssl_cert_file"`
SSLCACertFile string `yaml:"ssl_ca_cert_file"`
// vhost is the name registered in etcd service, by default it is the hostname
Vhost string `yaml:"vhost`
RpcPort string `yaml:"rpcport"`
}

type ServerConfig struct {
Global struct {
Host string `yaml:"host"`
Expand All @@ -110,15 +124,9 @@ type ServerConfig struct {
ClientTimeout int `yaml:"client_timeout"`
TargetTimeout int `yaml:"target_timeout"`
ReconnectInterval int `yaml:"reconnect_interval"`
RPCPort string `yaml:"rpcport"`
Loggers LoggerCfg `yaml:"logger"`
}
Etcd struct {
DailTimeout int `yaml:"dail_timeout"`
RequestTimeout int `yaml:"request_timeout"`
Endpoints string `yaml:"endpoints"`
ServerHeartbeat int64 `yaml:"server_heartbeat"`
}
Etcd EtcdCfg `yaml:"etcd"`
}

func InitServerConfig(confFile string) (*ServerConfig, error) {
Expand All @@ -139,12 +147,13 @@ func InitServerConfig(confFile string) (*ServerConfig, error) {
serverConfig.Console.ClientTimeout = 30
serverConfig.Console.TargetTimeout = 30
serverConfig.Console.ReconnectInterval = 5
serverConfig.Console.RPCPort = "12431" // only for async storage type
serverConfig.Etcd.RpcPort = "12431" // only for async storage type

serverConfig.Etcd.DailTimeout = 5
serverConfig.Etcd.RequestTimeout = 2
serverConfig.Etcd.Endpoints = "127.0.0.1:2379"
serverConfig.Etcd.ServerHeartbeat = 5
serverConfig.Etcd.ServiceHeartbeat = 30
serverConfig.Etcd.Prefix = "goconserver"
data, err := ioutil.ReadFile(confFile)
if err != nil {
return serverConfig, nil
Expand Down
10 changes: 6 additions & 4 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
HOST_NOT_EXIST
INVALID_TYPE
ETCD_NOT_INIT
ETCD_TRANSACTION_ERROR
SET_DEADLINE_ERROR
SEND_KEEPALIVE_ERROR
LOGGER_TYPE_ERROR
Expand All @@ -42,7 +43,7 @@ var (
ErrLocked = NewErr(LOCKED, "Locked")
ErrUnlocked = NewErr(UNLOCKED, "Unlocked")
ErrConnection = NewErr(CONNECTION_ERROR, "Could not connect")
ErrAlreadyExist = NewErr(ALREADY_EXIST, "Already exit")
ErrAlreadyExist = NewErr(ALREADY_EXIST, "Already exist")
ErrOutOfQuota = NewErr(OUTOF_QUATA, "Out of quota")
ErrTimeout = NewErr(TIMEOUT, "Timeout")
ErrLoggerType = NewErr(LOGGER_TYPE_ERROR, "Invalid logger type")
Expand All @@ -51,10 +52,11 @@ var (
ErrNotTerminal = NewErr(NOT_TERMINAL, "Not terminal")
ErrInvalidType = NewErr(INVALID_TYPE, "Invalid type")
// ssh
ErrSetDeadline = NewErr(SET_DEADLINE_ERROR, "failed to set deadline")
ErrSendKeepalive = NewErr(SEND_KEEPALIVE_ERROR, "failed to send keep alive")
ErrSetDeadline = NewErr(SET_DEADLINE_ERROR, "Failed to set deadline")
ErrSendKeepalive = NewErr(SEND_KEEPALIVE_ERROR, "Failed to send keep alive")
// etcd
ErrETCDNotInit = NewErr(ETCD_NOT_INIT, "Etcd is not initialized")
ErrEtcdUnInit = NewErr(ETCD_NOT_INIT, "Etcd is not initialized")
ErrETCDTransaction = NewErr(ETCD_TRANSACTION_ERROR, "Failed to submit etcd transaction")
)

func NewErr(code int, text string) *GoConsError {
Expand Down
5 changes: 1 addition & 4 deletions common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ const (
TYPE_SHARE_LOCK
TYPE_EXCLUDE_LOCK

SLEEP_TICK = 100 // millisecond
ACTION_DELETE = 1
ACTION_PUT = 0
ACTION_NIL = -1
SLEEP_TICK = 100 // millisecond

Maxint32 = 1<<31 - 1

Expand Down
Loading

0 comments on commit bf2e350

Please sign in to comment.