-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathcloudflare.py
134 lines (95 loc) · 3.86 KB
/
cloudflare.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
from typing import List
import requests
import logging
import os
from dotenv import load_dotenv

# Module-level logger, named "cloudflare".
logger = logging.getLogger("cloudflare")

# load_dotenv() runs before the credentials are read so that values supplied
# through a .env file can be picked up from the environment.
load_dotenv()

CF_API_TOKEN = os.environ.get("CF_API_TOKEN")
CF_IDENTIFIER = os.environ.get("CF_IDENTIFIER")

# Fail at import time when either credential is unset or empty.
if not (CF_API_TOKEN and CF_IDENTIFIER):
    raise Exception("Missing Cloudflare credentials")

# One shared HTTP session so every request carries the bearer token.
session = requests.Session()
session.headers["Authorization"] = f"Bearer {CF_API_TOKEN}"
def get_lists(name_prefix: str):
    """Return the account's Gateway lists whose name starts with ``name_prefix``.

    Raises:
        Exception: If the API answers with any status code other than 200.
    """
    url = f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/lists"
    response = session.get(url)
    logger.debug(f"[get_lists] {response.status_code}")

    if response.status_code != 200:
        raise Exception("Failed to get Cloudflare lists")

    # A falsy "result" (e.g. null) is treated as an empty collection.
    candidates = response.json()["result"] or []
    return [item for item in candidates if item["name"].startswith(name_prefix)]
def create_list(name: str, domains: List[str]):
    """Create a Gateway list of type ``DOMAIN`` called ``name`` holding ``domains``.

    Prints a confirmation line on success and returns the ``result`` member
    of the JSON response.

    Raises:
        Exception: If the API answers with any status code other than 200;
            the message includes the raw response body.
    """
    payload = {
        "name": name,
        "description": "Created by script.",
        "type": "DOMAIN",
        # Each domain is sent as an object of the form {"value": <domain>}.
        "items": [{"value": domain} for domain in domains],
    }
    response = session.post(
        f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/lists",
        json=payload,
    )
    logger.debug(f"[create_list] {response.status_code}")

    if response.status_code != 200:
        raise Exception("Failed to create Cloudflare list: " + str(response.content))

    print("Created list " + name)
    return response.json()["result"]
def delete_list(list_id: str):
    """Delete the Gateway list identified by ``list_id``.

    Returns the ``result`` member of the JSON response.

    Raises:
        Exception: If the API answers with any status code other than 200;
            the message includes the raw response body.
    """
    url = (
        f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}"
        f"/gateway/lists/{list_id}"
    )
    response = session.delete(url)
    logger.debug(f"[delete_list] {response.status_code}")

    if response.status_code != 200:
        raise Exception("Failed to delete Cloudflare list: " + str(response.content))

    return response.json()["result"]
def get_firewall_policies(name_prefix: str):
    """Return the account's Gateway rules whose name starts with ``name_prefix``.

    Raises:
        Exception: If the API answers with any status code other than 200.
    """
    url = f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/rules"
    response = session.get(url)
    logger.debug(f"[get_firewall_policies] {response.status_code}")

    if response.status_code != 200:
        raise Exception("Failed to get Cloudflare firewall policies")

    # A falsy "result" (e.g. null) is treated as an empty collection.
    policies = response.json()["result"] or []
    return [policy for policy in policies if policy["name"].startswith(name_prefix)]
def delete_firewall_policy(policy_id: str):
    """Delete the Gateway rule identified by ``policy_id``.

    Returns the ``result`` member of the JSON response.

    Raises:
        Exception: If the API answers with any status code other than 200;
            the message includes the raw response body.
    """
    r = session.delete(
        f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/rules/{policy_id}",
    )
    # Tag the log line with this function's actual name, as the other helpers do.
    logger.debug(f"[delete_firewall_policy] {r.status_code}")

    if r.status_code != 200:
        # Include the response body so the failure can be diagnosed, matching
        # the error style of delete_list.
        raise Exception("Failed to delete Cloudflare policy: " + str(r.content))

    return r.json()["result"]
def create_gateway_policy(name: str, list_ids: List[str]):
    """Create an enabled DNS ``block`` rule that matches any of the given lists.

    The rule is sent with the ``dns`` filter and the block page disabled.
    Returns the ``result`` member of the JSON response.

    Raises:
        Exception: If the API answers with any status code other than 200;
            the message includes the raw response body.
    """
    # One `any(dns.domains[*] in $<list id>)` clause per list. The clauses are
    # joined with a whitespace-delimited `or` so the expression does not
    # depend on the parser accepting an operator fused to its operands.
    # NOTE(review): an empty list_ids yields an empty traffic expression;
    # confirm how the API treats that.
    traffic = " or ".join(
        f"any(dns.domains[*] in ${list_id})" for list_id in list_ids
    )

    r = session.post(
        f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/rules",
        json={
            "name": name,
            "description": "Created by script.",
            "action": "block",
            "enabled": True,
            "filters": ["dns"],
            "traffic": traffic,
            "rule_settings": {
                "block_page_enabled": False,
            },
        },
    )
    logger.debug(f"[create_gateway_policy] {r.status_code}")

    if r.status_code != 200:
        # Include the response body so the failure can be diagnosed, matching
        # the error style of create_list.
        raise Exception(
            "Failed to create Cloudflare firewall policy: " + str(r.content)
        )

    return r.json()["result"]
def update_gateway_policy(name: str, policy_id: str, list_ids: List[str]):
    """Replace rule ``policy_id`` with an enabled ``block`` rule over ``list_ids``.

    Returns the ``result`` member of the JSON response.

    Raises:
        Exception: If the API answers with any status code other than 200;
            the message includes the raw response body.
    """
    # One `any(dns.domains[*] in $<list id>)` clause per list. The clauses are
    # joined with a whitespace-delimited `or` so the expression does not
    # depend on the parser accepting an operator fused to its operands.
    traffic = " or ".join(
        f"any(dns.domains[*] in ${list_id})" for list_id in list_ids
    )

    # NOTE(review): unlike create_gateway_policy, this payload carries no
    # "description", "filters" or "rule_settings" -- confirm the PUT keeps
    # the existing values rather than resetting them.
    r = session.put(
        f"https://api.cloudflare.com/client/v4/accounts/{CF_IDENTIFIER}/gateway/rules/{policy_id}",
        json={
            "name": name,
            "action": "block",
            "enabled": True,
            "traffic": traffic,
        },
    )
    logger.debug(f"[update_gateway_policy] {r.status_code}")

    if r.status_code != 200:
        # Include the response body so the failure can be diagnosed.
        raise Exception(
            "Failed to update Cloudflare firewall policy: " + str(r.content)
        )

    return r.json()["result"]