-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclean.py
275 lines (210 loc) · 8.46 KB
/
clean.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import os
import shutil
from helper import convert_to_map, valid_obd_file, valid_gps_file
import constants
from utils import read_csv_file
# Module-level switch: when truthy, the functions in this file print diagnostic messages.
debug = True
def clean_file(input_string, configs=None):
    """
    Run the ``clean`` command: parse its options and clean every trip folder.

    Bad trips are either moved to a temporary folder or deleted, depending on
    the given and/or default criteria.

    Parameters
    ----------
    input_string : str, or list of str
        Options for the clean command. The literal string "syntax" prints the
        usage text instead of running the command.
    configs : mapping, optional
        Fallback settings; its 'data_path' entry is used as the top folder
        when the -d option is not supplied.

    Returns
    -------
    None
        Problems with the options (missing top folder, non-integer -gps/-len)
        are reported with a printed message and the command is not run.
    """
    if input_string == "syntax":
        msg = """clean -d directory [-acc=True] [-gps interval=5] [-len duration=10]
[-gyro=True] [-obd=False] [-f=False] [-temp path=None]
-gps interval=5: The maximum average sampling interval of a good GPS.
-len min_duration=10: The minimum duration for a good trip, whose unit is minute.
-gyro=True: True if a gyro file should be treated as a necessary file for a good trip.
-obd=False: The same meaning as -gyro.
-f force_delete=False: If True, then delete bad folder directly. Otherwise, move to temp folder. Default is False.
-temp path=None: The path/directory to move all bad trips into.
"""
        print(msg)
        return

    options = convert_to_map(input_string)
    force_delete = options.get('-f', "False")

    # The top folder comes from -d first, then from the configs fallback.
    top_folder = options.get('-d', None)
    if not top_folder and configs:
        top_folder = configs['data_path']
    if not top_folder:
        print("top folder is required")
        return

    temp_folder = options.get('-temp', os.path.join(top_folder, constants.TEMP_FOLDER))
    acc_need_valid = options.get('-acc', "True")
    gyro_need_valid = options.get('-gyro', "True")
    obd_need_valid = options.get('-obd', "False")

    # Report malformed numeric options instead of letting int() abort the command.
    try:
        gps_max_interval = int(options.get('-gps', 5))
        min_duration = int(options.get('-len', 10))
    except (TypeError, ValueError):
        print("-gps and -len must be integers")
        return

    if debug:
        print('force_detele, top folder, acc needed, gyro needed, obd needed, gps interval, temp folder, min duration')
        print(force_delete, top_folder, acc_need_valid, gyro_need_valid, obd_need_valid, gps_max_interval, temp_folder, min_duration)
    clean_all(top_folder, force_delete, acc_need_valid, gyro_need_valid, obd_need_valid, gps_max_interval, temp_folder, min_duration)
def clean_all(root, force_delete, acc_need_valid, gyro_need_valid, obd_need_valid, gps_max_interval, temp_folder, min_duration):
    """
    Walk every sub-folder of ``root`` and clean each one; invoked from clean_file.

    The top folder itself, the temp folder and everything inside the temp
    folder are skipped.

    Parameters
    ----------
    root : str
        The path of folder
    force_delete: str, 'True' or 'False'
        If 'True', then bad files will be deleted.
    acc_need_valid : str, 'True', or 'False'
        If 'True', then good trip needs 'raw_acc.txt' file.
    gyro_need_valid : str, 'True' or 'False'
        If 'True', then good trip needs 'raw_gyro.txt' file.
    obd_need_valid : str, 'True' or 'False'
        If 'True', then good trip needs 'raw_obd.txt' file.
    gps_max_interval : int
        The maximum of average sample interval of a good gps data file.
    temp_folder : str
        The temp folder to store the 'bad' trips data if they are not going to be force_deleted
    min_duration: int
        The minimum duration/length of a good trip. Unit is minute.
    """
    # Compare normalised absolute paths component-wise: a plain string-prefix
    # test would also skip siblings such as "<temp>_old" and would miss the
    # temp folder when it is written with a trailing separator.
    temp_path = os.path.normcase(os.path.abspath(temp_folder))
    temp_prefix = os.path.join(temp_path, '')  # temp path followed by a separator

    for _root, _, _ in os.walk(root):
        # TODO: this will prevent the program working on a single trip directly
        if _root == root:
            continue
        current = os.path.normcase(os.path.abspath(_root))
        if current == temp_path or current.startswith(temp_prefix):
            continue
        clean_single_folder(_root, force_delete, acc_need_valid, gyro_need_valid, obd_need_valid, gps_max_interval, temp_folder, min_duration)
