-
-
Notifications
You must be signed in to change notification settings - Fork 184
/
Copy pathcw_set_retention_policy.py
executable file
·177 lines (143 loc) · 5.75 KB
/
cw_set_retention_policy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# https://github.com/dannysteenman/aws-toolbox
#
# License: MIT
#
# This script sets CloudWatch Logs Retention Policy for log groups in the configured AWS region.
# It can set a specific retention period for all log groups or print a summary of current retention periods.
#
# Features:
# - Set retention from 1 day to 10 years
# - Print retention count summary
# - Detailed logging and verification of updates
#
# Usage:
# 1. Set retention: python script_name.py --retention <days>
# 2. Print retention counts: python script_name.py --print-retention-counts
import argparse
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
import botocore
# Module-level CloudWatch Logs client used by the functions below for every API call.
# No region or credentials are passed here, so they come from the ambient boto3 configuration.
cloudwatch = boto3.client("logs")
def get_cloudwatch_log_groups():
    """Return every log group description visible to the module-level client.

    Calls ``describe_log_groups`` repeatedly, 50 groups per request, feeding
    each response's ``nextToken`` into the next request until a response no
    longer carries one.

    Returns:
        A list of the ``logGroups`` entries from all pages, in the order received.
    """
    collected = []
    request_params = {"limit": 50}
    while True:
        page = cloudwatch.describe_log_groups(**request_params)
        collected.extend(page["logGroups"])
        # A page without a continuation token is the last one.
        if "nextToken" not in page:
            return collected
        request_params["nextToken"] = page["nextToken"]
def update_log_group_retention(group, retention):
    """Set the retention policy of one log group and verify that it took effect.

    Args:
        group: Log group description dict; ``logGroupName`` is required and
            ``retentionInDays`` is optional (absent when no retention is set).
        retention: Desired retention period in days.

    Returns:
        A human-readable status message. Only messages containing
        "Successfully updated" are counted as successes by
        ``cloudwatch_set_retention``; every other message counts as a failure.
        ``botocore`` client errors are reported in the message rather than raised.
    """
    name = group["logGroupName"]
    try:
        # Nothing to do when the group already carries the requested retention.
        if "retentionInDays" in group and group["retentionInDays"] == retention:
            return f"CloudWatch Loggroup: {name} already has the specified retention of {retention} days."

        cloudwatch.put_retention_policy(logGroupName=name, retentionInDays=retention)

        # Verify the update. The lookup is a *prefix* query, so it may also return
        # groups whose names merely start with this name (or nothing at all if the
        # group vanished); pick the exact match instead of trusting the first entry.
        response = cloudwatch.describe_log_groups(logGroupNamePrefix=name)
        updated_group = next(
            (candidate for candidate in response["logGroups"] if candidate.get("logGroupName") == name),
            None,
        )
        if updated_group is None:
            return f"Failed to update retention for: {name}. Log group not found during verification."

        current_retention = updated_group.get("retentionInDays")
        if current_retention == retention:
            return f"Successfully updated retention for: {name}"
        return f"Failed to update retention for: {name}. Current retention: {current_retention}"
    except botocore.exceptions.ClientError as e:
        return f"Error updating {name}: {e}"
def count_retention_periods(cloudwatch_log_groups):
    """Tally log groups by their retention period.

    Args:
        cloudwatch_log_groups: Iterable of log group description dicts.

    Returns:
        A ``defaultdict(int)`` mapping each ``retentionInDays`` value to the
        number of groups using it; groups without that key are tallied under
        the string key ``"Not set"``.
    """
    periods = (log_group.get("retentionInDays", "Not set") for log_group in cloudwatch_log_groups)
    tally = defaultdict(int)
    for period in periods:
        tally[period] += 1
    return tally
