-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathCP-to-FMC-Network-Object-Import.py
executable file
·505 lines (370 loc) · 17.3 KB
/
CP-to-FMC-Network-Object-Import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#####################
# ABOUT THIS SCRIPT #
#####################
#
# CP-to-FMC-Network-Object-Import.py
# ----------------
# Author: Alan Nix
# Property of: Cisco Systems
# Version: 1.0
# Release Date: 09/19/2017
#
# Summary
# -------
#
# This script will use API calls in the Checkpoint manager to pull in Host, Network, Address Range and Group objects.
# We then manipulate those objects and import them into a Firepower Management Console (FMC)
#
# Requirements
# ------------
#
# 1) Must have Python 3.x installed.
# 2) Must have 'requests' Python module installed.
# You'll probably want to set up a virtual environment (https://docs.python.org/3/tutorial/venv.html)
# - wget https://bootstrap.pypa.io/get-pip.py
# - python get-pip.py (may need to use 'sudo')
# - pip install requests (may need to use 'sudo')
# 3) Must have API access to a Checkpoint SmartConsole
# 4) Must have API access to a Firepower Management Console
#
# How To Run
# ----------
#
# 1) Configure Checkpoint Manager IP
# - Optionally, you can statically assign a username and password for "zero touch"
# 2) Configure Firepower Management Console IP
# - Optionally, you can statically assign a username and password for "zero touch"
# 3) Set this file to be executable.
# 4) Run it.
#
############################################################
import getpass
import logging
import json
import os
import requests
from requests.auth import HTTPBasicAuth
from requests.packages import urllib3
# Disable SSL Cert warnings
# (every request in this script is sent with verify=False)
urllib3.disable_warnings()
###################
# CONFIGURATION #
###################
#
# ---------------------------------------------------- #
#
# Logging Parameters
# filemode='w' overwrites the log file on each run; with level INFO the
# logging.debug() calls in this script are not recorded.
logging.basicConfig(filename='CP-to-FMC-Network-Object-Import.log', filemode='w', level=logging.INFO)
# File Parameters
# JSON file holding the Checkpoint UID -> FMC object ID translations, so that
# objects already imported by an earlier run are not submitted again.
UUID_MAP_FILE = "network_objects_uuid_map.json"
# Object Prefix
# String prepended to the name of every object sent to the FMC.
OBJECT_PREFIX = ""
# Checkpoint Console Variables
# CP_IP is used as the host part of the https://<CP_IP>/web_api/... URLs.
CP_IP = ""
# When left as None, the username and password are prompted for at start-up.
CP_USERNAME = None
CP_PASSWORD = None
# Session ID; filled in by getAuthTokenCP() on the first Checkpoint request.
CP_AUTH_TOKEN = None
# Firepower Management Console Variables
# FMC_IP is used as the host part of the https://<FMC_IP>/api/... URLs.
FMC_IP = ""
# When left as None, the username and password are prompted for at start-up.
FMC_USERNAME = None
FMC_PASSWORD = None
# Access token and domain UUID; both are filled in by getAuthTokenFMC() from
# the headers of the token response on the first FMC request.
FMC_AUTH_TOKEN = None
FMC_DOMAIN_UUID = None
#
# ---------------------------------------------------- #
###################
# FUNCTIONS #
###################
def getAuthTokenCP():
    '''Log in to the Checkpoint manager and cache the session ID.

    Posts CP_USERNAME / CP_PASSWORD as JSON to the "login" web_api endpoint
    on CP_IP (certificate verification disabled) and stores the "sid" field
    of the JSON response in the module-level CP_AUTH_TOKEN.

    Returns nothing. Prints and logs an error and terminates the script via
    exit() when the request fails, the response body is not JSON, or the
    response carries no "sid".
    '''
    global CP_AUTH_TOKEN

    logging.info('Fetching Authentication Token from Checkpoint...')

    # Build HTTP Headers
    auth_headers = {'Content-Type': 'application/json'}

    # Build the Authentication JSON data
    auth_data = {'user': CP_USERNAME, 'password': CP_PASSWORD}

    # Build the Authentication URL
    auth_url = "https://{}/web_api/login".format(CP_IP)

    try:
        http_req = requests.post(url=auth_url, headers=auth_headers, json=auth_data, verify=False)
        auth_response = http_req.json()
    except Exception as err:
        print('Error fetching auth token from Checkpoint: ' + str(err))
        logging.error('Error fetching auth token from Checkpoint: ' + str(err))
        exit()

    logging.debug('Checkpoint Auth Response: {}'.format(auth_response))

    # Use .get() rather than ['sid']: a rejected login returns a JSON error
    # body without "sid", and indexing would raise KeyError before the
    # "token not found" check below could ever run.
    session_id = auth_response.get('sid') if isinstance(auth_response, dict) else None

    # If we didn't get a token, then something went wrong
    if session_id is None:
        print('Authentication Token Not Found...')
        logging.error('Authentication Token Not Found. Exiting...')
        exit()

    # Only publish the token once it is known to be valid, so the global is
    # never left holding the raw response dictionary.
    CP_AUTH_TOKEN = session_id
    logging.info('Authentication Token Successfully Fetched.')
