forked from seveas/python-hpilo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhpilo.py
2122 lines (1857 loc) · 95.1 KB
/
hpilo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# (c) 2011-2014 Dennis Kaarsemaker <[email protected]>
# see COPYING for license details
import os
import platform
import random
import re
import socket
import subprocess
import sys
import warnings
import hpilo_fw
# Python 2/3 compatibility shims: pick the right urllib/StringIO modules and
# define b() (text -> bytes helper) plus the names Python 3 no longer has.
PY3 = sys.version_info[0] >= 3
if not PY3:
    import urllib2
    import cStringIO as StringIO

    def b(x):
        return x
else:
    import urllib.request as urllib2
    import io as StringIO

    def b(x):
        return bytes(x, 'ascii')

    # socket.sslerror does not exist on Python 3; install a placeholder so
    # "except socket.sslerror" clauses remain valid.
    class Bogus(Exception):
        pass

    socket.sslerror = Bogus
    basestring = str
try:
    import ssl
except ImportError:
    # Fallback for older python versions: a minimal stand-in exposing the
    # parts of the ssl module this file uses, built on socket.ssl().
    class ssl:
        PROTOCOL_SSLv3 = 1
        PROTOCOL_TLSv23 = 2
        PROTOCOL_TLSv1 = 3

        def __init__(self, sock):
            self.sock = sock
            self.sslsock = socket.ssl(sock)

        @staticmethod
        def wrap_socket(sock, *args, **kwargs):
            # Extra arguments (such as ssl_version) are accepted and ignored
            return ssl(sock)

        def read(self, n=None):
            if n:
                return self.sslsock.read(n)
            return self.sslsock.read()

        def write(self, data):
            return self.sslsock.write(data)

        def shutdown(self, what):
            return self.sock.shutdown(what)

        def close(self):
            return self.sock.close()
try:
import xml.etree.ElementTree as etree
except ImportError:
import elementtree.ElementTree as etree
# Oh the joys of monkeypatching...
# - We need a CDATA element in set_security_msg, but ElementTree doesn't support it
# - We need to disable escaping of the PASSWORD attribute, because iLO doesn't
# unescape it properly
def CDATA(text=None):
    """Return a pseudo-element tagged '![CDATA[' carrying *text*.

    The patched XML serializer in this module writes elements with this tag
    as a literal CDATA section.
    """
    cdata = etree.Element('![CDATA[')
    cdata.text = text
    return cdata
# Adding this tag to RIBCL scripts should make this hack unnecessary in newer
# iLO firmware versions. TODO: Check compatibility.
# <?ilo entity-processing="standard"?>
class DoNotEscapeMe(str):
    """Marker string type: attribute values of this type are serialized
    verbatim, without XML escaping (used for PASSWORD attributes)."""


# Remember the pristine ElementTree escaper only once. If this module is
# executed a second time (e.g. reload), etree._escape_attrib is already our
# wrapper; saving it as the "original" would make the wrapper call itself
# forever.
if not hasattr(etree, '_original_escape_attrib'):
    etree._original_escape_attrib = etree._escape_attrib


def _escape_attrib(text, *args, **kwargs):
    """Escape an attribute value, except when it is a DoNotEscapeMe."""
    if isinstance(text, DoNotEscapeMe):
        return str(text)
    return etree._original_escape_attrib(text, *args, **kwargs)


etree._escape_attrib = _escape_attrib
# Python 2.7 and 3
if hasattr(etree, '_serialize_xml'):
    # Save the pristine serializer only once: on a second execution of this
    # module etree._serialize_xml is already our wrapper, and storing it as
    # the "original" would cause infinite recursion.
    if not hasattr(etree, '_original_serialize_xml'):
        etree._original_serialize_xml = etree._serialize_xml

    def _serialize_xml(write, elem, *args, **kwargs):
        """Write CDATA pseudo-elements literally, delegate everything else."""
        if elem.tag == '![CDATA[':
            write("\n<%s%s]]>\n" % (elem.tag, elem.text))
            return
        return etree._original_serialize_xml(write, elem, *args, **kwargs)
    etree._serialize_xml = etree._serialize['xml'] = _serialize_xml
# Python 2.5-2.6, and non-stdlib ElementTree
elif hasattr(etree.ElementTree, '_write'):
    # Same once-only guard as above, for the older writer hook
    if not hasattr(etree.ElementTree, '_orig_write'):
        etree.ElementTree._orig_write = etree.ElementTree._write

    def _write(self, file, node, encoding, namespaces):
        """Write CDATA pseudo-elements literally, delegate everything else."""
        if node.tag == '![CDATA[':
            file.write("\n<![CDATA[%s]]>\n" % node.text.encode(encoding))
        else:
            self._orig_write(file, node, encoding, namespaces)
    etree.ElementTree._write = _write
else:
    raise RuntimeError("Don't know how to monkeypatch XML serializer workarounds. Please report a bug at https://github.com/seveas/python-hpilo")
# Which protocol to use. These are the values accepted for the `protocol`
# argument/attribute of Ilo; when none is set, Ilo._detect_protocol picks one.
# ILO_RAW: send the RIBCL XML directly over the SSL socket, no HTTP framing
ILO_RAW = 1
# ILO_HTTP: wrap the RIBCL XML in an HTTP POST to /ribcl
ILO_HTTP = 2
# ILO_LOCAL: no network connection, talk to the local hponcfg binary instead
ILO_LOCAL = 3
# Names of all methods decorated with @untested, in decoration order
_untested = []


def untested(meth):
    """Decorator to mark a method as untested.

    Sets ``meth.untested = True``, records the method's name in the
    module-level ``_untested`` list and returns the method unchanged.
    """
    meth.untested = True
    # Python 2 functions expose func_name; fall back to __name__ otherwise
    name = meth.func_name if hasattr(meth, 'func_name') else meth.__name__
    _untested.append(name)
    return meth
class IloError(Exception):
    """Base error of this module; carries an optional numeric errorcode."""

    def __init__(self, message, errorcode=None):
        # Exception is only a new-style class on newer Pythons, so super()
        # cannot be used unconditionally.
        new_style = issubclass(IloError, object)
        if not new_style:
            Exception.__init__(self, message)
        else:
            super(IloError, self).__init__(message)
        self.errorcode = errorcode
