-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.xsh
executable file
·63 lines (55 loc) · 1.54 KB
/
util.xsh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#!/usr/bin/env xonsh
import contextlib
import os
import pathlib
import re
import signal
import subprocess
import time

import xonsh
# Directory that contains this script, with symlinks resolved.
base_path = pathlib.Path(__file__).resolve().parent

# Bind the xonsh job-control module: use ``xonsh.jobs`` when the package
# exposes it, otherwise fall back to ``xonsh.procs.jobs``.
# NOTE(review): presumably the module location differs between xonsh
# versions -- confirm against the versions this script must support.
xonsh_jobs = xonsh.jobs if hasattr(xonsh, "jobs") else xonsh.procs.jobs
def last_job():
    """Return the most recently started job, or ``None`` when there are none.

    Jobs are read from ``xonsh_jobs.get_jobs()``; the one with the largest
    ``"started"`` value wins.
    """
    return max(
        xonsh_jobs.get_jobs().values(),
        key=lambda job: job["started"],
        default=None,
    )
def kill_job(*jobs):
    """Send SIGTERM to every process that belongs to the given jobs.

    Args:
        *jobs: Job records, i.e. mappings with a ``"pids"`` entry listing
            process ids. Falsy entries (``None``, empty mappings) are
            skipped, so the result of ``last_job()`` can be passed through
            without checking it first.

    Termination is best-effort: a pid that no longer exists, or that the
    current user may not signal, is silently ignored.
    """
    for job in jobs:
        if not job:
            continue
        for pid in job["pids"]:
            # Signal the process directly rather than running the external
            # ``kill`` command: a failed subprocess only raises
            # CalledProcessError when $RAISE_SUBPROC_ERROR is enabled, so the
            # old handler did not reliably cover the "already gone" case.
            with contextlib.suppress(ProcessLookupError, PermissionError):
                os.kill(int(pid), signal.SIGTERM)
def kill_by_port(target_port):
    """Send SIGTERM to every process netstat reports as listening on a port.

    Runs ``sudo netstat -tulpn | grep LISTEN`` and parses each line for the
    local port and the owning pid.

    Args:
        target_port: Port number to match; an int or a numeric string.

    Termination is best-effort: a pid that no longer exists, or that the
    current user may not signal, is silently ignored.
    """
    netstat_out = $(sudo netstat -tulpn | grep LISTEN)
    # Anchor the port to the END of the local-address column (digits followed
    # by whitespace, then the foreign address, the LISTEN state and
    # "PID/program"). Taking the first ":<digits>" on the line would misread
    # IPv6 locals such as "::1:631" or "::ffff:127.0.0.1:8080" as port 1 / 127.
    listener_regex = r':(\d+)\s+\S+\s+LISTEN\s+(\d+)/'
    wanted_port = int(target_port)
    for port, pid in re.findall(listener_regex, netstat_out):
        if int(port) != wanted_port:
            continue
        # Signal directly instead of spawning ``kill``: a failed subprocess
        # only raises CalledProcessError when $RAISE_SUBPROC_ERROR is enabled.
        with contextlib.suppress(ProcessLookupError, PermissionError):
            os.kill(int(pid), signal.SIGTERM)
# Calculate the bandwidth on a TCP port over a certain interval.
def measure_bandwidth(port, duration):
    """Return the average traffic on ``port`` over loopback, in bytes/second.

    Runs ``iftop`` in text mode on the ``lo`` interface with the filter
    ``port <port>``, asking it for a single report after ``duration``
    seconds. The whole command is wrapped in ``timeout`` with twice that
    duration, and iftop's stderr is discarded.

    Args:
        port: Port number interpolated into the iftop filter expression.
        duration: Sampling time in seconds handed to iftop's ``-s`` option.

    Returns:
        float: The last figure on iftop's "Cumulative" line, converted to
        bytes, divided by the elapsed time of the iftop invocation (measured
        around the command, so it includes start-up overhead).
        NOTE(review): the last column is presumably the combined
        sent+received total -- confirm against iftop's output format.

    Raises:
        IndexError: If the output contains no parsable "Cumulative" line
            (for example because iftop failed or was cut off by ``timeout``).
        ValueError: If the reported unit is not one of B/KB/MB/GB/TB.
    """
    # Use the monotonic clock for the elapsed time: wall-clock adjustments
    # during the sample would otherwise skew (or zero) the divisor.
    start = time.monotonic()
    iftop_out = $(timeout @(duration * 2) sudo iftop -i lo -f @(f"port {port}") -t -s @(duration) -B 2>/dev/null)
    elapsed = time.monotonic() - start

    # Lazy prefix + trailing "\n" selects the final "<number><UNIT>" on the line.
    iftop_regex = r'Cumulative.+?:.+?([\d.]+)([A-Z]+)\n'
    matches = re.findall(iftop_regex, iftop_out)
    if not matches:
        # Same exception type as indexing the empty list, but self-explanatory.
        raise IndexError("no 'Cumulative' totals found in iftop output")
    amount, unit = matches[0]

    unit_names = ["B", "KB", "MB", "GB", "TB"]
    multiplier = 1024 ** unit_names.index(unit)
    return float(amount) * multiplier / elapsed
@contextlib.contextmanager
def temp_cd(path):
    """Context manager that runs its body with ``path`` as working directory.

    The working directory in effect on entry is recorded and restored on
    exit, whether the body completes normally or raises.
    """
    previous_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Always switch back, even when the with-body raised.
        os.chdir(previous_dir)