def postObjectCallCP(endpoint, json_data=None):
    '''POST a request to a Checkpoint web_api endpoint and return the reply.

    Args:
        endpoint: Name of the web_api command, appended to
            https://<CP_IP>/web_api/ (for example 'show-hosts').
        json_data: Optional JSON-serialisable request body.

    Returns:
        The decoded JSON body of a 2xx response.

    Fetches a session ID through getAuthTokenCP() first when CP_AUTH_TOKEN is
    still None. On a non-2xx status, or on any exception while sending or
    decoding, the error is printed and logged and the script terminates via
    exit().
    '''
    print('Fetching {} data from Checkpoint.'.format(endpoint))
    logging.info('Fetching {} data from Checkpoint.'.format(endpoint))

    # If there's no CP Authentication Token, then fetch one
    if CP_AUTH_TOKEN is None:
        getAuthTokenCP()

    # Build endpoint URL
    endpoint_url = "https://{}/web_api/{}".format(CP_IP, endpoint)

    # Build headers with the access token
    headers = {'Content-Type': 'application/json', 'X-chkp-sid': CP_AUTH_TOKEN}

    try:
        http_req = requests.post(url=endpoint_url, headers=headers, json=json_data, verify=False)

        # Check to make sure the POST was successful
        if 200 <= http_req.status_code < 300:
            logging.info('Request succesfully sent to Checkpoint.')
            logging.debug('HTTP Response: ' + str(http_req.text))
            return http_req.json()

        # An error reply is not guaranteed to be JSON (e.g. an HTML page from
        # a proxy). Fall back to the raw text so that a decode failure cannot
        # hide the HTTP status code behind the generic handler below.
        try:
            failure_body = http_req.json()
        except ValueError:
            failure_body = http_req.text

        failure_message = "Checkpoint Connection Failure - HTTP Return Code: {}\nResponse: {}".format(
            http_req.status_code, failure_body)
        print(failure_message)
        logging.error(failure_message)
        exit()

    except Exception as err:
        print('Error posting request to Checkpoint: ' + str(err))
        logging.error('Error posting request to Checkpoint: ' + str(err))
        exit()
def getAllObjectsCP(endpoint):
    '''Return every object exposed by a paginated Checkpoint endpoint.

    Requests pages of 500 objects at "details-level": "full" through
    postObjectCallCP(), advancing the offset by one page each time, and
    stops after the first page that is not completely full.

    Args:
        endpoint: Name of the web_api command to query (e.g. 'show-hosts').

    Returns:
        A list with the entries of each response's 'objects' field, in the
        order they were received.
    '''
    logging.info('Fetching all objects from the {} endpoint.'.format(endpoint))

    page_size = 500
    offset = 0
    collected = []

    while True:
        # Request body for the current page
        request_body = {'limit': page_size, 'offset': offset, "details-level": "full"}

        logging.info('Submitting request to {} with following parameters: {}'.format(endpoint, str(request_body)))

        # Fetch the page and add its objects to the running list
        page = postObjectCallCP(endpoint, json_data=request_body)['objects']
        collected.extend(page)

        # A page that is not full is the last one
        if len(page) != page_size:
            break

        offset += page_size

    return collected
