-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathosm_fetch.py
212 lines (144 loc) · 6.32 KB
/
osm_fetch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
'''
osm_fetch.py: created to import (download) the desired OSM data and convert it to GeoJSON
'''
import requests, os, time, json
# import codecs
# import geopandas as gpd
# from geopandas import read_file
import osm2geojson
from itertools import cycle
from qgis.core import QgsApplication
# The plugin paths are computed again in this module to avoid circular imports:
# homepath = os.path.expanduser('~')
# settings directory reported by QGIS (presumably the one of the active user
# profile -- confirm against the QgsApplication documentation)
profilepath = QgsApplication.qgisSettingsDirPath()
# location of this plugin's folder, relative to the QGIS settings directory
base_pluginpath_p2 = 'python/plugins/osm_sidewalkreator'
# base folder of the plugin; join_to_a_outfolder() builds its output paths from it
basepath = os.path.join(profilepath,base_pluginpath_p2)
'''
## MAJOR TODO: evaluate the use of "import gdal" to use gdal ogr api and osm driver to convert the .osm files, leaving zero external dependencies...
'''
def delete_filelist_that_exists(filepathlist):
    '''
    Delete every file of "filepathlist", silently skipping the paths that do not exist.

    :param filepathlist: iterable with the paths of the files to be removed.

    Errors other than "file not found" (e.g. missing permissions, or the path
    being a directory) are still raised by os.remove.
    '''
    for filepath in filepathlist:
        # just try to remove instead of checking for existence beforehand:
        # the file could vanish between the check and the removal
        try:
            os.remove(filepath)
        except FileNotFoundError:
            # nothing to delete, which is the state that we wanted anyway
            pass
def join_to_a_outfolder(filename,foldername='temporary'):
    '''
    Return the path of "filename" placed inside the "foldername" subfolder of
    the plugin base folder ("basepath"). Nothing is created on disk.
    '''
    return os.path.join(basepath,foldername,filename)
def osm_query_string_by_bbox(min_lat,min_lgt,max_lat,max_lgt,interest_key="highway",node=False,way=True,relation=False,print_querystring=False,interest_value = None,dump_path=None):
    '''
    Build an Overpass query string selecting the elements tagged with
    "interest_key" inside a bounding box.

    :param min_lat, min_lgt, max_lat, max_lgt: bounding box limits, written to
        the query in this same order.
    :param interest_key: tag key that the elements must have.
    :param node, way, relation: which kinds of OSM elements are queried.
    :param print_querystring: if True, the query is printed.
    :param interest_value: optional tag value; when given (and not empty) only
        elements with interest_key=interest_value are selected.
    :param dump_path: optional path of a text file to which the query is written.
    :return: the query string.
    '''
    query_bbox = f'{min_lat},{min_lgt},{max_lat},{max_lgt}'

    interest_value_part = f'="{interest_value}"' if interest_value else ''

    # the tag filter and the bbox are the same for all the element kinds
    filter_part = f'["{interest_key}"{interest_value_part}]({query_bbox});'

    node_part = f'node{filter_part}' if node else ''
    way_part = f'way{filter_part}' if way else ''
    relation_part = f'relation{filter_part}' if relation else ''

    # "(._;>;);" recurses down, adding the members (e.g. the nodes of the ways)
    # of the selected elements to the output
    overpass_query = f"""
(
{node_part}
{way_part}
{relation_part}
);
/*added by auto repair*/
(._;>;);
/*end of auto repair*/
out;
"""

    if print_querystring:
        print(overpass_query)

    if dump_path:
        # explicit encoding: tag values can contain non-ASCII characters, that
        # may not be representable in the platform default encoding
        with open(dump_path,'w',encoding='utf-8') as querydumper:
            querydumper.write(overpass_query)

    return overpass_query
def filter_gjsonfeats_bygeomtype(geojson,geomtype='LineString',lvl1='features',include_feats_without_tags=False):
    '''
    Keep only the features whose geometry type is "geomtype", moving the
    entries of the "tags" property up to the properties themselves.

    Flexible function that can receive either a path to a geojson file or a
    geojson as a dictionary.

    :param geojson: path to a geojson file (str) or an already loaded dictionary.
        A dictionary is modified in place (and also returned).
    :param geomtype: geometry type to keep; 'Polygon' also keeps 'MultiPolygon'.
    :param lvl1: key of the dictionary that holds the list of features.
    :param include_feats_without_tags: if True, features of the wanted type
        that have no "tags" property are kept too (dropped by default).
    :return: the dictionary, with the filtered list of features under "lvl1".
    '''
    if isinstance(geojson,str):
        # GeoJSON is UTF-8 encoded, so do not rely on the platform default encoding
        with open(geojson,encoding='utf-8') as reader:
            as_dict = json.load(reader)
    else:
        as_dict = geojson

    # in order to deal with relations
    if geomtype == 'Polygon':
        allowed_types = (geomtype,'MultiPolygon')
    else:
        allowed_types = (geomtype,)

    new_list = []

    for entry in as_dict[lvl1]:
        # GeoJSON allows features with null geometry: they can never match
        geometry = entry.get('geometry')
        if not geometry or geometry.get('type') not in allowed_types:
            continue

        # null/missing properties are handled as "no tags"
        properties = entry.get('properties') or {}

        if 'tags' in properties:
            # fixing tags not appearing as fields
            properties.update(properties['tags'])
            del properties['tags']
            new_list.append(entry)
        elif include_feats_without_tags:
            # so by default, features without tags will not be included
            new_list.append(entry)

    as_dict[lvl1] = new_list

    return as_dict
