diff --git a/gnmi_cli_py/py_gnmicli.py b/gnmi_cli_py/py_gnmicli.py index 062dee7b..7152f13f 100644 --- a/gnmi_cli_py/py_gnmicli.py +++ b/gnmi_cli_py/py_gnmicli.py @@ -24,9 +24,7 @@ - Auto-loads Target cert from Target if not specified - User/password based authentication - Certifificate based authentication - -Current unsupported gNMI features: -- Subscribe +- Subscribe request """ from __future__ import absolute_import @@ -40,14 +38,16 @@ import ssl import sys import six +import datetime try: import gnmi_pb2 except ImportError: print('ERROR: Ensure you\'ve installed dependencies from requirements.txt\n' 'eg, pip install -r requirements.txt') import gnmi_pb2_grpc +import grpc -__version__ = '0.4' +__version__ = '0.5' _RE_PATH_COMPONENT = re.compile(r''' ^ @@ -143,6 +143,21 @@ def _create_parser(): required=False, action='store_true') parser.add_argument('-n', '--notls', help='gRPC insecure mode', required=False, action='store_true') + parser.add_argument('--interval', default=10000, type=int, + help='sample interval in millisecond (default: 10000ms)') + parser.add_argument('--timeout', type=int, help='subscription' + 'duration in seconds (default: none)') + parser.add_argument('--heartbeat', default=0, type=int, help='heartbeat interval (default: None)') + parser.add_argument('--aggregate', action='store_true', help='allow aggregation') + parser.add_argument('--suppress', action='store_true', help='suppress redundant') + parser.add_argument('--submode', default=2, type=int, + help='subscription mode [0=TARGET_DEFINED, 1=ON_CHANGE, 2=SAMPLE]') + parser.add_argument('--update_count', default=0, type=int, help='Max number of streaming updates to receive. 0 means no limit.') + parser.add_argument('--subscribe_mode', default=0, type=int, help='[0=STREAM, 1=ONCE, 2=POLL]') + parser.add_argument('--encoding', default=0, type=int, help='[0=JSON, 1=BYTES, 2=PROTO, 3=ASCII, 4=JSON_IETF]') + parser.add_argument('--qos', default=0, type=int, help='') + parser.add_argument('--use_alias', action='store_true', help='use alias') + parser.add_argument('--prefix', default='', help='gRPC path prefix (default: none)') return parser @@ -353,6 +368,79 @@ def _open_certs(**kwargs): return kwargs +def gen_request(paths, opt, prefix): + """Create subscribe request for passed xpath. + Args: + paths: (str) gNMI path. + opt: (dict) Command line argument passed for subscribe reqeust. + Returns: + gNMI SubscribeRequest object. + """ + mysubs = [] + mysub = gnmi_pb2.Subscription(path=paths, mode=opt["submode"], + sample_interval=opt["interval"]*1000000, + heartbeat_interval=opt['heartbeat']*1000000, + suppress_redundant=opt['suppress']) + mysubs.append(mysub) + + if prefix: + myprefix = prefix + elif opt["prefix"]: + myprefix = _parse_path(_path_names(opt["prefix"])) + else: + myprefix = None + + if opt["qos"]: + myqos = gnmi_pb2.QOSMarking(marking=opt["qos"]) + else: + myqos = None + mysblist = gnmi_pb2.SubscriptionList(prefix=myprefix, mode=opt['subscribe_mode'], + allow_aggregation=opt['aggregate'], encoding=opt['encoding'], + subscription=mysubs, use_aliases=opt['use_alias'], qos=myqos) + mysubreq = gnmi_pb2.SubscribeRequest(subscribe=mysblist) + + print('Sending SubscribeRequest\n'+str(mysubreq)) + yield mysubreq + + +def subscribe_start(stub, options, req_iterator): + """ RPC Start for Subscribe reqeust + Args: + stub: (class) gNMI Stub used to build the secure channel. + options: (dict) Command line argument passed for subscribe reqeust. + req_iterator: gNMI Subscribe Request from gen_request. + Returns: + Start Subscribe and printing response of gNMI Subscribe Response. + """ + metadata = [('username', options['username']), ('password', options['password'])] + max_update_count = options["update_count"] + try: + responses = stub.Subscribe(req_iterator, options['timeout'], metadata=metadata) + update_count = 0 + for response in responses: + print('{0} response received: '.format(datetime.datetime.now())) + if response.HasField('sync_response'): + print(str(response)) + elif response.HasField('error'): + print('gNMI Error '+str(response.error.code)+\ + ' received\n'+str(response.error.message) + str(response.error)) + elif response.HasField('update'): + print(response) + update_count = update_count+1 + else: + print('Unknown response received:\n'+str(response)) + + if max_update_count != 0 and update_count == max_update_count: + print("Max update count reached {0}".format(update_count)) + break + except KeyboardInterrupt: + print("Subscribe Session stopped by user.") + except grpc.RpcError as x: + print("grpc.RpcError received:\n%s" %x) + except Exception as err: + print(err) + + def main(): argparser = _create_parser() args = vars(argparser.parse_args()) @@ -414,9 +502,9 @@ def main(): response = _set(stub, paths, 'delete', user, password, json_value) print('The SetRequest response is below\n' + '-'*25 + '\n', response) elif mode == 'subscribe': - print('This mode not available in this version') - sys.exit() + request_iterator = gen_request(paths, args, prefix) + subscribe_start(stub, args, request_iterator) if __name__ == '__main__': - main() + main() \ No newline at end of file