def getAuthTokenFMC():
    '''Request an access token from the FMC and cache it with the domain UUID.

    Sends a POST with HTTP basic authentication (FMC_USERNAME / FMC_PASSWORD)
    to the generatetoken endpoint on FMC_IP, then copies the
    'X-auth-access-token' and 'DOMAIN_UUID' response headers into the
    module-level FMC_AUTH_TOKEN and FMC_DOMAIN_UUID.

    Returns nothing. Prints and logs an error and terminates the script via
    exit() when the request raises or no token header is returned.
    '''
    global FMC_AUTH_TOKEN, FMC_DOMAIN_UUID

    logging.info('Fetching Authentication Token from FMC...')

    # Credentials, headers and URL for the token request
    credentials = HTTPBasicAuth(FMC_USERNAME, FMC_PASSWORD)
    request_headers = {'Content-Type': 'application/json'}
    token_url = "https://{}/api/fmc_platform/v1/auth/generatetoken".format(FMC_IP)

    try:
        response = requests.post(url=token_url, auth=credentials, headers=request_headers, verify=False)

        logging.debug('FMC Auth Response: ' + str(response.headers))

        # Both values come back as response headers, not in the body
        reply_headers = response.headers
        FMC_AUTH_TOKEN = reply_headers.get('X-auth-access-token')
        FMC_DOMAIN_UUID = reply_headers.get('DOMAIN_UUID')

        if FMC_AUTH_TOKEN is not None:
            logging.info('Authentication Token Successfully Fetched.')
        else:
            # No token header means the request was not accepted
            print('Authentication Token Not Found...')
            logging.error('Authentication Token Not Found. Exiting...')
            exit()

    except Exception as err:
        failure_text = 'Error fetching auth token from FMC: ' + str(err)
        print(failure_text)
        logging.error(failure_text)
        exit()
def ObjectCallFMC(method, endpoint, json_data=None):
    '''Send a request to an FMC object endpoint and return the decoded reply.

    Args:
        method: 'POST', 'PUT' or 'DELETE'; any other value results in a GET.
        endpoint: Path below .../domain/<FMC_DOMAIN_UUID>/object/
            (for example 'hosts').
        json_data: Optional JSON-serialisable request body.

    Returns:
        The decoded JSON body of a 2xx response.

    Fetches a token through getAuthTokenFMC() first when FMC_AUTH_TOKEN is
    still None. On a non-2xx status, or on any exception while sending or
    decoding, the error is printed and logged and the script terminates via
    exit().
    '''
    print('Submitting ' + str(endpoint) + ' Object to the FMC via ' + str(method) + ' request. Data: ' + json.dumps(json_data))
    logging.info('Submitting ' + str(endpoint) + ' Object to the FMC via ' + str(method) + ' request. Data: ' + json.dumps(json_data))

    # If there's no FMC Authentication Token, then fetch one
    if FMC_AUTH_TOKEN is None:
        getAuthTokenFMC()

    # Build URL for Object endpoint
    endpoint_url = "https://{}/api/fmc_config/v1/domain/{}/object/{}".format(FMC_IP, FMC_DOMAIN_UUID, endpoint)

    # Build new headers with the access token
    headers = {'Content-Type': 'application/json', 'X-auth-access-token': FMC_AUTH_TOKEN}

    try:
        # Compare the method by value. The previous "is" test compared object
        # identity, which only works for strings that happen to be interned
        # and raises a SyntaxWarning on Python 3.8+.
        if method == 'POST':
            send_request = requests.post
        elif method == 'PUT':
            send_request = requests.put
        elif method == 'DELETE':
            send_request = requests.delete
        else:
            send_request = requests.get

        http_req = send_request(url=endpoint_url, headers=headers, json=json_data, verify=False)

        # Check to make sure the request was successful
        if 200 <= http_req.status_code < 300:
            logging.info('Request succesfully sent to FMC.')
            logging.debug('HTTP Response: ' + str(http_req.text))
            return http_req.json()

        # An error reply is not guaranteed to be JSON. Fall back to the raw
        # text so that a decode failure cannot hide the HTTP status code
        # behind the generic handler below.
        try:
            failure_body = http_req.json()
        except ValueError:
            failure_body = http_req.text

        failure_message = "FMC Connection Failure - HTTP Return Code: {}\nResponse: {}".format(
            http_req.status_code, failure_body)
        print(failure_message)
        logging.error(failure_message)
        exit()

    except Exception as err:
        print('Error posting request to FMC: ' + str(err))
        logging.error('Error posting request to FMC: ' + str(err))
        exit()