def get_osm_data(querystring,tempfilesname,geomtype='LineString',print_response=False,timeout=30,return_as_string=False):
    '''
    Get the OSM data from an Overpass server, convert it to geojson and keep
    only the features of type "geomtype".

    :param querystring: the Overpass query, sent as the "data" parameter.
    :param tempfilesname: prefix of the names of the generated files.
    :param geomtype: geometry type kept by filter_gjsonfeats_bygeomtype.
    :param print_response: if True, the response object is printed.
    :param timeout: timeout (in seconds) of each request.
    :param return_as_string: if True no file is written and the filtered geojson
        is returned as a string; otherwise the xml, the unfiltered geojson and
        the filtered geojson are written to the output folder given by
        join_to_a_outfolder.
    :return: the geojson string, or the path of the filtered geojson file.

    NOTE: the request is retried, cycling through the servers, until a
    response with status 200 is received (there is no limit of attempts).
    '''
    overpass_url_list = ["http://overpass-api.de/api/interpreter","https://lz4.overpass-api.de/api/interpreter","https://z.overpass-api.de/api/interpreter",'https://overpass.openstreetmap.ru/api/interpreter','https://overpass.openstreetmap.fr/api/interpreter','https://overpass.kumi.systems/api/interpreter']

    # to iterate circularly, thx: https://stackoverflow.com/a/23416519/4436950
    circular_iterator = cycle(overpass_url_list)

    overpass_url = next(circular_iterator)

    while True:
        # TODO: ensure sucess
        #   (the try statement is an improvement already)
        try:
            response = requests.get(overpass_url,params={'data':querystring},timeout=timeout)

            if response.status_code == 200:
                break
        except requests.exceptions.RequestException:
            # only request failures are retried: a bare "except" would also
            # swallow KeyboardInterrupt/SystemExit, making the loop unstoppable
            print('request not sucessful, retrying in 5 seconds...')
            time.sleep(5)

        overpass_url = next(circular_iterator)
        print('retrying with server',overpass_url)

    # TODO check the response, beyond terminal printing
    if print_response:
        print(response)

    # the OSM XML, used directly for the conversion in both modes
    xml_filecontent = response.text

    if not return_as_string:
        # the outpaths for temporary files
        xmlfilepath = join_to_a_outfolder(tempfilesname+'_osm.xml')
        geojsonfilepath = join_to_a_outfolder(tempfilesname+'_osm.geojson')
        # built explicitly: replacing '.geojson' in the whole path could also
        # hit a folder name or the prefix
        unfiltered_geojsonfilepath = join_to_a_outfolder(tempfilesname+'_osm_unfiltered.geojson')

        print('xml will be written to: ',xmlfilepath)

        # the xml file writing part (explicit encoding: OSM data is full of
        # non-ASCII names, that the platform default encoding may not handle):
        with open(xmlfilepath,'w',encoding='utf-8') as handle:
            handle.write(xml_filecontent)

        print('geojson will be written to: ',geojsonfilepath)

    # converting OSM XML to Geojson (osm2geojson library):
    geojson_datadict = osm2geojson.xml2geojson(xml_filecontent, filter_used_refs=False, log_level='INFO')

    if not return_as_string:
        # must be dumped before filtering, as the filter modifies the dictionary in place
        with open(unfiltered_geojsonfilepath,'w',encoding='utf-8') as geojson_handle:
            json.dump(geojson_datadict,geojson_handle)

    filtered_geojson_dict = filter_gjsonfeats_bygeomtype(geojson_datadict,geomtype)

    print('conversion sucessfull!!')

    if return_as_string:
        return json.dumps(filtered_geojson_dict)

    # dumping geojson file:
    with open(geojsonfilepath,'w',encoding='utf-8') as geojson_handle:
        json.dump(filtered_geojson_dict,geojson_handle)

    return geojsonfilepath