-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathutils.py
38 lines (29 loc) · 894 Bytes
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import signal
import time
from contextlib import contextmanager
from multiprocessing import Queue
def stops_list_to_queue(data: list, queue: Queue = None) -> Queue:
    """Put the ``"stop_id"`` value of every stop in ``data`` onto a queue.

    Args:
        data: Iterable of mappings, each providing a ``"stop_id"`` key.
        queue: Queue to append to. When ``None`` a new
            ``multiprocessing.Queue`` is created.

    Returns:
        The queue that received the values (the one passed in, or the newly
        created one), filled in the iteration order of ``data``.

    Raises:
        KeyError: If a stop has no ``"stop_id"`` key; values of the stops
            preceding it have already been enqueued at that point.
    """
    target = Queue() if queue is None else queue
    for entry in data:
        target.put(entry["stop_id"])
    return target
def stops_list_to_stop_id_queue(data: list) -> Queue:
    """Build a new queue holding the ``"stop_id"`` value of every stop.

    Args:
        data: Iterable of mappings, each providing a ``"stop_id"`` key.

    Returns:
        A freshly created ``multiprocessing.Queue`` filled with the stop ids
        in the iteration order of ``data``.

    Raises:
        KeyError: If a stop has no ``"stop_id"`` key.
    """
    stop_ids = Queue()
    # The generator is lazy, so each lookup happens right before its put().
    for stop_id in (entry["stop_id"] for entry in data):
        stop_ids.put(stop_id)
    return stop_ids
class TimeoutException(Exception):
    """Raised by the ``time_limit`` signal handler when the alarm fires."""
@contextmanager
def time_limit(seconds):
    """Abort the enclosed ``with`` body if it runs longer than ``seconds``.

    A SIGALRM handler that raises ``TimeoutException`` is installed and an
    alarm is scheduled ``seconds`` from now. On exit (normal or via an
    exception) the pending alarm is cancelled and the SIGALRM handler that
    was active before entry is reinstalled, so the limit does not leak into
    the rest of the process.

    Args:
        seconds: Number of seconds handed to ``signal.alarm``; ``0`` schedules
            no alarm.

    Yields:
        None. Control is handed to the ``with`` body while the alarm is armed.

    Raises:
        TimeoutException: Raised from the signal handler if the alarm fires
            while the body is still running.

    Note:
        ``signal.alarm`` / ``SIGALRM`` are Unix-only and handlers can only be
        installed from the main thread. A process has a single alarm, so a
        nested ``time_limit`` replaces the outer alarm instead of stacking.
    """
    def signal_handler(signum, frame):
        raise TimeoutException("Timed out!")

    # signal.signal returns the handler it replaces; keep it for restoration.
    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    try:
        # Arm inside the try so a failure here still restores the handler.
        signal.alarm(seconds)
        yield
    finally:
        # Cancel first: a late alarm must never reach the restored handler
        # (which may be SIG_DFL and would terminate the process).
        signal.alarm(0)
        # None means the old handler was not installed from Python and
        # cannot be passed back to signal.signal.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)