def storeUUIDs(uuid_map):
    '''Write the UUID translation map to UUID_MAP_FILE as JSON.

    The file is opened in 'w' mode, so any previous content is replaced.

    Args:
        uuid_map: JSON-serialisable mapping to persist.
    '''
    map_file = open(UUID_MAP_FILE, 'w')
    with map_file:
        json.dump(uuid_map, fp=map_file)
###################
# !!! DO WORK !!! #
###################
def _buildObjectValue(cp_object, fmc_type):
    '''Return the FMC "value" string for a Checkpoint Host, Network or Range.

    The IPv4 fields are used when present; otherwise the IPv6 fields are read
    (a KeyError is raised if those are missing too).

    Args:
        cp_object: Checkpoint object dictionary.
        fmc_type: 'Host', 'Network' or 'Range'.

    Raises:
        ValueError: If fmc_type is not one of the three supported types.
    '''
    if fmc_type == 'Host':
        if 'ipv4-address' in cp_object:
            return cp_object['ipv4-address']
        return cp_object['ipv6-address']

    if fmc_type == 'Network':
        if 'subnet4' in cp_object:
            return cp_object['subnet4'] + '/' + str(cp_object['mask-length4'])
        return cp_object['subnet6'] + '/' + str(cp_object['mask-length6'])

    if fmc_type == 'Range':
        if 'ipv4-address-first' in cp_object:
            return cp_object['ipv4-address-first'] + '-' + cp_object['ipv4-address-last']
        return cp_object['ipv6-address-first'] + '-' + cp_object['ipv6-address-last']

    raise ValueError('Unsupported object type: {}'.format(fmc_type))


def _transformObjects(cp_objects, fmc_type, label, uuid_map):
    '''Build FMC payloads, keyed by Checkpoint UID, for objects not yet in uuid_map.

    Args:
        cp_objects: Iterable of Checkpoint object dictionaries.
        fmc_type: 'Host', 'Network' or 'Range'; also the uuid_map category.
        label: Human-readable object kind used in the log message.
        uuid_map: Checkpoint UID -> FMC ID translations, grouped by category.
    '''
    pending_objects = {}

    for cp_object in cp_objects:
        logging.info('Processing {} Object with UUID: {}'.format(label, cp_object['uid']))

        # Skip anything imported by a previous run
        if cp_object['uid'] in uuid_map[fmc_type]:
            continue

        pending_objects[cp_object['uid']] = {
            'description': cp_object['comments'],
            'name': OBJECT_PREFIX + cp_object['name'],
            'type': fmc_type,
            'value': _buildObjectValue(cp_object, fmc_type),
        }

    return pending_objects


def _transformGroups(cp_groups, uuid_map):
    '''Build FMC network group payloads, keyed by Checkpoint UID, for groups not yet in uuid_map.

    Only members whose UID has already been imported as a Host, Network or
    Range are included; any other member is logged and left out.
    '''
    pending_groups = {}

    for group in cp_groups:
        logging.info('Processing Group Object with UUID: {}'.format(group['uid']))

        # Skip anything imported by a previous run
        if group['uid'] in uuid_map['Group']:
            continue

        translated_members = []

        for member in group['members']:
            # Take the type from the category that actually holds the UID, so
            # the lookup below cannot fail on a member whose reported type
            # does not match the category it was imported under.
            member_type = None
            for candidate_type in ('Host', 'Network', 'Range'):
                if member['uid'] in uuid_map[candidate_type]:
                    member_type = candidate_type
                    break

            if member_type is None:
                # Previously dropped without a trace; record it so that
                # incomplete groups can be found in the log.
                logging.warning('Skipping member {} of Group {}: no imported Host, Network or Range has that UUID.'.format(member['uid'], group['uid']))
                continue

            # Add the translated object UUID to the objects list
            translated_members.append({
                'type': member_type,
                'id': uuid_map[member_type][member['uid']],
            })

        pending_groups[group['uid']] = {
            'description': group['comments'],
            'name': OBJECT_PREFIX + group['name'],
            'objects': translated_members,
        }

    return pending_groups


