-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathosm_interpreter.py
283 lines (238 loc) · 10.9 KB
/
osm_interpreter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
import os
import platform
import re
import sys
import time
from pathlib import Path
from helper.global_var import FLAG_DEBUG, SAVE_TYPE_JSON, SAVE_TYPE_PICKLE
from helper.graph_writer import graph_writer
from osm_handler import OSMHandler
def debug_show_all_route(relations, final_node_table, final_way_table):
    """
    Plot every saved route on a folium map (debugging aid).

    Each relation is drawn with its own randomly chosen color so that it can be checked visually that all of
    the NFTA bus routes were captured. The map is written to ``debug/map_of_all_route.html`` (the ``debug``
    folder is created if it does not exist yet).

    Parameters
    ----------
    relations: List
        A list that contains the same information as final_relation_table, albeit in a less easily accessed
        form of a list of 3-tuples. Element 1 of each tuple is iterated as the member ids (ways and nodes) and
        element 2 is indexed with ``'name'`` to label the route.
    final_node_table: Dict
        A dictionary that stored the node id and the latitude/longitude coordinates
        as a key value pair.
    final_way_table: Dict
        A dictionary that stored the way id and a list of node id's as a key value pair.

    Returns
    -------
    str
        A ``file://`` URL to the saved map file, which can be opened in the browser.
    """
    # Imported lazily so that folium is only required when debugging is enabled.
    import folium
    import random

    m = folium.Map(location=[42.89, -78.74], tiles="OpenStreetMap", zoom_start=10)
    print("[Debug] All route:")
    for relation in relations:
        # Always emit six zero-padded hex digits; an unpadded value such as
        # "#abcd" is not a valid CSS color.
        line_color = '#{:06x}'.format(random.randint(0, 0xFFFFFF))
        route_name = relation[2]['name']
        print("[Debug] - %s" % route_name)
        for index in relation[1]:
            points = []
            if index in final_way_table:
                # Skip nodes that are referenced by the way but missing from
                # the node table instead of aborting the whole debug plot.
                points = [
                    (final_node_table[waypoint][0], final_node_table[waypoint][1])
                    for waypoint in final_way_table[index]
                    if waypoint in final_node_table
                ]
            elif index in final_node_table:
                points.append((final_node_table[index][0], final_node_table[index][1]))
            if points:
                folium.PolyLine(points, tooltip=route_name, color=line_color).add_to(m)

    # https://github.com/python-visualization/folium/issues/946
    # a way to show the map outside ipython note book
    temp_path = "debug/map_of_all_route.html"
    # m.save() does not create missing parent folders.
    os.makedirs(os.path.dirname(temp_path), exist_ok=True)
    m.save(temp_path)
    return "file://" + os.path.abspath(temp_path)
