forked from openebs/rawfile-localpv
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrawfile_util.py
136 lines (97 loc) · 3.41 KB
/
rawfile_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import glob
import json
from os.path import basename, dirname
from pathlib import Path
import time
from consts import DATA_DIR
from declarative import be_absent
from fs_util import path_stats
from volume_schema import migrate_to, LATEST_SCHEMA_VERSION
from util import run, run_out
def img_dir(volume_id):
return Path(f"{DATA_DIR}/{volume_id}")
def meta_file(volume_id):
return Path(f"{img_dir(volume_id)}/disk.meta")
def metadata(volume_id):
try:
return json.loads(meta_file(volume_id).read_text())
except FileNotFoundError:
return {}
def img_file(volume_id):
return Path(metadata(volume_id)["img_file"])
def destroy(volume_id, dry_run=True):
print(f"Destroying {volume_id}")
if not dry_run:
be_absent(img_file(volume_id))
be_absent(meta_file(volume_id))
be_absent(img_dir(volume_id))
def gc_if_needed(volume_id, dry_run=True):
meta = metadata(volume_id)
deleted_at = meta.get("deleted_at", None)
gc_at = meta.get("gc_at", None)
if deleted_at is None or gc_at is None:
return False
now = time.time()
if gc_at <= now:
destroy(volume_id, dry_run=dry_run)
return False
def update_metadata(volume_id: str, obj: dict) -> dict:
meta_file(volume_id).write_text(json.dumps(obj))
return obj
def patch_metadata(volume_id: str, obj: dict) -> dict:
old_data = metadata(volume_id)
new_data = {**old_data, **obj}
return update_metadata(volume_id, new_data)
def migrate_metadata(volume_id, target_version):
old_data = metadata(volume_id)
new_data = migrate_to(old_data, target_version)
return update_metadata(volume_id, new_data)
def attached_loops(file: str) -> [str]:
out = run_out(f"losetup -j {file}").stdout.decode()
lines = out.splitlines()
devs = [line.split(":", 1)[0] for line in lines]
return devs
def attach_loop(file) -> str:
def next_loop():
loop_file = run_out(f"losetup -f").stdout.decode().strip()
if not Path(loop_file).exists():
pfx_len = len("/dev/loop")
loop_dev_id = loop_file[pfx_len:]
run(f"mknod {loop_file} b 7 {loop_dev_id}")
return loop_file
while True:
devs = attached_loops(file)
if len(devs) > 0:
return devs[0]
next_loop()
run(f"losetup --direct-io=on -f {file}")
def detach_loops(file) -> None:
devs = attached_loops(file)
for dev in devs:
run(f"losetup -d {dev}")
def list_all_volumes():
metas = glob.glob(f"{DATA_DIR}/*/disk.meta")
return [basename(dirname(meta)) for meta in metas]
def migrate_all_volume_schemas():
target_version = LATEST_SCHEMA_VERSION
for volume_id in list_all_volumes():
migrate_metadata(volume_id, target_version)
def gc_all_volumes(dry_run=True):
for volume_id in list_all_volumes():
gc_if_needed(volume_id, dry_run=dry_run)
def get_volumes_stats() -> [dict]:
volumes_stats = {}
for volume_id in list_all_volumes():
file = img_file(volume_id=volume_id)
stats = file.stat()
volumes_stats[volume_id] = {
"used": stats.st_blocks * 512,
"total": stats.st_size,
}
return volumes_stats
def get_capacity():
disk_free_size = path_stats(DATA_DIR)["fs_avail"]
capacity = disk_free_size
for volume_stat in get_volumes_stats().values():
capacity -= volume_stat["total"] - volume_stat["used"]
return capacity