def clean_single_folder(root, force_delete, acc_need_valid, gyro_need_valid, obd_need_valid, gps_max_interval, temp_folder, min_duration):
    """
    Validate one folder against the requested criteria and dispose of it if it is bad.

    A folder that holds no regular files (ignoring '.DS_Store') is left alone.

    Parameters
    ----------
    root : str
        The path of folder
    force_delete: str, 'True' or 'False'
        If 'True', then bad files will be deleted.
    acc_need_valid : str, 'True', or 'False'
        If 'True', then good trip needs 'raw_acc.txt' file.
    gyro_need_valid : str, 'True' or 'False'
        If 'True', then good trip needs 'raw_gyro.txt' file.
    obd_need_valid : str, 'True' or 'False'
        If 'True', then good trip needs 'raw_obd.txt' file.
    gps_max_interval : int
        The maximum of average sample interval of a good gps data file.
    temp_folder : str
        The temp folder to store the 'bad' trips data if they are not going to be force_deleted
    min_duration: int
        The minimum duration/length of a good trip. Unit is minute.
    """
    has_data_file = any(
        entry != '.DS_Store' and os.path.isfile(os.path.join(root, entry))
        for entry in os.listdir(root)
    )
    if not has_data_file:
        return

    # The checks run in a fixed order and stop at the first failure, so later
    # validators (and their debug output) are skipped once the trip is bad.
    if acc_need_valid.lower() == 'true' and not valid_acc(root):
        trip_is_good = False
    elif gyro_need_valid.lower() == 'true' and not valid_gyro(root):
        trip_is_good = False
    elif obd_need_valid.lower() == 'true' and not valid_obd(root):
        trip_is_good = False
    else:
        trip_is_good = valid_gps(root, gps_max_interval, min_duration)

    if not trip_is_good:
        deal_bad_trip(root, force_delete, temp_folder)
def valid_acc(root):
    """
    Check that the acc file is present in the given folder.

    Parameters
    ----------
    root : str
        The path that contains the file

    Returns
    -------
    True if the file exists as a regular file; False, otherwise
    """
    acc_path = os.path.join(root, constants.ACC_FILE_NAME)
    found = os.path.isfile(acc_path)
    if not found and debug:
        print("Invalid acc file %s" % acc_path)
    return found
def valid_gyro(root):
    """
    Check that the gyro file is present in the given folder.

    Parameters
    ----------
    root : str
        The path that contains the file

    Returns
    -------
    True if the file exists as a regular file; False, otherwise
    """
    gyro_path = os.path.join(root, constants.GYRO_FILE_NAME)
    found = os.path.isfile(gyro_path)
    if not found and debug:
        print("Invalid gyro file %s" % gyro_path)
    return found
def valid_obd(root):
    """
    Check the obd file in the given folder with ``valid_obd_file``.

    Parameters
    ----------
    root : str
        The path that contains the file

    Returns
    -------
    True if ``valid_obd_file`` accepts the file; False, otherwise
    """
    obd_path = os.path.join(root, constants.OBD_FILE_NAME)
    accepted = bool(valid_obd_file(obd_path))
    if not accepted and debug:
        print("Invalid obd file %s" % obd_path)
    return accepted
def valid_gps(root, gps_max_interval, min_duration):
    """
    Check whether the gps file in ``root`` describes a good trip.

    The file must pass ``valid_gps_file``, contain at least one sample, have an
    average sampling interval no larger than ``gps_max_interval`` and cover at
    least ``min_duration`` minutes.

    Parameters
    ----------
    root : str
        The folder contains the gps file.
    gps_max_interval : int
        The maximum sampling interval of a good trip.
    min_duration : int
        The minimum duration of a good trip. Unit is minute.

    Return
    ------
    valid : bool
        True if the gps file is good. False, otherwise.
    """
    gps_file = os.path.join(root, constants.GPS_FILE_NAME)
    if not valid_gps_file(gps_file):
        if debug:
            print("invalid gps file: %s" % root)
        return False

    time_speed = read_csv_file(gps_file, columns=[1, 4])
    # A file without samples would otherwise fail on the indexing and division
    # below. len() is used instead of truthiness so that list-like and
    # array-like results are both handled.
    if len(time_speed) == 0:
        if debug:
            print("invalid gps file: %s" % root)
        return False

    # First column of each row is the timestamp; presumably milliseconds,
    # given the division by 1000 -- confirm against read_csv_file's input.
    trip_duration = (time_speed[-1][0] - time_speed[0][0]) / 1000.0  # seconds
    trip_minutes = trip_duration / 60.0
    # Average interval is computed over the number of samples.
    ave_time = trip_duration / len(time_speed)

    if ave_time > gps_max_interval:
        if debug:
            print("Trip: %s" % root)
            print("average interval of GPS samples: %.2f seconds, which is too large." % ave_time)
        return False

    if trip_minutes < min_duration:
        if debug:
            print("Trip: %s" % root)
            print("Trip is too short: %.2f minutes." % trip_minutes)
        return False

    if debug:
        print("Trip: %s" % root)
        print("\tAverage interval of GPS samples: %.2f seconds, which is good." % ave_time)
        print("\tTrip length is: %.2f minutes." % trip_minutes)
    return True
def deal_bad_trip(root, force_delete, temp_folder):
    """
    Deal with bad trip, either delete it or move it into temp_folder.

    Parameters
    ----------
    root : str
        The path of the folder
    force_delete : str, 'True' or 'False'
        If 'True', then folder will be deleted directly.
    temp_folder : str
        The path of the temp folder to host the bad folder. It is created
        when it does not exist yet.

    Notes
    -----
    When the temp folder already holds an entry with the same base name as
    ``root``, the moved folder gets a numeric suffix ("name_1", "name_2", ...)
    instead of the move failing.
    """
    if force_delete.lower() == 'true':
        shutil.rmtree(root)
        return

    if not os.path.isdir(temp_folder):
        os.makedirs(temp_folder)

    # normpath drops a trailing separator so basename is never empty.
    folder_name = os.path.basename(os.path.normpath(root))
    base_destination = os.path.join(temp_folder, folder_name)

    # Trips from different parents may share a name; choose a free destination
    # so one collision does not abort the whole clean run.
    destination = base_destination
    suffix = 1
    while os.path.exists(destination):
        destination = "%s_%d" % (base_destination, suffix)
        suffix += 1

    shutil.move(root, destination)