class IloCommunicationError(IloError):
    """Raised when connecting to, or exchanging data with, the iLO (or the
    local hponcfg process) fails."""
# When we stop supporting ilo 1, 'User login name was not found' and 0x000a can
# be removed. They should, as they cause IloLoginFailed in cases where login
# did not fail, but e.g. get_user('nonexistent') was called
class IloLoginFailed(IloError):
    """Raised by response parsing when the iLO's status code or message
    matches one of the known login-failure values below."""
    possible_messages = ['User login name was not found', 'Login failed', 'Login credentials rejected']
    possible_codes = [0x005f, 0x000a]
class IloWarning(Warning):
    """Warning category used for non-fatal messages returned by the iLO."""
class IloTestWarning(Warning):
    """Separate warning category; not raised anywhere in the visible code
    (presumably used by the test tooling - confirm)."""
class Ilo(object):
"""Represents an iLO/iLO2/iLO3/RILOE II management interface on a
specific host. A new connection using the specified login, password and
timeout will be made for each API call. The library will detect which
protocol to use, but you can override this by setting protocol to
ILO_RAW or ILO_HTTP. Use ILO_LOCAL to avoid using a network connection
and use hponcfg instead. Username and password are ignored for ILO_LOCAL
connections. Set delayed to True to make python-hpilo not send requests
immediately, but group them together. See :func:`call_delayed`"""
# XML declaration; written to the socket separately from the request body
XML_HEADER = b('<?xml version="1.0"?>\r\n')
# HTTP request head for RIBCL posts: %d is the body length, %s optional
# extra header text (used for the upload cookie)
HTTP_HEADER = "POST /ribcl HTTP/1.1\r\nHost: localhost\r\nContent-Length: %d\r\nConnection: Close%s\r\n\r\n"
# HTTP request head for firmware uploads: %d is the body length, %s the
# multipart boundary
HTTP_UPLOAD_HEADER = "POST /cgi-bin/uploadRibclFiles HTTP/1.1\r\nHost: localhost\r\nConnection: Close\r\nContent-Length: %d\r\nContent-Type: multipart/form-data; boundary=%s\r\n\r\n"
# Chunk size used when streaming large payloads
BLOCK_SIZE = 64 * 1024
# Binary used for ILO_LOCAL connections
hponcfg = "/sbin/hponcfg"
if platform.system() == 'Windows':
    # Raw string: the backslashes are path separators, not escape sequences
    # (same value as before, without invalid-escape warnings)
    hponcfg = r'C:\Program Files\HP Lights-Out Configuration Utility\cpqlocfg.exe'
def __init__(self, hostname, login=None, password=None, timeout=60, port=443, protocol=None, delayed=False):
self.hostname = hostname
self.login = login or 'Administrator'
self.password = password or 'Password'
self.timeout = timeout
self.debug = 0
self.port = port
self.protocol = protocol
self.cookie = None
self.delayed = delayed
self._elements = None
self._processors = []
self.ssl_version = ssl.PROTOCOL_TLSv1
self.save_response = None
self.read_response = None
self._protect_passwords = os.environ.get('HPILO_DONT_PROTECT_PASSWORDS', None) != 'YesPlease'
def __str__(self):
return "iLO interface of %s" % self.hostname
def _debug(self, level, message):
if message.__class__.__name__ == 'bytes':
message = message.decode('latin-1')
if self.debug >= level:
if self._protect_passwords:
message = re.sub(r'PASSWORD=".*?"', 'PASSWORD="********"', message)
sys.stderr.write(message)
if message.startswith('\r'):
sys.stderr.flush()
else:
sys.stderr.write('\n')
def _request(self, xml, progress=None):
    """Serialize an ElementTree.Element, send it and parse the answer.

    Returns ``(header, result)`` where ``result`` is None when no message
    had useful content, the single parsed message when there was exactly
    one, or a list of parsed messages otherwise.
    """
    if not (self.protocol or self.read_response):
        self._detect_protocol()

    # Serialize the XML
    if not hasattr(etree, 'tostringlist'):
        xml = etree.tostring(xml)
    else:
        xml = b("\r\n").join(etree.tostringlist(xml)) + b('\r\n')

    header, data = self._communicate(xml, self.protocol, progress=progress)

    # The response usually holds several XML documents back to back; split
    # on every '<?xml' after the first one.
    parsed = []
    while data:
        cut = data.find('<?xml', 5)
        if cut == -1:
            chunk, data = data, None
        else:
            chunk, data = data[:cut], data[cut:]
        message = self._parse_message(chunk)
        # _parse_message returns None if a message has no useful content
        if message is not None:
            parsed.append(message)

    if not parsed:
        return header, None
    if len(parsed) == 1:
        return header, parsed[0]
    return header, parsed
def _detect_protocol(self):
    """Set ``self.protocol`` to ILO_LOCAL, ILO_HTTP or ILO_RAW."""
    # Use hponcfg when 'connecting' to localhost
    if self.hostname == 'localhost':
        self.protocol = ILO_LOCAL
        return
    # Do a bogus request, using the HTTP protocol. If no header comes back
    # (see the special cases in _communicate), the raw protocol is needed.
    probe = b('<RIBCL VERSION="2.0"></RIBCL>')
    header, data = self._communicate(probe, ILO_HTTP)
    if not header:
        self.protocol = ILO_RAW
    else:
        self.protocol = ILO_HTTP
