forked from stackrox/k8s-i-use
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgather_information.py
154 lines (120 loc) · 5.22 KB
/
gather_information.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import json
from typing import NamedTuple, Dict
import sys
import os
def _validate_version(version: int):
assert type(version) is int and version >= 6
class GVK(NamedTuple):
group: str
version: str
kind: str
class FieldKey(NamedTuple):
gvk: GVK
field_name: str
class GVKWithVersions:
def __init__(self, gvk):
self.gvk = gvk
self.seen_in_versions = []
self.deprecated_in_versions = []
def mark_seen_in_version(self, version):
_validate_version(version)
self.seen_in_versions.append(version)
def mark_deprecated_in_version(self, version):
_validate_version(version)
self.deprecated_in_versions.append(version)
class Field:
def __init__(self, field_key):
self.field_key = field_key
self.seen_in_versions = []
self.deprecated_in_versions = []
def mark_seen_in_version(self, version):
_validate_version(version)
self.seen_in_versions.append(version)
def mark_deprecated_in_version(self, version):
_validate_version(version)
self.deprecated_in_versions.append(version)
def _process_fields_recursive(all_definitions: Dict, gvk: GVK, version: int, definition_key: str, all_fields: Dict[FieldKey, Field],
def_keys_seen_so_far=None, path_so_far=None):
if def_keys_seen_so_far is None:
def_keys_seen_so_far = set()
if path_so_far is None:
path_so_far = []
# Prevents cycles
if definition_key in def_keys_seen_so_far:
return
def_keys_seen_so_far.add(definition_key)
for field_name, field_desc in all_definitions[definition_key].get("properties", {}).items():
path_with_field_name = path_so_far + [field_name]
if field_desc.get("type") == "array":
ref = field_desc["items"].get("$ref")
else:
ref = field_desc.get("$ref")
if ref:
assert ref.startswith("#/definitions/")
_process_fields_recursive(all_definitions, gvk, version, ref[len("#/definitions/"):],
all_fields, def_keys_seen_so_far, path_with_field_name)
continue
field_key = FieldKey(gvk=gvk, field_name=".".join(path_with_field_name))
if field_key not in all_fields:
all_fields[field_key] = Field(field_key)
# TODO: improve this heuristic?
deprecated = "deprecated" in field_desc.get('description', '').lower()
if deprecated:
all_fields[field_key].mark_deprecated_in_version(version)
else:
all_fields[field_key].mark_seen_in_version(version)
def parse_swagger_file(path: str, all_gvks: Dict[GVK, GVKWithVersions], all_fields: Dict[FieldKey, Field]):
print(f"Parsing {path}...")
with open(path, 'r') as f:
swagger_loaded = json.load(f)
# this extracts version x out of paths that end in v1.x.json
version = int(path.split('.')[-2])
for key, desc in swagger_loaded['definitions'].items():
gvk_jsons = desc.get('x-kubernetes-group-version-kind')
if not gvk_jsons:
continue
# TODO: improve this heuristic?
deprecated = "deprecated" in desc.get('description', '').lower()
for gvk_json in gvk_jsons:
gvk = GVK(group=gvk_json["group"], version=gvk_json["version"], kind=gvk_json["kind"])
if gvk not in all_gvks:
all_gvks[gvk] = GVKWithVersions(gvk)
if deprecated:
all_gvks[gvk].mark_deprecated_in_version(version)
else:
all_gvks[gvk].mark_seen_in_version(version)
_process_fields_recursive(swagger_loaded['definitions'], gvk, version, key, all_fields)
def parse_files(dir, all_data_path):
all_gvks: Dict[GVK, GVKWithVersions] = {}
all_fields: Dict[FieldKey, Field] = {}
for filename in os.listdir(dir):
if not filename.endswith('.json'):
continue
parse_swagger_file(os.path.join(dir, filename), all_gvks, all_fields)
giant_json = []
for gvk, gvk_with_versions in all_gvks.items():
curr = {
"group": gvk.group, "version": gvk.version, "kind": gvk.kind,
"seen_in": sorted(gvk_with_versions.seen_in_versions), "deprecated_in": sorted(gvk_with_versions.deprecated_in_versions),
"fields": []
}
for field_key, field in all_fields.items():
if field_key.gvk == gvk:
curr["fields"].append({
"name": field_key.field_name,
"seen_in": sorted(field.seen_in_versions),
"deprecated_in": sorted(field.deprecated_in_versions),
})
giant_json.append(curr)
# write to frontend dir to be picked up by the React app
# a bit hacky but produces a valid JS data structure
all_data = os.path.join(all_data_path, "allData.js")
with open(all_data, "w") as out_ad:
print("Writing GVK info to frontend datastructure")
out_ad.write("export const allData = ")
json.dump(giant_json, out_ad)
out_ad.write(";")
# write to clean json - e.g. for further processing
with open("gvk_metadata.json", "w") as out_f:
print("Writing GVK info to json")
json.dump(giant_json, out_f)