def get_map_data(map_file, result_file_path, save_type):
    """
    Get the routes data from an OSM file, derive the way graph and way statistics, and save everything.

    Parameters
    ----------
    map_file: Path
        The path of the osm file
    result_file_path: Path
        The path of the folder to save the result file.
    save_type: int
        The type to store the file, use the following variable
        SAVE_TYPE_JSON or SAVE_TYPE_PICKLE

    Returns
    -------
    final_node_table: Dict
        A dictionary that stored the node id and the latitude/longitude coordinates
        as a key value pair.
    final_way_table: Dict
        A dictionary that stored the way id and a list of node id's as a key value pair.
    final_relation_table: Dict
        A dictionary that stored the relation id and a two-element list holding the list of nodes and ways
        and the tags. The list of nodes and ways were the stops and streets that made up a specific NFTA
        route. Nodes are usually included because the routes start and end at points that are generally not
        the natural endpoint of a road. The tags are useful because they possess information on the route
        like its name and what type of vehicle traverse the route (e.g. bus).
    relations: List
        A list that contains the same information as final_relation_table, albeit in a less easily accessed
        form of a list of 3-tuples.

    Notes
    -----
    The four returned objects plus the three objects below are handed to ``graph_writer`` under the names
    "final_node_table", "final_way_table", "final_relation_table", "relations", "way_graph", "way_types" and
    "way_type_avg_speed_limit". The following three are saved only, they are NOT returned:

    way_graph: Graph (Dictionary)
        A graph use way_id as the node of the graph and use [OSM's node] as the edge of the graph. Use
        adjacency list to represent the graph.
    way_types: Dictionary
        A dictionary that use way_id as key and the type of the way as the value ("unclassified" when the
        way has no "highway" tag).
    way_type_avg_speed_limit: Dictionary
        A dictionary that use way_type as key and the average speed limit of that type of way, rounded to
        the nearest multiple of 5, as the value (0 when no speed limit is known for the type).
    """
    osmhandler = OSMHandler()
    osmhandler.apply_file(str(map_file))

    node_table = osmhandler.node_table
    way_table = osmhandler.way_table
    way_tag_table = osmhandler.way_tag_table
    relations = list(osmhandler.relations)

    # In the OSMHandler, I only saved relations that had NFTA in the name,
    # but I still had to save all of the nodes and ways because they don't
    # have additional information to tell me which ones I will use.
    # Here, I only save the nodes and ways that are found in the relations
    # that I'm actually going to use.
    used_nodes = set()
    used_ways = set()
    for relation in relations:
        for index in relation[1]:
            if index in way_table:
                used_ways.add(index)
                used_nodes.update(way_table[index])
            elif index in node_table:
                used_nodes.add(index)

    if FLAG_DEBUG:
        print("[Debug] len(used_nodes) = %d" % len(used_nodes))
        print("[Debug] len(used_ways) = %d" % len(used_ways))

    # Saves the final important nodes, ways, and relations to a format that is
    # more practical for my purposes: the dictionary.
    # A way may reference a node that is not part of the file, hence the
    # membership check on node_table.
    final_node_table = {node_id: node_table[node_id] for node_id in used_nodes if node_id in node_table}
    final_way_table = {way_id: way_table[way_id] for way_id in used_ways}
    final_relation_table = {relation[0]: [relation[1], relation[2]] for relation in relations}

    if FLAG_DEBUG:
        print("[Debug] len(final_node_table) = %d" % len(final_node_table))
        print("[Debug] len(final_way_table)= %d" % len(final_way_table))
        print("[Debug] len(final_relation_table) = %d" % len(final_relation_table))
        print("[Debug] Map of all route: ", debug_show_all_route(relations, final_node_table, final_way_table))

    # Create a graph that use [OSM's way] as the node of the graph
    # and use [OSM's node] as the edge of the graph.
    # Step 1: for every node, collect the ways that pass through it.
    node_to_way_mapping = {}
    for way, nodes in final_way_table.items():
        for node in nodes:
            node_to_way_mapping.setdefault(node, set()).add(way)

    # Step 2: ways that share a node are neighbours. A way whose nodes are
    # not shared with any other way still gets an (empty) entry.
    way_graph_by_set = {}
    for ways in node_to_way_mapping.values():
        for way in ways:
            way_graph_by_set.setdefault(way, set()).update(ways - {way})

    # Convert the graph to an adjacency list format data structure
    way_graph_by_list = {way: list(neighbours) for way, neighbours in way_graph_by_set.items()}

    # Create a dictionary that use way_id as key and store the type of the way, store in way_types
    # (the type of the way is mark as "highway" tag in OSM)
    # Also we collect the speed limits of each type of road to average them below.
    way_types = {}
    speed_limits_by_type = {}
    for way_id in final_way_table:
        tags = way_tag_table[way_id]
        if "highway" not in tags:
            way_types[way_id] = "unclassified"
            continue
        way_type = tags["highway"]
        way_types[way_id] = way_type
        limits = speed_limits_by_type.setdefault(way_type, [])
        if "maxspeed" in tags:
            # r"\d+" (not r"\d*"): the latter also matches the empty string,
            # so non-numeric values such as "none" would crash int("").
            # NOTE(review): the number is taken as-is; the unit (mph vs km/h)
            # is not inspected.
            speed_match = re.search(r"\d+", tags["maxspeed"])
            if speed_match is not None:
                limits.append(int(speed_match.group()))

    # To simplify the result, we round the result to the nearest multiple of 5.
    way_type_avg_speed_limit = {
        way_type: 5 * round(sum(limits) / len(limits) / 5) if limits else 0
        for way_type, limits in speed_limits_by_type.items()
    }

    # Save data to files
    save_filename_list = ["final_node_table", "final_way_table", "final_relation_table", "relations", "way_graph",
                          "way_types", "way_type_avg_speed_limit"]
    save_variable_list = [final_node_table, final_way_table, final_relation_table, relations, way_graph_by_list,
                          way_types, way_type_avg_speed_limit]
    graph_writer(result_file_path, save_type, save_filename_list, save_variable_list)

    return final_node_table, final_way_table, final_relation_table, relations
if __name__ == '__main__':
    # Command-line entry point:
    #   osm_interpreter.py <map file> [result path] [result format]
    # Parses the arguments, runs get_map_data() and reports the runtime when
    # FLAG_DEBUG is set.
    if len(sys.argv) < 2:
        print("Usage:")
        print("osm_interpreter.py <map file> [result path] [result format]")
        print("")
        print("Require:")
        print("Map file : the path of the osm file")
        print("")
        print("Optional:")
        print("Result path: the path to the folder that store the result files")
        print(" by default is: graph/")
        print("Save format: the format to save the result, by default is pickle")
        print(" possible value: JSON and pickle")
        # sys.exit() instead of exit(): the latter is injected by the `site`
        # module and is not guaranteed to exist (e.g. `python -S`).
        sys.exit(0)

    map_file = Path(sys.argv[1])

    result_file_path = Path("graph")
    if len(sys.argv) >= 3:
        result_file_path = Path(sys.argv[2])

    save_type = SAVE_TYPE_PICKLE
    if len(sys.argv) >= 4:
        if sys.argv[3] == "JSON":
            save_type = SAVE_TYPE_JSON
        elif sys.argv[3] == "pickle":
            save_type = SAVE_TYPE_PICKLE
        else:
            print("invalid Save format")
            print("Save format: the format to save the result, by default is pickle")
            print(" possible value: JSON and pickle")
            # An invalid argument is an error: report it with a non-zero
            # exit status so that calling scripts can detect the failure.
            sys.exit(1)

    print("Map File : %s" % map_file)
    print("Result path: %s" % result_file_path)
    if save_type == SAVE_TYPE_PICKLE:
        print("Result type: pickle")
    elif save_type == SAVE_TYPE_JSON:
        print("Result type: JSON")

    start = time.process_time()
    get_map_data(map_file, result_file_path, save_type)
    if FLAG_DEBUG:
        print("[Debug] Total runtime is %.3f s" % (time.process_time() - start))
    print("Done")

    # Todo: This bug needs to be fixed.
    # In some case the script won't stop in windows, need to kill itself in order to stop (Shiluo)
    if platform.system() != "Windows":
        sys.exit(0)
    else:
        print("")
        print("Known BUG:")
        print("For some unknown reason, this python script cannot close correctly on some Windows System")
        print("Killing the program ...")
        # The process is terminated abruptly below, so buffered output has to
        # be flushed by hand or the messages above can be lost (e.g. when
        # stdout is redirected to a file or a pipe).
        sys.stdout.flush()
        os.kill(os.getpid(), 9)