def _upload_file(self, filename, progress):
    """Upload the file *filename* to the iLO as a multipart/form-data POST.

    ``progress`` may be a callable; it receives a status string after each
    block of a large part has been sent. On success the session cookie from
    the response is stored in ``self.cookie``.

    Raises IloCommunicationError when sending or receiving fails and
    IloError (with the text of the returned page) when the response carries
    no Set-Cookie header.
    """
    # Close the file handle deterministically instead of leaking it
    fd = open(filename, 'rb')
    try:
        firmware = fd.read()
    finally:
        fd.close()

    # Pick a boundary that does not occur in the payload
    boundary = b('------hpiLO3t' + str(random.randint(100000,1000000)) + 'z')
    while boundary in firmware:
        boundary = b('------hpiLO3t' + str(random.randint(100000,1000000)) + 'z')
    parts = [
        b("--") + boundary + b("""\r\nContent-Disposition: form-data; name="fileType"\r\n\r\n"""),
        b("\r\n--") + boundary + b('''\r\nContent-Disposition: form-data; name="fwimgfile"; filename="''') + b(filename) + b('''"\r\nContent-Type: application/octet-stream\r\n\r\n'''),
        firmware,
        b("\r\n--") + boundary + b("--\r\n"),
    ]
    total_bytes = sum([len(x) for x in parts])
    sock = self._get_socket()

    self._debug(2, self.HTTP_UPLOAD_HEADER % (total_bytes, boundary.decode('ascii')))
    sock.write(b(self.HTTP_UPLOAD_HEADER % (total_bytes, boundary.decode('ascii'))))
    for part in parts:
        if len(part) < self.BLOCK_SIZE:
            self._debug(2, part)
            sock.write(part)
        else:
            # Large part: send in blocks and report progress
            sent = 0
            fwlen = len(part)
            while sent < fwlen:
                written = sock.write(part[sent:sent+self.BLOCK_SIZE])
                if written is None:
                    plen = len(part[sent:sent+self.BLOCK_SIZE])
                    raise IloCommunicationError("Unexpected EOF while sending %d bytes (%d of %d sent before)" % (plen, sent, fwlen))
                sent += written
                if callable(progress):
                    progress("Sending request %d/%d bytes (%d%%)" % (sent, fwlen, 100.0*sent/fwlen))

    # On Python 3 socket.sslerror is only a placeholder class, so the real
    # ssl.SSLError (when the ssl module provides one) must be caught as well.
    ssl_errors = (socket.sslerror, getattr(ssl, 'SSLError', socket.sslerror))
    data = ''
    try:
        while True:
            d = sock.read()
            data += d.decode('latin-1')
            if not d:
                break
    except ssl_errors: # Connection closed
        e = sys.exc_info()[1]
        if not data:
            raise IloCommunicationError("Communication with %s:%d failed: %s" % (self.hostname, self.port, str(e)))

    self._debug(1, "Received %d bytes" % len(data))
    self._debug(2, data)
    if 'Set-Cookie:' not in data:
        # Seen on ilo3 with corrupt filesystem
        found = re.search('<body>(.*)</body>', data, flags=re.DOTALL)
        # Without a <body> element, report the whole response instead of
        # failing with an AttributeError on the missing match
        if found:
            body = found.group(1)
        else:
            body = data
        body = re.sub('<[^>]*>', '', body).strip()
        body = re.sub('Return to last page', '', body).strip()
        body = re.sub(r'\s+', ' ', body).strip()
        raise IloError(body)
    self.cookie = re.search('Set-Cookie: *(.*)', data).group(1)
    self._debug(2, "Cookie: %s" % self.cookie)
