Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for subscribe mode (#1) #1

Merged
merged 1 commit into from
Dec 2, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 95 additions & 7 deletions gnmi_cli_py/py_gnmicli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
- Auto-loads Target cert from Target if not specified
- User/password based authentication
- Certifificate based authentication

Current unsupported gNMI features:
- Subscribe
- Subscribe request
"""

from __future__ import absolute_import
Expand All @@ -40,14 +38,16 @@
import ssl
import sys
import six
import datetime
try:
import gnmi_pb2
except ImportError:
print('ERROR: Ensure you\'ve installed dependencies from requirements.txt\n'
'eg, pip install -r requirements.txt')
import gnmi_pb2_grpc
import grpc

__version__ = '0.4'
__version__ = '0.5'

_RE_PATH_COMPONENT = re.compile(r'''
^
Expand Down Expand Up @@ -143,6 +143,21 @@ def _create_parser():
required=False, action='store_true')
parser.add_argument('-n', '--notls', help='gRPC insecure mode',
required=False, action='store_true')
parser.add_argument('--interval', default=10000, type=int,
help='sample interval in millisecond (default: 10000ms)')
parser.add_argument('--timeout', type=int, help='subscription'
'duration in seconds (default: none)')
parser.add_argument('--heartbeat', default=0, type=int, help='heartbeat interval (default: None)')
parser.add_argument('--aggregate', action='store_true', help='allow aggregation')
parser.add_argument('--suppress', action='store_true', help='suppress redundant')
parser.add_argument('--submode', default=2, type=int,
help='subscription mode [0=TARGET_DEFINED, 1=ON_CHANGE, 2=SAMPLE]')
parser.add_argument('--update_count', default=0, type=int, help='Max number of streaming updates to receive. 0 means no limit.')
parser.add_argument('--subscribe_mode', default=0, type=int, help='[0=STREAM, 1=ONCE, 2=POLL]')
parser.add_argument('--encoding', default=0, type=int, help='[0=JSON, 1=BYTES, 2=PROTO, 3=ASCII, 4=JSON_IETF]')
parser.add_argument('--qos', default=0, type=int, help='')
parser.add_argument('--use_alias', action='store_true', help='use alias')
parser.add_argument('--prefix', default='', help='gRPC path prefix (default: none)')
return parser


Expand Down Expand Up @@ -353,6 +368,79 @@ def _open_certs(**kwargs):
return kwargs


def gen_request(paths, opt, prefix):
"""Create subscribe request for passed xpath.
Args:
paths: (str) gNMI path.
opt: (dict) Command line argument passed for subscribe reqeust.
Returns:
gNMI SubscribeRequest object.
"""
mysubs = []
mysub = gnmi_pb2.Subscription(path=paths, mode=opt["submode"],
sample_interval=opt["interval"]*1000000,
heartbeat_interval=opt['heartbeat']*1000000,
suppress_redundant=opt['suppress'])
mysubs.append(mysub)

if prefix:
myprefix = prefix
elif opt["prefix"]:
myprefix = _parse_path(_path_names(opt["prefix"]))
else:
myprefix = None

if opt["qos"]:
myqos = gnmi_pb2.QOSMarking(marking=opt["qos"])
else:
myqos = None
mysblist = gnmi_pb2.SubscriptionList(prefix=myprefix, mode=opt['subscribe_mode'],
allow_aggregation=opt['aggregate'], encoding=opt['encoding'],
subscription=mysubs, use_aliases=opt['use_alias'], qos=myqos)
mysubreq = gnmi_pb2.SubscribeRequest(subscribe=mysblist)

print('Sending SubscribeRequest\n'+str(mysubreq))
yield mysubreq


def subscribe_start(stub, options, req_iterator):
""" RPC Start for Subscribe reqeust
Args:
stub: (class) gNMI Stub used to build the secure channel.
options: (dict) Command line argument passed for subscribe reqeust.
req_iterator: gNMI Subscribe Request from gen_request.
Returns:
Start Subscribe and printing response of gNMI Subscribe Response.
"""
metadata = [('username', options['username']), ('password', options['password'])]
max_update_count = options["update_count"]
try:
responses = stub.Subscribe(req_iterator, options['timeout'], metadata=metadata)
update_count = 0
for response in responses:
print('{0} response received: '.format(datetime.datetime.now()))
if response.HasField('sync_response'):
print(str(response))
elif response.HasField('error'):
print('gNMI Error '+str(response.error.code)+\
' received\n'+str(response.error.message) + str(response.error))
elif response.HasField('update'):
print(response)
update_count = update_count+1
else:
print('Unknown response received:\n'+str(response))

if max_update_count != 0 and update_count == max_update_count:
print("Max update count reached {0}".format(update_count))
break
except KeyboardInterrupt:
print("Subscribe Session stopped by user.")
except grpc.RpcError as x:
print("grpc.RpcError received:\n%s" %x)
except Exception as err:
print(err)


def main():
argparser = _create_parser()
args = vars(argparser.parse_args())
Expand Down Expand Up @@ -414,9 +502,9 @@ def main():
response = _set(stub, paths, 'delete', user, password, json_value)
print('The SetRequest response is below\n' + '-'*25 + '\n', response)
elif mode == 'subscribe':
print('This mode not available in this version')
sys.exit()
request_iterator = gen_request(paths, args, prefix)
subscribe_start(stub, args, request_iterator)


if __name__ == '__main__':
main()
main()