def _submitObjects(pending_objects, fmc_type, fmc_endpoint, uuid_map):
    '''POST each pending payload to the FMC and record the ID it returns.

    uuid_map is written to disk after every successful POST, so a later run
    can skip the objects that were already created.
    '''
    for cp_uid, payload in pending_objects.items():
        # Post the object to the FMC and get the JSON response
        fmc_response = ObjectCallFMC('POST', fmc_endpoint, payload)

        # Store the UUID mapping for the newly created object
        uuid_map[fmc_type][cp_uid] = fmc_response['id']

        # Write the UUID_MAP to disk
        storeUUIDs(uuid_map)


if __name__ == "__main__":
    # Parse JSON from Checkpoint, manipulate it, and import it into FMC

    logging.info('Starting Network Object Import...')

    # If not hard coded, get the Checkpoint Username and Password
    if CP_USERNAME is None:
        CP_USERNAME = input("Checkpoint Username:")
    if CP_PASSWORD is None:
        CP_PASSWORD = getpass.getpass("Checkpoint Password:")

    # If not hard coded, get the FMC Username and Password
    if FMC_USERNAME is None:
        FMC_USERNAME = input("Firepower Username:")
    if FMC_PASSWORD is None:
        FMC_PASSWORD = getpass.getpass("Firepower Password:")

    logging.info('Fetching Network Objects from Checkpoint...')

    # Get all of the data sets from Checkpoint
    hosts_json = getAllObjectsCP('show-hosts')
    networks_json = getAllObjectsCP('show-networks')
    address_ranges_json = getAllObjectsCP('show-address-ranges')
    network_groups_json = getAllObjectsCP('show-groups')

    logging.info('Network Object Fetch Complete...')

    # If we have a stored UUID_MAP, then use it, otherwise start from an empty one
    if os.path.isfile(UUID_MAP_FILE):
        with open(UUID_MAP_FILE, 'r') as uuid_file:
            UUID_MAP = json.load(uuid_file)
    else:
        UUID_MAP = {}

    # Make sure every category exists, even when the stored file lacks one
    for object_type in ('Group', 'Host', 'Network', 'Range'):
        UUID_MAP.setdefault(object_type, {})

    # Hosts
    logging.info('Transforming Host Object Data...')
    HOST_OBJECTS = _transformObjects(hosts_json, 'Host', 'Host', UUID_MAP)
    logging.info('Submitting Hosts to the Firepower Management Console...')
    _submitObjects(HOST_OBJECTS, 'Host', 'hosts', UUID_MAP)

    # Networks
    logging.info('Transforming Network Object Data...')
    NETWORK_OBJECTS = _transformObjects(networks_json, 'Network', 'Network', UUID_MAP)
    logging.info('Submitting Networks to the Firepower Management Console...')
    _submitObjects(NETWORK_OBJECTS, 'Network', 'networks', UUID_MAP)

    # Address Ranges
    logging.info('Transforming Address Range Object Data...')
    ADDRESS_RANGE_OBJECTS = _transformObjects(address_ranges_json, 'Range', 'Address Range', UUID_MAP)
    logging.info('Submitting Address Ranges to the Firepower Management Console...')
    _submitObjects(ADDRESS_RANGE_OBJECTS, 'Range', 'ranges', UUID_MAP)

    # Groups go last: their members must already have FMC IDs in UUID_MAP
    logging.info('Transforming Network Group Data...')
    GROUP_OBJECTS = _transformGroups(network_groups_json, UUID_MAP)
    _submitObjects(GROUP_OBJECTS, 'Group', 'networkgroups', UUID_MAP)