def _get_socket(self):
    """Return an object with read/write/close methods for one request.

    Depending on configuration this is:
    - a fake socket replaying the file named by ``self.read_response``,
    - an hponcfg subprocess (ILO_LOCAL) with ``read``/``write`` attached, or
    - an SSL-wrapped TCP connection to ``self.hostname``:``self.port``.

    Raises IloCommunicationError when hponcfg cannot be started, no address
    can be connected to, or the SSL session cannot be established.
    """
    if self.read_response:
        class FakeSocket(object):
            def __init__(self, file):
                # Callers write bytes and call .decode() on what they read,
                # so use a bytes sink (when available) and binary mode.
                sink = getattr(StringIO, 'BytesIO', StringIO.StringIO)
                self.trash = sink()
                self.output = open(file, 'rb')
                self.read = self.output.read
                self.write = self.trash.write
                self.close = self.output.close
            shutdown = lambda *args: None
        return FakeSocket(self.read_response)

    if self.protocol == ILO_LOCAL:
        self._debug(1, "Launching hponcfg")
        try:
            sp = subprocess.Popen([self.hponcfg, '--input', '--xmlverbose'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError:
            e = sys.exc_info()[1]
            raise IloCommunicationError("Cannot run %s: %s" % (self.hponcfg, str(e)))
        # Make the process look like a socket to the callers
        sp.write = sp.stdin.write
        sp.read = sp.stdout.read
        return sp

    self._debug(1, "Connecting to %s port %d" % (self.hostname, self.port))
    err = None
    sock = None
    for res in socket.getaddrinfo(self.hostname, self.port, 0, socket.SOCK_STREAM):
        af, socktype, proto, canonname, sa = res
        sock = None
        try:
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(self.timeout)
            self._debug(2, "Connecting to %s port %d" % sa[:2])
            sock.connect(sa)
        except socket.timeout:
            if sock is not None:
                sock.close()
            sock = None
            err = IloCommunicationError("Timeout connecting to %s port %d" % (self.hostname, self.port))
        except socket.error:
            e = sys.exc_info()[1]
            if sock is not None:
                sock.close()
            sock = None
            err = IloCommunicationError("Error connecting to %s port %d: %s" % (self.hostname, self.port, str(e)))
        else:
            # Connected: forget failures of earlier addresses and stop, so
            # no further (leaked) connections are opened.
            err = None
            break

    if err is not None:
        raise err

    if not sock:
        raise IloCommunicationError("Unable to resolve %s" % self.hostname)

    # On Python 3 socket.sslerror is only a placeholder class, so the real
    # ssl.SSLError (when the ssl module provides one) must be caught as well.
    ssl_errors = (socket.sslerror, getattr(ssl, 'SSLError', socket.sslerror))
    try:
        # NOTE(review): ssl.wrap_socket is no longer available in recent
        # Python releases; moving to ssl.SSLContext needs a separate change.
        return ssl.wrap_socket(sock, ssl_version=self.ssl_version)
    except ssl_errors:
        e = sys.exc_info()[1]
        # Exceptions only have .message on old Python 2 versions
        reason = getattr(e, 'message', None) or str(e)
        sock.close()
        # Some ancient iLO's don't support TLSv1, retry with SSLv3 (only
        # when this Python still offers it)
        sslv3 = getattr(ssl, 'PROTOCOL_SSLv3', None)
        if 'wrong version number' in reason and sslv3 is not None and self.ssl_version == ssl.PROTOCOL_TLSv1:
            self.ssl_version = sslv3
            return self._get_socket()
        raise IloCommunicationError("Cannot establish ssl session with %s:%d: %s" % (self.hostname, self.port, reason))
def _communicate(self, xml, protocol, progress=None):
    """Send a serialized RIBCL document and return ``(header, data)``.

    ``xml`` is the serialized request, ``protocol`` one of the ILO_*
    constants and ``progress`` an optional callable that receives status
    strings while a file is sent and INFORM messages while data arrives.

    ``header`` is a dict of lower-cased HTTP response headers when the
    answer starts with ``HTTP/1.1 200`` and *protocol* is ILO_HTTP,
    otherwise None. ``data`` is the response body decoded as latin-1.

    Raises IloCommunicationError when nothing could be read and IloError
    when the response is neither HTTP nor XML.
    """
    sock = self._get_socket()
    msglen = len(self.XML_HEADER + xml)
    if protocol == ILO_HTTP:
        extra_header = ''
        if self.cookie:
            extra_header = "\r\nCookie: %s" % self.cookie
        http_header = self.HTTP_HEADER % (msglen, extra_header)
        msglen += len(http_header)
    self._debug(1, "Sending XML request, %d bytes" % msglen)

    if protocol == ILO_HTTP:
        self._debug(2, http_header)
        sock.write(b(http_header))

    self._debug(2, self.XML_HEADER + xml)

    # XML header and data need to arrive in 2 distinct packets
    if self.protocol != ILO_LOCAL:
        sock.write(self.XML_HEADER)
    if b('$EMBED') in xml:
        # $EMBED:<filename>$ is replaced by the contents of that file
        pre, name, post = re.compile(b(r'(.*)\$EMBED:(.*)\$(.*)'), re.DOTALL).match(xml).groups()
        sock.write(pre)
        sent = 0
        fwlen = os.path.getsize(name)
        # Close the file handle deterministically instead of leaking it
        fd = open(name, 'rb')
        try:
            fw = fd.read()
        finally:
            fd.close()
        while sent < fwlen:
            written = sock.write(fw[sent:sent+self.BLOCK_SIZE])
            sent += written
            if callable(progress):
                progress("Sending request %d/%d bytes (%d%%)" % (sent, fwlen, 100.0*sent/fwlen))
        sock.write(post.strip())
    else:
        sock.write(xml)

    # And grab the data
    if self.protocol == ILO_LOCAL:
        # hponcfg doesn't return data until stdin is closed
        sock.stdin.close()

    # On Python 3 socket.sslerror is only a placeholder class, so the real
    # ssl.SSLError (when the ssl module provides one) must be caught as well.
    ssl_errors = (socket.sslerror, getattr(ssl, 'SSLError', socket.sslerror))
    data = ''
    try:
        while True:
            d = sock.read().decode('latin-1')
            data += d
            if not d:
                break
            if callable(progress) and d.strip().endswith('</RIBCL>'):
                # Report every complete message in this chunk
                d = d[d.find('<?xml'):]
                while '<?xml' in d:
                    end = d.find('<?xml', 5)
                    if end == -1:
                        msg = self._parse_message(d, include_inform=True)
                        if msg:
                            progress(msg)
                        break
                    else:
                        msg = self._parse_message(d[:end], include_inform=True)
                        if msg:
                            progress(msg)
                        d = d[end:]
    except ssl_errors: # Connection closed
        e = sys.exc_info()[1]
        if not data:
            raise IloCommunicationError("Communication with %s:%d failed: %s" % (self.hostname, self.port, str(e)))

    self._debug(1, "Received %d bytes" % len(data))
    if self.protocol == ILO_LOCAL:
        sock.stdout.close()
        sock.wait()
    elif sock.shutdown:
        sock.shutdown(socket.SHUT_RDWR)
        sock.close()

    if self.save_response:
        fd = open(self.save_response, 'a')
        fd.write(data)
        fd.close()

    # Strip out garbage from hponcfg
    if self.protocol == ILO_LOCAL:
        data = data[data.find('<'):data.rfind('>')+1]

    # Do we have HTTP?
    header_ = ''
    if protocol == ILO_HTTP and data.startswith('HTTP/1.1 200'):
        header, data = data.split('\r\n\r\n', 1)
        header_ = header
        header = [x.split(':', 1) for x in header.split('\r\n')[1:]]
        header = dict([(x[0].lower(), x[1].strip()) for x in header])
        # .get(): a response without Transfer-Encoding is simply not chunked
        if header.get('transfer-encoding') == 'chunked':
            _data, data = data, ''
            while _data:
                clen, _data = _data.split('\r\n', 1)
                clen = int(clen, 16)
                if clen == 0:
                    break
                data += _data[:clen]
                # Skip the chunk and the CRLF that terminates it
                _data = _data[clen+2:]

    elif data.startswith('HTTP/1.1 404'):
        # We must be using iLO2 or older, they don't do HTTP for XML requests
        # This case is only triggered by the protocol detection
        header = None
    elif not data.startswith('<?xml'):
        if protocol == ILO_LOCAL:
            raise IloError(sock.stderr.read().strip())
        raise IloError("Remote returned bogus data, maybe it's not an iLO")
    else:
        header = None

    self._debug(2, "%s\r\n\r\n%s" % (header_, data))
    return header, data
def _root_element(self, element, **attrs):
    """Create a basic XML structure for a message.

    Builds RIBCL/LOGIN and adds a new sub-element named *element* (with
    *attrs*) below LOGIN. In delayed mode the RIBCL/LOGIN pair is created
    once and reused until it is cleared. Returns (root, innermost element).
    """
    reuse = self.delayed and self._elements
    if not reuse:
        root = etree.Element('RIBCL', VERSION="2.0")
        login = etree.SubElement(root, 'LOGIN', USER_LOGIN=self.login, PASSWORD=DoNotEscapeMe(self.password))
    if self.delayed:
        if not self._elements:
            self._elements = (root, login)
        else:
            root, login = self._elements
    inner = etree.SubElement(login, element, **attrs)
    return root, inner
def _parse_message(self, data, include_inform=False):
    """Parse iLO responses into Element instances and remove useless messages

    Returns None for empty data and for RIBCL documents without payload,
    the text of an INFORM element when include_inform is set, and otherwise
    the parsed root element. Non-zero RESPONSE statuses raise IloLoginFailed
    or IloError; zero statuses with a message other than 'No error' emit an
    IloWarning."""
    # Bug in some ilo versions causes malformed XML
    if '<RIBCL VERSION="2.22"/>' in data:
        data = data.replace('<RIBCL VERSION="2.22"/>', '<RIBCL VERSION="2.22">')
    # Quote attribute values that were sent without quotes
    if re.search(r'''=+ *[^"'\n=]''', data):
        data = re.sub(r'''= *([^"'\n]+?) *\n''', r'="\1"', data)
    data = data.strip()
    if not data:
        return None

    message = etree.fromstring(data)
    if message.tag == 'RIBCL':
        for child in message:
            if child.tag == 'INFORM':
                if include_inform:
                    # Filter useless message:
                    # NOTE(review): child.text may be None for an empty
                    # INFORM element, which would make the 'in' test fail
                    if 'should be updated' in child.text:
                        return None
                    return child.text
            # RESPONSE with status 0 also adds no value
            # Maybe start adding <?xmlilo output-format="xml"?> to requests. TODO: check compatibility
            elif child.tag == 'RESPONSE' and int(child.get('STATUS'), 16) == 0:
                if child.get('MESSAGE') != 'No error':
                    warnings.warn(child.get('MESSAGE'), IloWarning)
            # These are interesting, something went wrong
            elif child.tag == 'RESPONSE':
                if 'syntax error' in child.get('MESSAGE') and not self.protocol:
                    # This is triggered when doing protocol detection, ignore
                    pass
                else:
                    status = int(child.get('STATUS'), 16)
                    # From here on 'message' is the error text, no longer
                    # the parsed element; both paths below raise
                    message = child.get('MESSAGE')
                    if 'syntax error' in message:
                        message += '. You may have tried to use a feature this iLO version or firmware version does not support.'
                    if status in IloLoginFailed.possible_codes or \
                            message in IloLoginFailed.possible_messages:
                        raise IloLoginFailed(message, status)
                    raise IloError(message, status)
            # And this type of message is the actual payload.
            else:
                # The whole RIBCL root is returned, not just this child
                return message
        return None

    # This shouldn't be reached as all messages are RIBCL messages. But who knows!
    return message
def _element_children_to_dict(self, element):
    """Returns a dict with tag names of all child elements as keys and the
    VALUE attributes as values

    When there are several children that all share one tag name, a list of
    values is returned instead of a dict."""
    retval = {}
    keys = [elt.tag.lower() for elt in element]
    if len(keys) != 1 and len(set(keys)) == 1:
        # Can't return a dict
        retval = []
    for elt in element:
        # There are some special tags: a method named
        # _parse_<parent tag>_<child tag> takes over parsing of that child
        fname = '_parse_%s_%s' % (element.tag.lower(), elt.tag.lower())
        if hasattr(self, fname):
            # NOTE(review): assumes retval is a dict here; a list has no
            # update() - confirm this cannot coincide with the list case
            retval.update(getattr(self, fname)(elt))
            continue
        key, val, unit = elt.tag.lower(), elt.get('VALUE', elt.get('value', None)), elt.get('UNIT', None)
        if val is None:
            # HP is not best friends with consistency. Sometimes there are
            # attributes, sometimes child tags and sometimes text nodes. Oh
            # well, deal with it :)
            if elt.attrib and list(elt):
                val = self._element_to_dict(elt)
            elif list(elt):
                val = self._element_to_list(elt)
            elif elt.text:
                val = elt.text.strip()
            elif elt.attrib:
                val = self._element_to_dict(elt)

        val = self._coerce(val)

        # Values with a UNIT attribute become (value, unit) tuples
        if unit:
            val = (val, unit)
        if isinstance(retval, list):
            retval.append(val)
        elif key in retval:
            # Repeated tag: merge dicts, otherwise collect values in a list
            if isinstance(retval[key], dict):
                retval[key].update(val)
            elif not isinstance(retval[key], list):
                retval[key] = [retval[key], val]
            else:
                retval[key].append(val)
        else:
            retval[key] = val
    return retval
def _element_to_dict(self, element):
"""Returns a dict with tag attributes as items"""
retval = {}
for key, val in element.attrib.items():
retval[key.lower()] = self._coerce(val)
if list(element):
fields = []
for child in element.getchildren():
if child.tag == 'FIELD':
fields.append(self._element_to_dict(child))
if fields:
names = [x['name'] for x in fields]
if len(names) == len(set(names)):
# Field names are unique, treat them like attributes
for field in fields:
retval[field['name']] = field['value']
else:
# Field names are not unique, such as the name "MAC"
retval['fields'] = fields
return retval
def _element_to_list(self, element):
tagnames = [x.tag for x in element]
if len(set(tagnames)) == 1:
return [self._element_children_to_dict(x) for x in element]
else:
return [(child.tag.lower(), self._element_to_dict(child)) for child in element]
def _coerce(self, val):
    """Do some data type coercion: strip surrounding double quotes, turn
    digit-only strings into integers and Y/N into booleans. Non-string
    values are returned unchanged."""
    if not isinstance(val, basestring):
        return val
    quoted = val.startswith('"') and val.endswith('"')
    if quoted:
        val = val[1:-1]
    if val.isdigit():
        return int(val)
    booleans = {'Y': True, 'N': False}
    return booleans.get(val, val)
def _raw(self, *tags):
    """Send a request built from nested (tagname, attributes) pairs and
    return the parsed response serialized back to XML.

    The first pair becomes the element directly below LOGIN, every
    following pair is nested inside the previous one. Not available in
    delayed mode.
    """
    if self.delayed:
        raise IloError("Cannot use raw tags in delayed mode")
    outer_name, outer_attrs = tags[0][0], tags[0][1]
    root, inner = self._root_element(outer_name, **outer_attrs)
    for pair in tags[1:]:
        inner = etree.SubElement(inner, pair[0], **pair[1])
    header, message = self._request(root)
    buf = StringIO.StringIO()
    etree.ElementTree(message).write(buf)
    serialized = buf.getvalue()
    buf.close()
    return serialized
def _info_tag(self, infotype, tagname, returntags=None, attrib={}, process=lambda x: x):
root, inner = self._root_element(infotype, MODE='read')
etree.SubElement(inner, tagname, **attrib)
if self.delayed:
self._processors.append([self._process_info_tag, returntags or [tagname], process])
return
header, message = self._request(root)
return self._process_info_tag(message, returntags or [tagname], process)
def _process_info_tag(self, message, returntags, process):
    """Extract the first of *returntags* found directly below *message*,
    convert it to Python data and pass that through *process*.

    Raises IloError when none of the tags is present.
    """
    if isinstance(returntags, basestring):
        returntags = [returntags]

    for tag in returntags:
        found = message.find(tag)
        if found is None:
            continue
        # Elements with children and attribute-only elements are converted
        # by different helpers
        if list(found):
            converted = self._element_children_to_dict(found)
        else:
            converted = self._element_to_dict(found)
        return process(converted)
    raise IloError("Expected tag '%s' not found" % "' or '".join(returntags))
def _control_tag(self, controltype, tagname, returntag=None, attrib={}, elements=[], text=None):
root, inner = self._root_element(controltype, MODE='write')
inner = etree.SubElement(inner, tagname, **attrib)
if text:
inner.text = text
for element in elements:
inner.append(element)
if self.delayed:
if tagname == 'CERTIFICATE_SIGNING_REQUEST':
self._processors.append([self._process_control_tag, returntag or tagname])
return
header, message = self._request(root)
return self._process_control_tag(message, returntag or tagname)
def _process_control_tag(self, message, returntag):
if message is None:
return None
message = message.find(returntag)
if message.text.strip():
return message.text.strip()
if not message.attrib and not list(message):
return None
raise IloError("You've reached unknown territories, please report a bug")
if list(message):
return self._element_children_to_dict(message)
else:
return self._element_to_dict(message)
def call_delayed(self):
    """In delayed mode, calling a method on an iLO object will not cause an
    immediate callout to the iLO. Instead, the method and parameters are
    stored for future calls of this method. This method makes one
    connection to the iLO and sends all commands as one XML document.
    This speeds up applications that make many calls to the iLO by
    removing seconds of overhead per call.

    The return value of call_delayed is a list of return values for
    individual methods that don't return None. This means that there may
    be fewer items returned than methods called as only `get_*` methods
    return data

    Raises ValueError when no commands have been scheduled.

    Delayed calls only work on iLO 2 or newer"""
    if not self._elements:
        raise ValueError("No commands scheduled")

    root, inner = self._elements
    header, response = self._request(root)
    results = []
    if response is not None:
        # A single message is not wrapped in a list by _request
        messages = response if isinstance(response, list) else [response]
        for msg, processor in zip(messages, self._processors):
            # First item is the handler, the remainder its extra arguments
            handler = processor.pop(0)
            results.append(handler(msg, *processor))
    self._processors = []
    self._elements = None
    return results
def activate_license(self, key):
    """Activate an iLO advanced license by sending *key* in a LICENSE/ACTIVATE
    request"""
    activation = etree.Element('ACTIVATE', KEY=key)
    result = self._control_tag('RIB_INFO', 'LICENSE', elements=[activation])
    return result
def add_federation_group(self, group_name, group_key, admin_priv=False,
        remote_cons_priv=True, reset_server_priv=False,
        virtual_media_priv=False, config_ilo_priv=True, login_priv=False):
    """Add a new federation group with the given name, key and privileges.

    Privilege arguments are interpreted as booleans and sent as
    'Yes'/'No' elements."""
    # Explicit, ordered list instead of introspecting locals(): the request
    # no longer depends on dict ordering or on which locals happen to exist
    privileges = (
        ('ADMIN_PRIV', admin_priv),
        ('REMOTE_CONS_PRIV', remote_cons_priv),
        ('RESET_SERVER_PRIV', reset_server_priv),
        ('VIRTUAL_MEDIA_PRIV', virtual_media_priv),
        ('CONFIG_ILO_PRIV', config_ilo_priv),
        ('LOGIN_PRIV', login_priv),
    )
    elements = []
    for tag, enabled in privileges:
        val = ['No', 'Yes'][bool(enabled)]
        elements.append(etree.Element(tag, VALUE=val))
    return self._control_tag('RIB_INFO', 'ADD_FEDERATION_GROUP', elements=elements,
            attrib={'GROUP_NAME': group_name, 'GROUP_KEY': group_key})
def add_user(self, user_login, user_name, password, admin_priv=False,
        remote_cons_priv=True, reset_server_priv=False,
        virtual_media_priv=False, config_ilo_priv=True):
    """Add a new user to the iLO interface with the specified name,
    password and permissions. Permission attributes should be boolean
    values."""
    # Explicit, ordered list instead of introspecting locals(): the request
    # no longer depends on dict ordering or on which locals happen to exist
    privileges = (
        ('ADMIN_PRIV', admin_priv),
        ('REMOTE_CONS_PRIV', remote_cons_priv),
        ('RESET_SERVER_PRIV', reset_server_priv),
        ('VIRTUAL_MEDIA_PRIV', virtual_media_priv),
        ('CONFIG_ILO_PRIV', config_ilo_priv),
    )
    elements = []
    for tag, enabled in privileges:
        val = ['No', 'Yes'][bool(enabled)]
        elements.append(etree.Element(tag, VALUE=val))
    # The password is sent unescaped, see DoNotEscapeMe
    return self._control_tag('USER_INFO', 'ADD_USER', elements=elements,
            attrib={'USER_LOGIN': user_login, 'USER_NAME': user_name, 'PASSWORD': DoNotEscapeMe(password)})
def ahs_clear_data(self):
    """Clears Active Health System information log"""
    outer = ('RIB_INFO', {'MODE': 'WRITE'})
    command = ('AHS_CLEAR_DATA', {})
    return self._raw(outer, command)
def cert_fqdn(self, use_fqdn):
    """Configure whether to use the fqdn or the short hostname for
    certificate requests. True/False are sent as 'Yes'/'No'; any other
    value is sent as its string form."""
    yes_no = {True: 'Yes', False: 'No'}
    value = str(yes_no.get(use_fqdn, use_fqdn))
    return self._control_tag('RIB_INFO', 'CERT_FQDN', attrib={'VALUE': value})
def certificate_signing_request(self, country=None, state=None, locality=None, organization=None,
        organizational_unit=None, common_name=None):
    """Get a certificate signing request from the iLO.

    Every subject argument that is given (truthy) is sent to the iLO as a
    CSR_<NAME> element with the value in its VALUE attribute; arguments
    left at None are omitted."""
    subject = (
        ('CSR_COUNTRY', country),
        ('CSR_STATE', state),
        ('CSR_LOCALITY', locality),
        ('CSR_ORGANIZATION', organization),
        ('CSR_ORGANIZATIONAL_UNIT', organizational_unit),
        ('CSR_COMMON_NAME', common_name),
    )
    # Previously the subject values were collected but never added to the
    # request, so the arguments had no effect
    elements = [etree.Element(tag, attrib={'VALUE': str(value)})
                for tag, value in subject if value]
    return self._control_tag('RIB_INFO', 'CERTIFICATE_SIGNING_REQUEST', elements=elements)
def clear_ilo_event_log(self):
    """Clears the iLO event log (RIB_INFO / CLEAR_EVENTLOG)"""
    result = self._control_tag('RIB_INFO', 'CLEAR_EVENTLOG')
    return result
def clear_server_event_log(self):
    """Clears the server event log (SERVER_INFO / CLEAR_IML)"""
    result = self._control_tag('SERVER_INFO', 'CLEAR_IML')
    return result
def clear_server_power_on_time(self):
    """Clears the server power on time (SERVER_INFO /
    CLEAR_SERVER_POWER_ON_TIME)"""
    result = self._control_tag('SERVER_INFO', 'CLEAR_SERVER_POWER_ON_TIME')
    return result
def computer_lock_config(self, computer_lock=None, computer_lock_key=None):
    """Configure the computer lock settings.

    Passing a computer_lock_key forces the lock mode to "custom" and sends
    the key along. Raises ValueError when neither argument has a value."""
    has_key = bool(computer_lock_key)
    mode = "custom" if has_key else computer_lock
    if not mode:
        raise ValueError("A value must be specified for computer_lock")
    elements = [etree.Element('COMPUTER_LOCK', VALUE=mode)]
    if has_key:
        elements.append(etree.Element('COMPUTER_LOCK_KEY', VALUE=computer_lock_key))
    return self._control_tag('RIB_INFO', 'COMPUTER_LOCK_CONFIG', elements=elements)
def dc_registration_complete(self):
    """Send the RIB_INFO / DC_REGISTRATION_COMPLETE write command"""
    result = self._control_tag('RIB_INFO', 'DC_REGISTRATION_COMPLETE')
    return result
def delete_federation_group(self, group_name):
    """Delete the specified federation group membership"""
    attributes = {'GROUP_NAME': group_name}
    return self._control_tag('RIB_INFO', 'DELETE_FEDERATION_GROUP', attrib=attributes)
def delete_user(self, user_login):
    """Delete the user with the given login name from the iLO"""
    attributes = {'USER_LOGIN': user_login}
    return self._control_tag('USER_INFO', 'DELETE_USER', attrib=attributes)
def disable_ers(self):
    """Disable Insight Remote Support functionality and unregister the
    server (RIB_INFO / DISABLE_ERS)"""
    result = self._control_tag('RIB_INFO', 'DISABLE_ERS')
    return result
def eject_virtual_floppy(self):
    """Eject the virtual floppy (RIB_INFO / EJECT_VIRTUAL_FLOPPY)"""
    result = self._control_tag('RIB_INFO', 'EJECT_VIRTUAL_FLOPPY')
    return result
def eject_virtual_media(self, device="cdrom"):
    """Eject the virtual media attached to the specified device; the device
    name is sent upper-cased"""
    attributes = {"DEVICE": device.upper()}
    return self._control_tag('RIB_INFO', 'EJECT_VIRTUAL_MEDIA', attrib=attributes)
def ers_ahs_submit(self, message_id, bb_days):
    """Submit AHS data to the insight remote support server.

    Both arguments are converted to strings and sent as MESSAGE_ID and
    BB_DAYS elements of a TRIGGER_BB_DATA request."""
    message_element = etree.Element('MESSAGE_ID', attrib={'VALUE': str(message_id)})
    days_element = etree.Element('BB_DAYS', attrib={'VALUE': str(bb_days)})
    elements = [message_element, days_element]
    return self._control_tag('RIB_INFO', 'TRIGGER_BB_DATA', elements=elements)
def fips_enable(self):
    """Enable FIPS standard to enforce AES/3DES encryption, can only be
    reset with a call to factory_defaults. Resets Administrator password
    and license key"""
    result = self._control_tag('RIB_INFO', 'FIPS_ENABLE')
    return result
def factory_defaults(self):
    """Reset the iLO to factory default settings (RIB_INFO /
    FACTORY_DEFAULTS)"""
    result = self._control_tag('RIB_INFO', 'FACTORY_DEFAULTS')
    return result
def get_ahs_status(self):
    """Get active health system logging status (RIB_INFO / GET_AHS_STATUS)"""
    status = self._info_tag('RIB_INFO', 'GET_AHS_STATUS')
    return status
def get_all_users(self):
    """Get a list of all loginnames; empty entries are left out"""
    def process(data):
        entries = data.values() if isinstance(data, dict) else data
        names = []
        for entry in entries:
            if entry:
                names.append(entry)
        return names
    return self._info_tag('USER_INFO', 'GET_ALL_USERS', process=process)
def get_all_user_info(self):
    """Get basic and authorization info of all users, as a dict keyed by
    each user's 'user_login' value"""
    def process(data):
        if isinstance(data, dict):
            data = data.values()
        by_login = {}
        for user in data:
            by_login[user['user_login']] = user
        return by_login
    return self._info_tag('USER_INFO', 'GET_ALL_USER_INFO', process=process)
def get_asset_tag(self):
    """Gets the server asset tag; yields {'asset_tag': None} when the iLO
    reports none"""
    # The absence of an asset tag is communicated in a warning and there
    # will be *NO* returntag, hence the AttributeError.
    try:
        tag_info = self._info_tag('SERVER_INFO', 'GET_ASSET_TAG')
    except AttributeError:
        return {'asset_tag': None}
    return tag_info
def get_cert_subject_info(self):
    """Get ssl certificate subject information (read from the
    CSR_CERT_SETTINGS element of the response)"""
    info = self._info_tag('RIB_INFO', 'GET_CERT_SUBJECT_INFO', 'CSR_CERT_SETTINGS')
    return info
def get_current_boot_mode(self):
    """Get the current boot mode (legacy or uefi)"""
    def extract_mode(data):
        return data['boot_mode']
    return self._info_tag('SERVER_INFO', 'GET_CURRENT_BOOT_MODE', process=extract_mode)
def get_dir_config(self):
    """Get directory authentication configuration (DIR_INFO /
    GET_DIR_CONFIG)"""
    config = self._info_tag('DIR_INFO', 'GET_DIR_CONFIG')
    return config
def get_embedded_health(self):
    """Get server health information.

    The parsed response is post-processed per category: the
    'health_at_a_glance' pairs are merged into one dict, non-empty lists
    whose first item has a 'label' or 'location' entry become dicts keyed
    by that entry, and empty values ('' or []) become None."""
    def process(data):
        for category in data:
            entries = data[category]
            if category == 'health_at_a_glance':
                merged = {}
                for key, val in entries:
                    if key in merged:
                        merged[key].update(val)
                    else:
                        merged[key] = val
                data[category] = merged
            elif isinstance(entries, list) and entries:
                # Key the list by 'label' if present, else by 'location'
                for tag in ('label', 'location'):
                    if tag in entries[0]:
                        data[category] = dict((x[tag], x) for x in entries)
                        break
            elif entries in ['', []]:
                data[category] = None
        return data
    return self._info_tag('SERVER_INFO', 'GET_EMBEDDED_HEALTH', 'GET_EMBEDDED_HEALTH_DATA',
            process=process)
# Ok, special XML structures. Yay.
def _parse_get_embedded_health_data_drives(self, element):
ret = []
for bp in element:
if bp.tag != 'BACKPLANE':
raise IloError("Unexpected data returned: %s" % bp.tag)
backplane = obj = {'drive_bays': {}}
ret.append(backplane)
for elt in bp:
if elt.tag == 'DRIVE_BAY':
obj = {}
backplane['drive_bays'][int(elt.get('VALUE'))] = obj
else:
obj[elt.tag.lower()] = elt.get('VALUE')
return {'drives_backplanes': ret}
def _parse_get_embedded_health_data_memory(self, element):
ret = {}
for elt in element:
fname = '_parse_%s_%s' % (element.tag.lower(), elt.tag.lower())
if hasattr(self, fname):
ret.update(getattr(self, fname)(elt))
continue
ret[elt.tag.lower()] = self._element_children_to_dict(elt)
return {element.tag.lower(): ret}
_parse_memory_memory_details_summary = _parse_get_embedded_health_data_memory
def _parse_memory_memory_details(self, element):
ret = {}
for elt in element:
if elt.tag not in ret:
ret[elt.tag] = {}
data = self._element_children_to_dict(elt)
ret[elt.tag]["socket %d" % data["socket"]] = data
return {element.tag.lower(): ret}
def _parse_get_embedded_health_data_nic_information(self, element):
ret = {}
for elt in element:
data = self._element_children_to_dict(elt)
ret[data['network_port']] = data
return {element.tag.lower(): ret}
def _parse_get_embedded_health_data_firmware_information(self, element):
ret = {}
for elt in element:
data = self._element_children_to_dict(elt)
ret[data['firmware_name']] = data['firmware_version']
return {element.tag.lower(): ret}
def _parse_get_embedded_health_data_storage(self, element):
ret = []
for ctrl in element:
data = {}
for elt in ctrl:
tag = elt.tag.lower()
if tag in ('drive_enclosure', 'logical_drive'):
tag += 's'
if tag not in data:
data[tag] = []
if tag == 'drive_enclosures':
data[tag].append(self._element_children_to_dict(elt))
else:
data[tag].append(self._parse_logical_drive(elt))
else:
data[tag] = elt.get('VALUE')
ret.append(data)
return {element.tag.lower(): ret}
def _parse_logical_drive(self, element):
data = {}
for elt in element:
tag = elt.tag.lower()
if tag == 'physical_drive':
tag += 's'
if tag not in data:
data[tag] = []
data[tag].append(self._element_children_to_dict(elt))
else:
data[tag] = elt.get('VALUE')
return data
def get_encrypt_settings(self):
    """Get the iLO encryption settings (RIB_INFO / GET_ENCRYPT_SETTINGS)"""
    settings = self._info_tag('RIB_INFO', 'GET_ENCRYPT_SETTINGS')
    return settings
def get_ers_settings(self):
    """Get the ERS Insight Remote Support settings (RIB_INFO /
    GET_ERS_SETTINGS)"""
    settings = self._info_tag('RIB_INFO', 'GET_ERS_SETTINGS')
    return settings
def get_federation_all_groups(self):
    """Get all federation group names; a dict result is reduced to its
    values, anything else is passed through unchanged"""
    def process(data):
        if not isinstance(data, dict):
            return data
        return data.values()
    return self._info_tag('RIB_INFO', 'GET_FEDERATION_ALL_GROUPS', process=process)
def get_federation_all_groups_info(self):
"""Get all federation group names and associated privileges"""
def process(data):
if isinstance(data, dict):
data = data.values()
data = [dict([(key, {'yes': True, 'no': False}.get(val['value'].lower(), val['value'])) for (key, val) in group]) for group in data]
return dict([(x['group_name'], x) for x in data])