-
Notifications
You must be signed in to change notification settings - Fork 199
/
Copy pathremovetemplate.py
815 lines (652 loc) · 33.6 KB
/
removetemplate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
# Help/usage text for the script. printHelpAndExit() prints this string verbatim.
readMe = """This is script to create a copy of a template-based network that preserves as many of the
network's settings as possible, while not relying on a configuration template. The initial focus of
the script is converting MX appliance networks.
Syntax:
removetemplate -k <key> -o <org name> -n <source net name>
Notes:
This release supports the following features:
General:
Basic network settings
Network admin privileges
Network tags
Group policy objects
MX:
VLANs and subnets
VLAN group policies
Static routes (affected by an API bug at time of writing. See comments in code)
NAT
L3 Firewall
L7 Firewall
Traffic shaping
Content filtering
AMP
IPS
The source for NAT/Port forwarding rules is the Config template. Local NAT overrides are ignored
Required Python version and modules:
Developed and tested with Python 3.8.3
Requests: https://requests.readthedocs.io/
* Install by running: pip install requests
"""
import sys, getopt, time, json, ipaddress
from urllib.parse import urlencode
from requests import Session, utils
class NoRebuildAuthSession(Session):
    """Session subclass whose rebuild_auth hook is replaced by a no-op."""

    def rebuild_auth(self, prepared_request, response):
        """
        Deliberately does nothing, so the auth header is not stripped on redirect. Background:
        https://stackoverflow.com/questions/60358216/python-requests-post-request-dropping-authorization-header
        """
        return None
# merakiRequest() gives up once its retry counter exceeds this value
API_MAX_RETRIES = 3
# Passed to requests as the (connect, read) timeout pair, in seconds
API_CONNECT_TIMEOUT = 60
API_TRANSMIT_TIMEOUT = 60
# HTTP status code that merakiRequest() treats as "rate limited, wait and retry"
API_STATUS_RATE_LIMIT = 429
#Set to True or False to enable/disable console logging of sent API requests
FLAG_REQUEST_VERBOSE = True
# Prefix prepended to every endpoint path passed to merakiRequest()
API_BASE_URL = "https://api.meraki.com/api/v1"
def merakiRequest(p_apiKey, p_httpVerb, p_endpoint, p_additionalHeaders=None, p_queryItems=None, p_requestBody=None, p_verbose=False, p_retry=0):
    """Send one request to the Meraki Dashboard API and return its outcome.

    Parameters:
        p_apiKey: API key; sent as an "Authorization: Bearer <key>" header.
        p_httpVerb: "GET", "PUT", "POST" or "DELETE" (case-insensitive).
            Any other verb makes the call fail without sending anything.
        p_endpoint: path appended to API_BASE_URL, e.g. "/organizations".
        p_additionalHeaders: optional dict merged into the request headers.
        p_queryItems: optional items URL-encoded into the query string.
        p_requestBody: optional body sent as JSON (PUT and POST only).
        p_verbose: when true, print the request, status code and errors.
        p_retry: how many rate-limit retries were already made.

    Returns:
        A tuple (success, errors, responseHeaders, responseBody).
        success is True for 2xx status codes. errors holds the "errors"
        value of an error response, otherwise None. responseBody is the
        decoded JSON body, or None if there is none. On transport failure,
        unsupported verb or exhausted retries the tuple is
        (False, None, None, None).
    """
    if p_retry > API_MAX_RETRIES:
        if p_verbose:
            print("ERROR: Reached max retries")
        return False, None, None, None

    headers = {"Authorization": "Bearer " + p_apiKey}
    if p_additionalHeaders is not None:
        headers.update(p_additionalHeaders)

    query = ""
    if p_queryItems is not None:
        query = "?" + urlencode(p_queryItems)
    url = API_BASE_URL + p_endpoint + query

    verb = p_httpVerb.upper()
    if p_verbose:
        print(verb, url)
    if verb not in ("GET", "PUT", "POST", "DELETE"):
        return False, None, None, None

    requestArguments = {
        "headers": headers,
        "timeout": (API_CONNECT_TIMEOUT, API_TRANSMIT_TIMEOUT),
    }
    if verb in ("PUT", "POST"):
        if p_requestBody is not None and p_verbose:
            print("body", p_requestBody)
        requestArguments["json"] = p_requestBody

    try:
        # The context manager closes the session's connections once the
        # (fully downloaded) response has been obtained.
        with NoRebuildAuthSession() as session:
            r = session.request(verb, url, **requestArguments)
    except Exception as requestError:
        # Transport-level failure (DNS, timeout, TLS, ...). Unlike a bare
        # "except:", this lets KeyboardInterrupt and SystemExit propagate.
        if p_verbose:
            print("ERROR: Request failed:", requestError)
        return False, None, None, None

    if p_verbose:
        print(r.status_code)

    success = 200 <= r.status_code <= 299
    errors = None
    responseHeaders = r.headers
    responseBody = None

    if r.status_code == API_STATUS_RATE_LIMIT:
        # Wait as instructed by the server; fall back to one second if the
        # Retry-After header is absent or not an integer.
        try:
            waitSeconds = int(r.headers["Retry-After"])
        except (KeyError, ValueError):
            waitSeconds = 1
        if p_verbose:
            print("Hit max request rate. Retrying %s after %s seconds" % (p_retry+1, waitSeconds))
        time.sleep(waitSeconds)
        return merakiRequest(p_apiKey, p_httpVerb, p_endpoint, p_additionalHeaders,
                             p_queryItems, p_requestBody, p_verbose, p_retry+1)

    try:
        rjson = r.json()
    except ValueError:
        # Empty or non-JSON body
        rjson = None

    if rjson is not None:
        if isinstance(rjson, dict) and "errors" in rjson:
            errors = rjson["errors"]
            if p_verbose:
                print(errors)
        else:
            responseBody = rjson

    if "Link" in r.headers:
        for link in utils.parse_header_links(r.headers["Link"]):
            if link.get("rel") != "next":
                continue
            if p_verbose:
                print("Next page:", link["url"])
            # The link is an absolute URL that already carries its query
            # string; keep only the part after the API base path.
            nextEndpoint = link["url"].split("/api/v1", 1)[1]
            success, errors, responseHeaders, nextBody = merakiRequest(
                p_apiKey, p_httpVerb, nextEndpoint,
                p_additionalHeaders=p_additionalHeaders,
                p_requestBody=p_requestBody,
                p_verbose=p_verbose)
            if success:
                # Concatenate pages so callers see one flat list of records
                if isinstance(responseBody, list) and isinstance(nextBody, list):
                    responseBody = responseBody + nextBody
            else:
                responseBody = None

    return success, errors, responseHeaders, responseBody