def _print_retention_counts(cloudwatch_log_groups):
    """Print how many log groups use each retention period, unset ones first."""
    retention_counts = count_retention_periods(cloudwatch_log_groups)
    print("Retention periods and log group counts:")
    # "Not set" is a string key; remove it so the remaining keys can be sorted numerically.
    not_set_count = retention_counts.pop("Not set", 0)
    sorted_items = sorted(retention_counts.items(), key=lambda item: int(item[0]))
    if not_set_count:
        print(f"Retention: Not set, Count: {not_set_count}")
    for retention_period, count in sorted_items:
        print(f"Retention: {retention_period} days, Count: {count}")


def cloudwatch_set_retention(args):
    """Report on, or update, the retention policy of all log groups.

    When ``args.print_retention_counts`` is set, only a per-period summary is
    printed. Otherwise every log group whose retention differs from
    ``args.retention`` is listed, the user is asked for confirmation, and the
    groups are updated concurrently, followed by a success/failure summary.

    Args:
        args: Parsed command-line namespace providing ``print_retention_counts``
            (bool) and ``retention`` (int number of days).
    """
    cloudwatch_log_groups = get_cloudwatch_log_groups()

    if args.print_retention_counts:
        _print_retention_counts(cloudwatch_log_groups)
        return

    retention = args.retention
    groups_to_update = [
        group
        for group in cloudwatch_log_groups
        if "retentionInDays" not in group or group["retentionInDays"] != retention
    ]
    if not groups_to_update:
        print(f"All log groups already have the specified retention of {retention} days.")
        return

    print(f"Log groups that need to be updated to {retention} days retention:")
    for group in groups_to_update:
        current_retention = group.get("retentionInDays", "Not set")
        print(f"  {group['logGroupName']} (current retention: {current_retention})")

    # Surrounding whitespace is ignored so an answer such as "y " still confirms.
    answer = input("\nDo you want to proceed with the update? (y/n): ")
    if answer.strip().lower() != "y":
        print("Update cancelled.")
        return

    updated_count = 0
    failed_count = 0
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_group = {
            executor.submit(update_log_group_retention, group, retention): group for group in groups_to_update
        }
        for future in as_completed(future_to_group):
            try:
                result = future.result()
            except Exception as e:
                # future.result() re-raises whatever the worker raised. Record it as a
                # failure for that group so one bad update cannot abort the loop and
                # suppress the final summary.
                failed_group = future_to_group[future]
                result = f"Error updating {failed_group['logGroupName']}: {e}"
            print(result)
            # The worker reports its outcome as text; only this phrase marks a success.
            if "Successfully updated" in result:
                updated_count += 1
            else:
                failed_count += 1

    print(f"\nAttempted to update {len(groups_to_update)} log groups.")
    print(f"Successfully updated: {updated_count}")
    print(f"Failed to update: {failed_count}")
    if failed_count:
        print("\nSome updates failed. Please check the output above for details.")
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, reject invalid combinations,
    # then hand over to cloudwatch_set_retention().
    parser = argparse.ArgumentParser(
        description="Set a retention in days for all your CloudWatch Logs in a single region."
    )
    parser.add_argument(
        "--retention",
        type=int,
        # Retention periods accepted by the CloudWatch Logs PutRetentionPolicy API.
        # 1096, 2192, 2557, 2922 and 3288 were previously missing from this list.
        choices=[
            1,
            3,
            5,
            7,
            14,
            30,
            60,
            90,
            120,
            150,
            180,
            365,
            400,
            545,
            731,
            1096,
            1827,
            2192,
            2557,
            2922,
            3288,
            3653,
        ],
        help="Enter the retention in days for the CloudWatch Logs.",
    )
    parser.add_argument(
        "--print-retention-counts", action="store_true", help="Print the number of log groups for each retention period"
    )

    # With no arguments at all there is nothing to do: show the help text and exit non-zero.
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)

    args = parser.parse_args()

    # The two modes are mutually exclusive: either report the counts or set a retention.
    if args.print_retention_counts and args.retention is not None:
        parser.error("--print-retention-counts cannot be used with --retention argument")

    cloudwatch_set_retention(args)