-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquicksnmp.py
89 lines (72 loc) · 2.59 KB
/
quicksnmp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/local/env python3
from pysnmp import hlapi
def construct_object_types(list_of_oids):
object_types = []
for oid in list_of_oids:
object_types.append(hlapi.ObjectType(hlapi.ObjectIdentity(oid)))
return object_types
def construct_value_pairs(list_of_pairs):
pairs = []
for key, value in list_of_pairs.items():
pairs.append(hlapi.ObjectType(hlapi.ObjectIdentity(key), value))
return pairs
def get(target, oids, credentials, port=161, engine=hlapi.SnmpEngine(), context=hlapi.ContextData()):
handler = hlapi.getCmd(
engine,
credentials,
hlapi.UdpTransportTarget((target, port)),
context,
*construct_object_types(oids)
)
return fetch(handler, 1)[0]
def set(target, value_pairs, credentials, port=161, engine=hlapi.SnmpEngine(), context=hlapi.ContextData()):
handler = hlapi.setCmd(
engine,
credentials,
hlapi.UdpTransportTarget((target, port)),
context,
*construct_value_pairs(value_pairs)
)
return fetch(handler, 1)[0]
def get_bulk(target, oids, credentials, count, start_from=0, port=161,
engine=hlapi.SnmpEngine(), context=hlapi.ContextData()):
handler = hlapi.bulkCmd(
engine,
credentials,
hlapi.UdpTransportTarget((target, port)),
context,
start_from, count,
*construct_object_types(oids)
)
return fetch(handler, count)
def get_bulk_auto(target, oids, credentials, count_oid, start_from=0, port=161,
engine=hlapi.SnmpEngine(), context=hlapi.ContextData()):
count = get(target, [count_oid], credentials, port, engine, context)[count_oid]
return get_bulk(target, oids, credentials, count, start_from, port, engine, context)
def cast(value):
try:
return int(value)
except (ValueError, TypeError):
try:
return float(value)
except (ValueError, TypeError):
try:
return str(value)
except (ValueError, TypeError):
pass
return value
def fetch(handler, count):
result = []
for i in range(count):
try:
error_indication, error_status, error_index, var_binds = next(handler)
if not error_indication and not error_status:
items = {}
for var_bind in var_binds:
items[str(var_bind[0])] = cast(var_bind[1])
result.append(items)
else:
raise RuntimeError('Got SNMP error: {0}'.format(error_indication))
except StopIteration:
break
return result