-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathget_compliance_into_csv.py
308 lines (249 loc) · 8.74 KB
/
get_compliance_into_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""
This is an example of using the Fugue API to output compliance results as CSV.
Follow instructions in the API User Guide to create a client ID and secret
that are used to authenticate with Fugue:
https://docs.fugue.co/api.html#api-user-guide
The client ID and secret may be passed to this script using the following
environment variables: FUGUE_API_ID and FUGUE_API_SECRET. Alternatively,
edit the script itself to set the values directly in this script.
This script should be run using Python 3 however it could be modified for
Python 2 compatibility if needed.
One dependency must be installed using pip: the requests library.
* pip install requests
"""
from datetime import datetime
import json
import os
import requests
# Fugue API base URL
api_url = "https://api.riskmanager.fugue.co"
api_ver = 'v0'
# Client ID and secret used to authenticate with Fugue. Follow the guide here
# to create an API client: https://docs.fugue.co/api.html#getting-started
# You can set these values via environment variables or replace the os.getenv
# calls below with the string values themselves.
client_id = os.getenv('FUGUE_API_ID')
client_secret = os.getenv('FUGUE_API_SECRET')
if not client_id or not client_secret:
print('Please follow the user guide at https://docs.fugue.co/api.html#api-user-guide to set \'FUGUE_API_ID\' and \'FUGUE_API_SECRET\'')
exit(1)
# Authentication
# https://docs.fugue.co/api.html#auth-n
auth = (client_id, client_secret)
def get(path, params=None):
"""
Executes an authenticated GET request to the Fugue API with the provided
API path and query parameters.
"""
url = '%s/%s/%s' % (api_url, api_ver, path.strip('/'))
return requests.get(url, params=params, auth=auth).json()
def list_environments():
"""
Returns all environments present in your Fugue account.
https://docs.fugue.co/_static/swagger.html#tag-environments
"""
return get('environments')['items']
def list_scans(environment_id, max_items=10, status='SUCCESS'):
"""
Returns the most recent successful scans on the specified environment.
https://docs.fugue.co/_static/swagger.html#tag-scans
"""
params = {
'environment_id': environment_id,
'status': status,
'max_items': max_items,
}
return get('scans', params)['items']
def get_latest_scan(environment_id):
"""
Returns the most recent successful scan of the specified environment.
None is returned if there has not yet been a successful scan.
"""
scans = list_scans(environment_id, max_items=1)
if scans:
return scans[0]
return None
def get_compliance_by_rules(scan_id):
"""
Lists compliance results by rule for a scan.
"""
items = []
offset = 0
while True:
params = {'offset': offset}
response = get('scans/%s/compliance_by_rules' % scan_id, params)
items.extend(response['items'])
if not response['is_truncated']:
break
offset = response['next_offset']
return items
def format_message(message):
"""
Ensures the message does not have commas since that would interfere with
CSV formatting.
"""
return message.replace(',', ' ').replace('"', '')
def records_from_failed_type(family, control, failure):
"""
Builds a spreadsheet record for a failure for a resource type.
"""
return [dict(
family=family,
control=control,
resource_type=failure['resource_type'],
resource_id=None,
message=format_message(message),
) for message in failure['messages']]
def records_from_failed_resource(family, control, failure):
"""
Builds a spreadsheet record for a failure for a single resource.
"""
return [dict(
family=family,
control=control,
resource_type=failure['resource']['resource_type'],
resource_id=failure['resource']['resource_id'],
message=format_message(message),
) for message in failure['messages']]
def records_from_unsurveyed_type(family, control, resource_type):
"""
Builds a spreadsheet record for a resource type that was not surveyed.
"""
return [dict(
family=family,
control=control,
resource_type=resource_type,
resource_id=None,
message=format_message('Resource type was not scanned'),
)]
def records_from_rule(rule):
"""
Generator that yields spreadsheet records for a given compliance rule.
"""
family = rule['family']
control = rule['rule']
for failure in rule['failed_resource_types']:
for record in records_from_failed_type(family, control, failure):
yield record
for failure in rule['failed_resources']:
for record in records_from_failed_resource(family, control, failure):
yield record
for failure in rule['unsurveyed_resource_types']:
for record in records_from_unsurveyed_type(family, control, failure):
yield record
def record_with_metadata(record, environment, scan):
"""
Adds environment and scan metadata to a compliance record.
"""
day, tod = date_from_timestamp(scan['finished_at'])
record.update(dict(
environment_id=environment['id'],
environment_name=environment['name'],
account=account_from_environment(environment),
region=region_from_environment(environment),
scan_id=scan['id'],
day=day,
time=tod,
))
return record
def account_from_environment(environment):
"""
Returns the AWS account ID or Azure subscription ID of the environment.
"""
provider_opts = environment['provider_options']
if environment['provider'] == 'aws':
return account_from_role_arn(provider_opts['aws']['role_arn'])
elif environment['provider'] == 'aws_govcloud':
return account_from_role_arn(provider_opts['aws_govcloud']['role_arn'])
elif environment['provider'] == 'azure':
return provider_opts['azure']['subscription_id']
return '-'
def region_from_environment(environment):
"""
Returns the AWS region of the environment.
"""
provider_opts = environment['provider_options']
if environment['provider'] == 'aws':
if 'region' in provider_opts['aws']:
return provider_opts['aws']['region']
else:
return ','.join(provider_opts['aws']['regions'])
elif environment['provider'] == 'aws_govcloud':
return provider_opts['aws_govcloud']['region']
return '-'
def account_from_role_arn(role_arn):
"""
Returns the AWS account ID portion of the given IAM role ARN.
"""
parts = role_arn.split(':')
if len(parts) == 6:
return parts[4]
return '-'
def date_from_timestamp(ts):
"""
Returns a tuple containing (date, time) strings for a given Unix timestamp.
"""
day = datetime.utcfromtimestamp(ts).strftime('%Y-%m-%d')
tod = datetime.utcfromtimestamp(ts).strftime('%H:%M:%S')
return (day, tod)
# Columns to be output as CSV
COLUMNS = [
'environment_name',
'account',
'region',
'family',
'control',
'resource_type',
'resource_id',
'day',
'time',
'message',
'environment_id',
'scan_id',
]
def value_or_default(value, default='-'):
if value is not None:
return value
return default
def quote_csv_value(value):
"""
Surrounds a string value with quoting to avoid excel autoformatting.
Ref: https://stackoverflow.com/a/165052/9806588
"""
if len(value) < 64:
return '"=""%s"""' % value
return value
def csv(values):
return ",".join(values)
def format_value(column_name, value):
# All columns but the message column should be wrapped with double quotes
# to avoid excel autoformatting behavior.
value = value_or_default(value)
if column_name != 'message':
value = quote_csv_value(value)
return ' '.join(value.split())
def format(record, fmt='csv'):
if fmt == 'csv':
return csv([format_value(col, record[col]) for col in COLUMNS])
else:
return json.dumps(record)
def main():
"""
Loop over all Fugue environments in your account and output compliance
results from the most recent scan in each. Output is in CSV format.
"""
now = datetime.now().strftime('%Y-%m-%d-%H%M%S')
filename = 'compliance-%s.csv' % now
with open(filename, 'w') as f:
print(csv(COLUMNS), file=f)
for env in list_environments():
scan = get_latest_scan(env['id'])
if not scan:
continue
for rule in get_compliance_by_rules(scan['id']):
for record in records_from_rule(rule):
record = record_with_metadata(record, env, scan)
print(format(record), file=f)
print('Wrote %s' % f.name)
if __name__ == '__main__':
main()