def getOrganizations(p_apiKey):
    """GET /organizations and return merakiRequest's (success, errors, headers, response) tuple."""
    return merakiRequest(
        p_apiKey,
        "GET",
        "/organizations",
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getNetworks(p_apiKey, p_organizationId):
    """GET the networks of an organization; returns (success, errors, headers, response)."""
    networksPath = "/organizations/" + p_organizationId + "/networks"
    return merakiRequest(p_apiKey, "GET", networksPath, p_verbose=FLAG_REQUEST_VERBOSE)
def getAdministrators(p_apiKey, p_organizationId):
    """GET the admins of an organization; returns (success, errors, headers, response)."""
    adminsPath = "/organizations/" + p_organizationId + "/admins"
    return merakiRequest(p_apiKey, "GET", adminsPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateAdministrator(p_apiKey, p_organizationId, p_adminId, p_settings):
    """PUT an administrator record; returns (success, errors, headers, response).

    Every entry of p_settings except "id" is sent as the request body.
    """
    adminPath = "/organizations/" + p_organizationId + "/admins/" + p_adminId
    payload = {key: p_settings[key] for key in p_settings if key != "id"}
    return merakiRequest(
        p_apiKey,
        "PUT",
        adminPath,
        p_requestBody=payload,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getVlanSettings(p_apiKey, p_networkId):
    """GET the appliance VLAN settings of a network; returns (success, errors, headers, response)."""
    settingsPath = "/networks/" + p_networkId + "/appliance/vlans/settings"
    return merakiRequest(p_apiKey, "GET", settingsPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateVlanSettings(p_apiKey, p_networkId, p_vlansEnabled):
    """PUT the "vlansEnabled" flag of a network; returns (success, errors, headers, response).

    p_vlansEnabled: boolean
    """
    settingsPath = "/networks/" + p_networkId + "/appliance/vlans/settings"
    payload = dict(vlansEnabled=p_vlansEnabled)
    return merakiRequest(
        p_apiKey,
        "PUT",
        settingsPath,
        p_requestBody=payload,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getVlans(p_apiKey, p_networkId):
    """GET the appliance VLANs of a network; returns (success, errors, headers, response)."""
    vlansPath = "/networks/" + p_networkId + "/appliance/vlans"
    return merakiRequest(p_apiKey, "GET", vlansPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateVlan(p_apiKey, p_networkId, p_vlanId, p_vlanConfiguration):
    """PUT the configuration of one appliance VLAN.

    Parameters:
        p_apiKey: API key.
        p_networkId: ID of the network owning the VLAN.
        p_vlanId: VLAN number; converted with str() for the URL.
        p_vlanConfiguration: dict of VLAN attributes. It is NOT modified;
            the request body is built from a copy.

    The following entries are left out of the request body: "id",
    "networkId", an empty "fixedIpAssignments" dict, an empty
    "reservedIpRanges" list, and "dhcpOptions" when
    "dhcpBootOptionsEnabled" is present and false.

    Returns:
        (success, errors, headers, response). If nothing remains to send,
        no request is made and (True, None, None, None) is returned.
    """
    endpoint = "/networks/" + p_networkId + "/appliance/vlans/" + str(p_vlanId)

    # Shallow copy so the caller's dict keeps all of its keys
    body = dict(p_vlanConfiguration)
    body.pop("id", None)
    body.pop("networkId", None)
    if body.get("fixedIpAssignments") == {}:
        del body["fixedIpAssignments"]
    if body.get("reservedIpRanges") == []:
        del body["reservedIpRanges"]
    if "dhcpBootOptionsEnabled" in body and not body["dhcpBootOptionsEnabled"]:
        body.pop("dhcpOptions", None)

    if not body:
        return True, None, None, None
    return merakiRequest(p_apiKey, "PUT", endpoint, p_requestBody=body, p_verbose=FLAG_REQUEST_VERBOSE)
def createVlan(p_apiKey, p_networkId, p_vlanConfiguration):
    """Create an appliance VLAN, then apply its remaining settings.

    Parameters:
        p_apiKey: API key.
        p_networkId: ID of the network to create the VLAN in.
        p_vlanConfiguration: dict of VLAN attributes; must contain "id",
            "name", "subnet" and "applianceIp". It is NOT modified.

    The VLAN is first created with a POST carrying only "id", "name",
    "subnet" and "applianceIp". If that succeeds, all other attributes are
    applied through updateVlan().

    Returns:
        (success, errors, headers, response) of the last request made.
    """
    endpoint = "/networks/" + p_networkId + "/appliance/vlans"
    creationKeys = ("id", "name", "subnet", "applianceIp")
    body = {key: p_vlanConfiguration[key] for key in creationKeys}

    success, errors, headers, response = merakiRequest(p_apiKey, "POST", endpoint, p_requestBody=body, p_verbose=FLAG_REQUEST_VERBOSE)
    if not success:
        return success, errors, headers, response

    # Build the follow-up configuration as a new dict instead of deleting
    # keys from the caller's dict
    remainingSettings = {
        key: value
        for key, value in p_vlanConfiguration.items()
        if key not in creationKeys
    }
    return updateVlan(p_apiKey, p_networkId, body["id"], remainingSettings)
def deleteVlan(p_apiKey, p_networkId, p_vlanId):
    """DELETE one appliance VLAN of a network; returns (success, errors, headers, response)."""
    vlanPath = "/networks/" + p_networkId + "/appliance/vlans/" + str(p_vlanId)
    return merakiRequest(p_apiKey, "DELETE", vlanPath, p_verbose=FLAG_REQUEST_VERBOSE)
def getSingleLan(p_apiKey, p_networkId):
    """GET the MX LAN subnet configuration used when VLANs are not enabled.

    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    singleLanPath = "/networks/" + p_networkId + "/appliance/singleLan"
    return merakiRequest(p_apiKey, "GET", singleLanPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateSingleLan(p_apiKey, p_networkId, p_lanConfiguration):
    """PUT the MX LAN subnet configuration used when VLANs are not enabled.

    Only the "subnet" and "applianceIp" entries of p_lanConfiguration are sent.
    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    singleLanPath = "/networks/" + p_networkId + "/appliance/singleLan"
    payload = {key: p_lanConfiguration[key] for key in ("subnet", "applianceIp")}
    return merakiRequest(
        p_apiKey,
        "PUT",
        singleLanPath,
        p_requestBody=payload,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getMxL3FirewallRules(p_apiKey, p_networkId):
    """GET the appliance L3 firewall rules of a network; returns (success, errors, headers, response)."""
    rulesPath = "/networks/" + p_networkId + "/appliance/firewall/l3FirewallRules"
    return merakiRequest(p_apiKey, "GET", rulesPath, p_verbose=FLAG_REQUEST_VERBOSE)
def matchDefaultL3FirewallRule(p_rule):
    """Return True if p_rule carries every field/value of the default allow-all L3 rule.

    Extra keys in p_rule are ignored; a missing or differing field yields False.
    """
    expectedFields = (
        ('comment', 'Default rule'),
        ('policy', 'allow'),
        ('protocol', 'Any'),
        ('srcPort', 'Any'),
        ('srcCidr', 'Any'),
        ('destPort', 'Any'),
        ('destCidr', 'Any'),
        ('syslogEnabled', False),
    )
    return all(
        field in p_rule and p_rule[field] == expected
        for field, expected in expectedFields
    )
def updateMxL3FirewallRules(p_apiKey, p_networkId, p_firewallRuleset):
    """PUT the appliance L3 firewall rules of a network.

    Parameters:
        p_apiKey: API key.
        p_networkId: ID of the network to update.
        p_firewallRuleset: dict with a "rules" list. It is NOT modified.

    A trailing rule that matches the default rule (as decided by
    matchDefaultL3FirewallRule) is dropped before sending.

    Returns:
        (success, errors, headers, response). If no rules remain, no
        request is made and (True, None, None, None) is returned.
    """
    endpoint = "/networks/" + p_networkId + "/appliance/firewall/l3FirewallRules"

    # Copy the list so the caller's ruleset is left untouched
    rules = list(p_firewallRuleset.get("rules") or [])

    # Guard against an empty list before looking at the last element
    if rules and matchDefaultL3FirewallRule(rules[-1]):
        rules.pop()

    if not rules:
        return True, None, None, None

    body = dict(p_firewallRuleset)
    body["rules"] = rules
    return merakiRequest(p_apiKey, "PUT", endpoint, p_requestBody=body, p_verbose=FLAG_REQUEST_VERBOSE)
def getMxL7FirewallRules(p_apiKey, p_networkId):
    """GET the appliance L7 firewall rules of a network; returns (success, errors, headers, response)."""
    rulesPath = "/networks/" + p_networkId + "/appliance/firewall/l7FirewallRules"
    return merakiRequest(p_apiKey, "GET", rulesPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateMxL7FirewallRules(p_apiKey, p_networkId, p_firewallRuleset):
    """PUT the appliance L7 firewall rules of a network.

    When p_firewallRuleset["rules"] is empty nothing is sent and
    (True, None, None, None) is returned; otherwise the result is
    merakiRequest's (success, errors, headers, response) tuple.
    """
    rulesPath = "/networks/" + p_networkId + "/appliance/firewall/l7FirewallRules"
    if len(p_firewallRuleset["rules"]) == 0:
        # Nothing to copy: report success without calling the API
        return True, None, None, None
    return merakiRequest(
        p_apiKey,
        "PUT",
        rulesPath,
        p_requestBody=p_firewallRuleset,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getMxIpsSettings(p_apiKey, p_networkId):
    """GET the appliance intrusion settings of a network; returns (success, errors, headers, response)."""
    intrusionPath = "/networks/" + p_networkId + "/appliance/security/intrusion"
    return merakiRequest(p_apiKey, "GET", intrusionPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateMxIpsSettings(p_apiKey, p_networkId, p_IpsSettings):
    """PUT p_IpsSettings as the appliance intrusion settings of a network.

    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    intrusionPath = "/networks/" + p_networkId + "/appliance/security/intrusion"
    return merakiRequest(
        p_apiKey,
        "PUT",
        intrusionPath,
        p_requestBody=p_IpsSettings,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getMxAmpSettings(p_apiKey, p_networkId):
    """GET the appliance malware settings of a network; returns (success, errors, headers, response)."""
    malwarePath = "/networks/" + p_networkId + "/appliance/security/malware"
    return merakiRequest(p_apiKey, "GET", malwarePath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateMxAmpSettings(p_apiKey, p_networkId, p_AmpSettings):
    """PUT p_AmpSettings as the appliance malware settings of a network.

    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    malwarePath = "/networks/" + p_networkId + "/appliance/security/malware"
    return merakiRequest(
        p_apiKey,
        "PUT",
        malwarePath,
        p_requestBody=p_AmpSettings,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getMxContentFilteringSettings(p_apiKey, p_networkId):
    """GET the appliance content filtering settings of a network; returns (success, errors, headers, response)."""
    filteringPath = "/networks/" + p_networkId + "/appliance/contentFiltering"
    return merakiRequest(p_apiKey, "GET", filteringPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateMxContentFilteringSettings(p_apiKey, p_networkId, p_settings):
    """PUT the appliance content filtering settings of a network.

    Parameters:
        p_apiKey: API key.
        p_networkId: ID of the network to update.
        p_settings: dict of content filtering settings. It is NOT modified.
            NOTE: p_settings["blockedUrlCategories"] is expected in the
            format returned by GET /networks/{netId}/appliance/contentFiltering
            (records with an "id" entry); plain ID values are passed through.

    Entries whose value is None or an empty container/string are left out
    of the request body.

    Returns:
        merakiRequest's (success, errors, headers, response) tuple.
    """
    endpoint = "/networks/" + p_networkId + "/appliance/contentFiltering"

    # Build the body in a new dict rather than deleting from the caller's
    body = {}
    for key, value in p_settings.items():
        if value is None:
            continue
        try:
            isEmpty = len(value) == 0
        except TypeError:
            # Values without a length (numbers, booleans) are kept as they are
            isEmpty = False
        if not isEmpty:
            body[key] = value

    # convert blockedUrlCategories records' format from dict to str
    if "blockedUrlCategories" in body:
        body["blockedUrlCategories"] = [
            category["id"] if isinstance(category, dict) else category
            for category in body["blockedUrlCategories"]
        ]

    return merakiRequest(p_apiKey, "PUT", endpoint, p_requestBody=body, p_verbose=FLAG_REQUEST_VERBOSE)
def createNetwork(p_apiKey, p_organizationId, p_networkName, p_productTypes, p_timeZone=None, p_tags=None, p_copyFromNetworkId=None):
    """POST a new network into an organization.

    "name" and "productTypes" are always sent; "copyFromNetworkId",
    "timeZone" and "tags" are added only when the matching argument is not None.
    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    networksPath = "/organizations/" + p_organizationId + "/networks"
    payload = {
        "name" : p_networkName,
        "productTypes" : p_productTypes
    }
    optionalFields = (
        ("copyFromNetworkId", p_copyFromNetworkId),
        ("timeZone", p_timeZone),
        ("tags", p_tags),
    )
    for fieldName, fieldValue in optionalFields:
        if fieldValue is not None:
            payload[fieldName] = fieldValue
    return merakiRequest(
        p_apiKey,
        "POST",
        networksPath,
        p_requestBody=payload,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getGroupPolicies(p_apiKey, p_networkId):
    """GET the group policies of a network; returns (success, errors, headers, response)."""
    policiesPath = "/networks/" + p_networkId + "/groupPolicies"
    return merakiRequest(p_apiKey, "GET", policiesPath, p_verbose=FLAG_REQUEST_VERBOSE)
def createGroupPolicy(p_apiKey, p_networkId, p_policy):
    """POST a new group policy into a network.

    Every entry of p_policy except "groupPolicyId" is sent as the request body.
    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    policiesPath = "/networks/" + p_networkId + "/groupPolicies"
    payload = {key: p_policy[key] for key in p_policy if key != "groupPolicyId"}
    return merakiRequest(
        p_apiKey,
        "POST",
        policiesPath,
        p_requestBody=payload,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getNewPolicyIdForOldId(p_oldId, p_oldPolicies, p_newPolicies):
    """Translate a group policy ID from the old policy list to the new one, matching by name.

    Returns the "groupPolicyId" of the first new policy whose name equals the
    name of the first old policy with ID p_oldId, or None if either lookup fails.
    """
    # Name of the first old policy carrying the requested ID
    policyName = next(
        (old["name"] for old in p_oldPolicies if old["groupPolicyId"] == p_oldId),
        None,
    )
    if policyName is None:
        return None
    # ID of the first new policy with that same name
    return next(
        (new["groupPolicyId"] for new in p_newPolicies if new["name"] == policyName),
        None,
    )
def getMxStaticRoutes(p_apiKey, p_networkId):
    """GET the appliance static routes of a network; returns (success, errors, headers, response)."""
    routesPath = "/networks/" + p_networkId + "/appliance/staticRoutes"
    return merakiRequest(p_apiKey, "GET", routesPath, p_verbose=FLAG_REQUEST_VERBOSE)
def createMxStaticRoute(p_apiKey, p_networkId, p_route):
    """POST a new appliance static route into a network.

    Every entry of p_route except "id" is sent as the request body.
    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    routesPath = "/networks/" + p_networkId + "/appliance/staticRoutes"
    payload = {key: p_route[key] for key in p_route if key != "id"}
    return merakiRequest(
        p_apiKey,
        "POST",
        routesPath,
        p_requestBody=payload,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getMxTrafficShapingRules(p_apiKey, p_networkId):
    """GET the appliance traffic shaping rules of a network; returns (success, errors, headers, response)."""
    shapingPath = "/networks/" + p_networkId + "/appliance/trafficShaping/rules"
    return merakiRequest(p_apiKey, "GET", shapingPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateMxTrafficShapingRules(p_apiKey, p_networkId, p_settings):
    """PUT p_settings as the appliance traffic shaping rules of a network.

    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    shapingPath = "/networks/" + p_networkId + "/appliance/trafficShaping/rules"
    return merakiRequest(
        p_apiKey,
        "PUT",
        shapingPath,
        p_requestBody=p_settings,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getMxOneToOneNatRules(p_apiKey, p_networkId):
    """GET the appliance 1-to-1 NAT rules of a network; returns (success, errors, headers, response)."""
    natPath = "/networks/" + p_networkId + "/appliance/firewall/oneToOneNatRules"
    return merakiRequest(p_apiKey, "GET", natPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateMxOneToOneNatRules(p_apiKey, p_networkId, p_settings):
    """PUT p_settings as the appliance 1-to-1 NAT rules of a network.

    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    natPath = "/networks/" + p_networkId + "/appliance/firewall/oneToOneNatRules"
    return merakiRequest(
        p_apiKey,
        "PUT",
        natPath,
        p_requestBody=p_settings,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getMxOneToManyNatRules(p_apiKey, p_networkId):
    """GET the appliance 1-to-many NAT rules of a network; returns (success, errors, headers, response)."""
    natPath = "/networks/" + p_networkId + "/appliance/firewall/oneToManyNatRules"
    return merakiRequest(p_apiKey, "GET", natPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateMxOneToManyNatRules(p_apiKey, p_networkId, p_settings):
    """PUT p_settings as the appliance 1-to-many NAT rules of a network.

    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    natPath = "/networks/" + p_networkId + "/appliance/firewall/oneToManyNatRules"
    return merakiRequest(
        p_apiKey,
        "PUT",
        natPath,
        p_requestBody=p_settings,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getMxPortForwardingRules(p_apiKey, p_networkId):
    """GET the appliance port forwarding rules of a network; returns (success, errors, headers, response)."""
    forwardingPath = "/networks/" + p_networkId + "/appliance/firewall/portForwardingRules"
    return merakiRequest(p_apiKey, "GET", forwardingPath, p_verbose=FLAG_REQUEST_VERBOSE)
def updateMxPortForwardingRules(p_apiKey, p_networkId, p_settings):
    """PUT p_settings as the appliance port forwarding rules of a network.

    Returns merakiRequest's (success, errors, headers, response) tuple.
    """
    forwardingPath = "/networks/" + p_networkId + "/appliance/firewall/portForwardingRules"
    return merakiRequest(
        p_apiKey,
        "PUT",
        forwardingPath,
        p_requestBody=p_settings,
        p_verbose=FLAG_REQUEST_VERBOSE,
    )
def getOrganizationIdWithName(p_apiKey, p_organizationName):
    """Return the ID of the organization named p_organizationName, or None.

    If the organizations list was fetched but holds no such name, a table of
    the available organization IDs and names is printed before returning None.
    """
    _, _, _, organizations = getOrganizations(p_apiKey)
    if organizations is None:
        return None

    for organization in organizations:
        if organization["name"] == p_organizationName:
            return organization["id"]

    # No match: list what the API key can see to help the user pick a name
    print("%-30s %s" %("Organization ID","Organization Name"))
    for organization in organizations:
        print('%-30s "%s"' %(organization["id"],organization["name"]))
    return None
def printHelpAndExit():
    """Print the readMe help text and terminate with exit status 0."""
    print(readMe)
    raise SystemExit(0)
def killScript():
    """Print an interruption message and terminate with exit status 2."""
    print("ERROR: Execution interrupted.")
    raise SystemExit(2)
def findUniqueNewNetName(p_sourceNetName, p_orgNetworks):
    """Pick a name for the new network that no network in p_orgNetworks uses.

    The first free name among "<source> - NO TEMPLATE" and
    "<source> - NO TEMPLATE <k>" for k = 2..98 is returned; if all of them
    are taken the result is None.
    """
    baseName = p_sourceNetName + " - NO TEMPLATE"

    # Candidates in the order they are tried
    candidates = [baseName]
    candidates.extend(baseName + " " + str(suffix) for suffix in range(2, 99))

    for candidate in candidates:
        nameTaken = any(net["name"] == candidate for net in p_orgNetworks)
        if not nameTaken:
            return candidate
    return None
def checkSubnetInKnownRoutes(p_subnet, p_routesList):
    """Tell whether p_subnet lies inside any of the subnets in p_routesList.

    Parameters:
        p_subnet: subnet or address in a form accepted by
            ipaddress.ip_network (e.g. "10.0.1.0/24").
        p_routesList: iterable of subnets in the same form.

    Returns:
        True if p_subnet is a subnet of one of the routes, False if it is
        contained in none of them, None if p_subnet or a route examined
        before a match was found cannot be parsed.
    """
    try:
        sourceSubnet = ipaddress.ip_network(p_subnet)
    except (ValueError, TypeError):
        return None

    for route in p_routesList:
        try:
            routeSubnet = ipaddress.ip_network(route)
        except (ValueError, TypeError):
            return None
        # subnet_of() raises TypeError when an IPv4 network is compared with
        # an IPv6 one; a route of the other IP version can never contain
        # the subnet, so it is simply skipped.
        if routeSubnet.version != sourceSubnet.version:
            continue
        if sourceSubnet.subnet_of(routeSubnet):
            return True
    return False
def cleanUnknownSourceSubnets(p_firewallRuleset, p_knownSubnets):
    """Return a copy of a firewall ruleset without unroutable source subnets.

    Configuration templates allow for source IP addresses/subnets that are
    unroutable by a MX. Such sources must be removed/modified, or copying
    rules will fail.

    Parameters:
        p_firewallRuleset: dict with a "rules" list; every rule has a
            comma-separated "srcCidr" string. The input is NOT modified.
        p_knownSubnets: subnets considered routable, as passed on to
            checkSubnetInKnownRoutes.

    For every rule, each source that parses as an IP network but is not
    inside a known subnet is removed and noted in the rule's comment.
    Sources that do not parse as a network ("Any", names, ...) are kept.
    A rule left with no sources is dropped entirely. Warnings are printed
    for every modification.

    Returns:
        A new dict {"rules": [...]} holding the cleaned rule copies.
    """
    cleanRuleset = []
    for rule in p_firewallRuleset["rules"]:
        cleanSources = []
        removedSubnets = []
        for rawSource in rule["srcCidr"].split(","):
            # Tolerate whitespace around the comma-separated entries
            source = rawSource.strip()
            try:
                ipaddress.ip_network(source)
                isSubnet = True
            except ValueError:
                isSubnet = False
                if source.lower() != "any":
                    print('WARNING: "%s" is not an IPv4 subnet' % source)

            if isSubnet and not checkSubnetInKnownRoutes(source, p_knownSubnets):
                removedSubnets.append(source)
                print('WARNING: Firewall rule modified to remove unroutable source "%s"' % source)
            else:
                # Either a routable subnet, or "any"/a FQDN/something else
                # that is deliberately left unmodified
                cleanSources.append(source)

        if not cleanSources:
            print("WARNING: Firewall rule removed: No routable source subnets")
            print(rule)
            continue

        # form comment; a missing or null comment counts as empty
        comment = rule.get("comment") or ''
        if removedSubnets:
            label = 'REMOVED SOURCES' if len(removedSubnets) > 1 else 'REMOVED SOURCE'
            comment = label + ' ' + ' '.join(str(subnet) for subnet in removedSubnets) + ' -- ' + comment

        # assemble rule with modified/cleaned up sources and comment
        cleanRule = dict(rule)
        cleanRule["srcCidr"] = ','.join(cleanSources)
        cleanRule["comment"] = comment
        cleanRuleset.append(cleanRule)

    return {"rules": cleanRuleset}
def main(argv):
    """Create a template-free copy of a template-bound MX network.

    Parameters:
        argv: command line arguments without the program name. Expected
            options: -k <api key>, -o <org name>, -n <source net name>;
            -h prints the help text.

    Steps: resolve the organization and source network, create a new
    network with a unique name, then copy group policies, VLANs or the
    single LAN, static routes, L3/L7 firewall rules, intrusion and malware
    settings, content filtering, traffic shaping, NAT/port forwarding rules
    and network admin privileges. Traffic shaping, NAT and port forwarding
    rules as well as the VLANs-enabled flag are read from the configuration
    template; the other settings are read from the source network.

    The script exits with status 2 on a fatal error and with status 0 after
    printing the help text.
    """
    #set default values for command line arguments
    arg_apiKey = None
    arg_orgName = None
    arg_sourceNetName = None

    try:
        opts, args = getopt.getopt(argv, 'hk:o:n:')
    except getopt.GetoptError:
        printHelpAndExit()

    for opt, arg in opts:
        if opt == '-h':
            printHelpAndExit()
        elif opt == '-k':
            arg_apiKey = arg
        elif opt == '-o':
            arg_orgName = arg
        elif opt == '-n':
            arg_sourceNetName = arg

    if arg_apiKey is None or arg_orgName is None or arg_sourceNetName is None:
        printHelpAndExit()

    orgId = getOrganizationIdWithName(arg_apiKey, arg_orgName)
    if orgId is None:
        print('ERROR: No organization found with name "%s"' % arg_orgName)
        killScript()

    #find network
    success, errors, headers, orgNetworks = getNetworks(arg_apiKey, orgId)
    if orgNetworks is None:
        print("ERROR: Unable to fetch networks' list")
        killScript()

    sourceNet = None
    for net in orgNetworks:
        if net["name"] == arg_sourceNetName:
            sourceNet = net

    if sourceNet is None:
        print('ERROR: No network found with name "%s"' % arg_sourceNetName)
        killScript()

    if "appliance" not in sourceNet["productTypes"]:
        print('ERROR: Source network needs to have security appliance configuration')
        killScript()

    # Several settings are read from the template. Check for it before
    # creating anything, so an unbound network cannot leave a half-copied
    # network behind.
    templateId = sourceNet.get("configTemplateId")
    if templateId is None:
        print('ERROR: Source network is not bound to a configuration template')
        killScript()

    newNetName = findUniqueNewNetName(arg_sourceNetName, orgNetworks)
    if newNetName is None:
        print('ERROR: Unable to create unique new network name')
        killScript()

    #create new network
    success, errors, headers, newNetwork = createNetwork(arg_apiKey, orgId, newNetName, sourceNet["productTypes"],
                                                        p_timeZone=sourceNet.get("timeZone"), p_tags=sourceNet.get("tags"))
    if newNetwork is None:
        print('ERROR: Unable to create new network with name', newNetName)
        killScript()

    #- Copy Group policies
    success, errors, headers, sourceGroupPolicies = getGroupPolicies(arg_apiKey, sourceNet["id"])
    newGroupPolicies = []
    if sourceGroupPolicies is not None:
        for policy in sourceGroupPolicies:
            success, errors, headers, response = createGroupPolicy(arg_apiKey, newNetwork["id"], policy)
            if response is not None:
                newGroupPolicies.append(response)

    #- Copy VLANs and IP addressing
    # Subnets the new MX can route; used later to clean firewall rule sources
    knownSubnets = []
    success, errors, headers, response = getVlanSettings(arg_apiKey, templateId)
    if response is not None:
        if "vlansEnabled" in response and not response["vlansEnabled"]:
            #Source does not have VLANs enabled. Copy the single subnet parameters
            success, errors, headers, response = getSingleLan(arg_apiKey, sourceNet["id"])
            if response is not None:
                # The LAN subnet is routable too; without it every firewall
                # rule source inside the LAN would be stripped as unroutable
                if "subnet" in response:
                    knownSubnets.append(response["subnet"])
                updateSingleLan(arg_apiKey, newNetwork["id"], response)
        else:
            success, errors, headers, response = updateVlanSettings(arg_apiKey, newNetwork["id"], True)
            if response is not None:
                success, errors, headers, sourceVlans = getVlans(arg_apiKey, sourceNet["id"])
                if sourceVlans is not None:
                    noConfigForVlanOne = True
                    for vlan in sourceVlans:
                        knownSubnets.append(vlan["subnet"])
                        if "groupPolicyId" in vlan:
                            policyId = getNewPolicyIdForOldId(vlan["groupPolicyId"], sourceGroupPolicies or [], newGroupPolicies)
                            if policyId is not None:
                                vlan["groupPolicyId"] = policyId
                            else:
                                print("WARNING: Please check that VLAN group policies of new network are correct!")
                        if int(vlan["id"]) == 1:
                            # VLAN 1 is updated in place rather than created
                            updateVlan(arg_apiKey, newNetwork["id"], vlan["id"], vlan)
                            noConfigForVlanOne = False
                        else:
                            createVlan(arg_apiKey, newNetwork["id"], vlan)
                    if noConfigForVlanOne:
                        # The source has no VLAN 1: remove it from the copy
                        deleteVlan(arg_apiKey, newNetwork["id"], 1)
    else:
        print("WARNING: Skipping copying VLAN settings: Could not check if enabled")

    #- Copy Static Routes
    success, errors, headers, response = getMxStaticRoutes(arg_apiKey, sourceNet["id"])
    for route in response or []:
        try:
            #At time of writing, the GET staticRoutes endpoint has a bug that can return an invalid gatewayIp
            #If this bug is detected, skip processing of invalid route and print a warning
            ipaddress.ip_address(route["gatewayIp"])
        except (KeyError, ValueError):
            print("WARNING: Skipping route: Invalid route gateway IP address")
            continue
        knownSubnets.append(route["subnet"])
        createMxStaticRoute(arg_apiKey, newNetwork["id"], route)

    #- Copy L3 Firewall Rules
    success, errors, headers, sourceRules = getMxL3FirewallRules(arg_apiKey, sourceNet["id"])
    if sourceRules is not None:
        #Configuration templates allow for source IP addresses/subnets that are unroutable by a MX
        #Such sources must be removed/modified, or copying rules will fail
        cleanRules = cleanUnknownSourceSubnets(sourceRules, knownSubnets)
        updateMxL3FirewallRules(arg_apiKey, newNetwork["id"], cleanRules)

    #- L7 Firewall Rules
    success, errors, headers, sourceRules = getMxL7FirewallRules(arg_apiKey, sourceNet["id"])
    if sourceRules is not None:
        if sourceRules["rules"] != []:
            updateMxL7FirewallRules(arg_apiKey, newNetwork["id"], sourceRules)

    #- Copy Threat Protection
    success, errors, headers, sourceRules = getMxIpsSettings(arg_apiKey, sourceNet["id"])
    if sourceRules is not None:
        updateMxIpsSettings(arg_apiKey, newNetwork["id"], sourceRules)

    success, errors, headers, sourceRules = getMxAmpSettings(arg_apiKey, sourceNet["id"])
    if sourceRules is not None:
        updateMxAmpSettings(arg_apiKey, newNetwork["id"], sourceRules)

    #- Content Filtering
    success, errors, headers, sourceRules = getMxContentFilteringSettings(arg_apiKey, sourceNet["id"])
    if sourceRules is not None:
        updateMxContentFilteringSettings(arg_apiKey, newNetwork["id"], sourceRules)

    #- Traffic Shaping
    success, errors, headers, sourceRules = getMxTrafficShapingRules(arg_apiKey, templateId)
    if sourceRules is not None:
        updateMxTrafficShapingRules(arg_apiKey, newNetwork["id"], sourceRules)

    #- Port forwarding NAT rules
    success, errors, headers, sourceRules = getMxPortForwardingRules(arg_apiKey, templateId)
    if sourceRules is not None and sourceRules != []:
        updateMxPortForwardingRules(arg_apiKey, newNetwork["id"], sourceRules)

    #- 1-to-1 NAT rules
    success, errors, headers, sourceRules = getMxOneToOneNatRules(arg_apiKey, templateId)
    if sourceRules is not None and sourceRules != []:
        updateMxOneToOneNatRules(arg_apiKey, newNetwork["id"], sourceRules)

    #- 1-to-many NAT rules
    success, errors, headers, sourceRules = getMxOneToManyNatRules(arg_apiKey, templateId)
    if sourceRules is not None and sourceRules != []:
        updateMxOneToManyNatRules(arg_apiKey, newNetwork["id"], sourceRules)

    #- Copy Admin accounts
    success, errors, headers, response = getAdministrators(arg_apiKey, orgId)
    for admin in response or []:
        networks = admin.get("networks")
        if not networks:
            continue
        # Iterate over a snapshot: the privilege list is extended below
        for net in list(networks):
            if net["id"] == sourceNet["id"]:
                # Grant the same access level on the new network
                networks.append({
                    "id" : newNetwork["id"],
                    "access" : net["access"]
                })
                updateAdministrator(arg_apiKey, orgId, admin["id"], admin)
                break

    print('\nCreated network "%s"' % newNetName)
    print("End of script.")
if __name__ == "__main__":
    # Script entry point: hand main() the command line arguments without the program name
    main(sys.argv[1:])