-
-
Notifications
You must be signed in to change notification settings - Fork 291
/
Copy pathfields.py
335 lines (287 loc) · 10.6 KB
/
fields.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# Copyright 2011-2012 Nicolas Bessi (Camptocamp SA)
# Copyright 2016 Yannick Payot (Camptocamp SA)
# Copyright 2023 ACSONE SA/NV
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import json
import logging
from operator import attrgetter
from odoo import _, fields
from odoo.tools import sql
from . import geo_convertion_helper as convert
from .geo_db import create_geo_column
logger = logging.getLogger(__name__)
try:
import geojson
from shapely.geometry import Point, shape
from shapely.geometry.base import BaseGeometry
from shapely.wkb import loads as wkbloads
except ImportError:
logger.warning("Shapely or geojson are not available in the sys path")
class GeoField(fields.Field):
    """The field descriptor contains the field definition common to all
    specialized fields for geolocalization.

    Subclasses must define a ``type`` and a ``geo_type``. The type is the
    name of the corresponding column type, the geo_type is the name of the
    corresponding type in the GIS system.
    """

    # Geometry type name; compared case-insensitively with the ``geom_type``
    # of incoming geometries and upper-cased to build the SQL column type.
    geo_type = None
    # Coordinate dimension handed to ``create_geo_column`` and compared with
    # ``geometry_columns.coord_dimension``.
    dim = 2
    # SRID embedded in every written value and in the SQL column type.
    srid = 3857
    # When true, ``update_geo_db_column`` makes sure an index exists.
    gist_index = True

    @property
    def column_type(self):
        """Return the ``(udt_name, SQL type declaration)`` pair of the column."""
        return ("geometry", f"geometry({self.geo_type.upper()}, {self.srid})")

    def convert_to_column(self, value, record, values=None):
        """Convert ``value`` to the text written to the database.

        ``value`` is parsed by ``convert.value_to_shape`` (through
        :meth:`entry_to_shape`); per the original notes it can be geojson,
        wkt or a shapely geometry object.

        :return: ``None`` for a falsy value or an empty geometry, otherwise
                 the string ``SRID=<srid>;<wkt>``
        :raises TypeError: if the geometry type differs from ``geo_type``
        """
        if not value:
            return None
        geometry = self.entry_to_shape(value, same_type=True)
        if geometry.is_empty:
            return None
        return f"SRID={self.srid};{geometry.wkt}"

    def convert_to_cache(self, value, record, validate=True):
        """Convert ``value`` to the cache format.

        Strings/bytes that parse as a hexadecimal number are kept untouched.
        Any other string/bytes is parsed with ``convert.value_to_shape``.
        Geometry objects (passed in or freshly parsed) are cached as their
        ``wkb_hex``; every other value is returned unchanged.
        """
        cached = value
        if isinstance(cached, (bytes, str)):
            try:
                # Only used as a probe: the integer itself is discarded.
                int(cached, 16)
            except (TypeError, ValueError):
                # not an hex value -> try to load from a string
                # representation of a geometry
                value = convert.value_to_shape(value, use_wkb=False)
        if isinstance(value, BaseGeometry):
            cached = value.wkb_hex
        return cached

    def convert_to_record(self, value, record):
        """Convert a cache value to the value exposed on records.

        Per the original notes, ``value`` may be:
            - a GeoJSON string when field onchange is triggered
            - a geometry object hexcode from cache
            - a unicode containing dict

        :return: ``False`` for a falsy value, otherwise the result of
                 ``convert.value_to_shape(value, use_wkb=True)``
        """
        if not value:
            return False
        return convert.value_to_shape(value, use_wkb=True)

    def convert_to_read(self, value, record, use_name_get=True):
        """Convert a record value to what ``read()`` returns.

        :return: the ``geojson.dumps`` output for the geometry, or ``False``
                 when there is no geometry or it is empty
        """
        if isinstance(value, BaseGeometry):
            geometry = value
        else:
            # read hexadecimal value from database
            geometry = self.load_geo(value)
        if not geometry or geometry.is_empty:
            return False
        return geojson.dumps(geometry)

    #
    # Field description
    #
    # properties used by get_description()
    _description_dim = property(attrgetter("dim"))
    _description_srid = property(attrgetter("srid"))
    _description_gist_index = property(attrgetter("gist_index"))

    @classmethod
    def load_geo(cls, wkb):
        """Load geometry into browse record after read was done.

        :param wkb: a geometry object (returned as is) or a hexadecimal WKB
        :return: the geometry, or ``False`` when ``wkb`` is falsy
        """
        if isinstance(wkb, BaseGeometry):
            return wkb
        return wkbloads(wkb, hex=True) if wkb else False

    def entry_to_shape(self, value, same_type=False):
        """Transform input into a geometry object.

        :param value: anything accepted by ``convert.value_to_shape``
        :param same_type: when true, a non-empty geometry must have the same
                          type (case-insensitive) as ``geo_type``
        :raises TypeError: on a geometry type mismatch with ``same_type``
        """
        geometry = convert.value_to_shape(value)
        if same_type and not geometry.is_empty:
            if geometry.geom_type.lower() != self.geo_type.lower():
                msg = _(
                    "Geo Value %(geom_type)s must be of the same type %(geo_type)s as fields",
                    geom_type=geometry.geom_type.lower(),
                    geo_type=self.geo_type.lower(),
                )
                raise TypeError(msg)
        return geometry

    def update_geo_db_column(self, model):
        """Check the existing geometry column and ensure its index exists.

        The column registered in ``geometry_columns`` must match this field's
        srid, geometry type and dimension; no conversion is attempted.

        :param model: an instance of the field's model
        :return: ``True``
        :raises TypeError: if the column is not registered in
                           ``geometry_columns`` or any of the three
                           properties differs
        """
        cr = model._cr
        query = """SELECT srid, type, coord_dimension
FROM geometry_columns
WHERE f_table_name = %s
AND f_geometry_column = %s"""
        cr.execute(query, (model._table, self.name))
        check_data = cr.fetchone()
        if not check_data:
            raise TypeError(
                _(
                    "geometry_columns table seems to be corrupted."
                    " SRID check is not possible"
                )
            )
        db_srid, db_type, db_dim = check_data
        if db_srid != self.srid:
            # NOTE(review): unlike the two messages below, this one prints the
            # field value first and the database value second - confirm
            # whether that ordering is intended.
            raise TypeError(
                _(
                    "Reprojection of column is not implemented."
                    " We can not change srid %(srid)s to %(data)s",
                    srid=self.srid,
                    data=db_srid,
                )
            )
        if db_type != self.geo_type.upper():
            raise TypeError(
                _(
                    "Geo type modification is not implemented."
                    " We can not change type %(data)s to %(geo_type)s",
                    data=db_type,
                    geo_type=self.geo_type.upper(),
                )
            )
        if db_dim != self.dim:
            raise TypeError(
                _(
                    "Geo dimention modification is not implemented."
                    " We can not change dimention %(data)s to %(dim)s",
                    data=db_dim,
                    dim=self.dim,
                )
            )
        if self.gist_index:
            # NOTE(review): ``_postgis_index_name`` and ``_create_index`` are
            # not defined in this class; presumably provided elsewhere -
            # confirm they exist before relying on this branch.
            cr.execute(
                "SELECT indexname FROM pg_indexes WHERE indexname = %s",
                (self._postgis_index_name(model._table, self.name),),
            )
            if cr.fetchone():
                return True
            self._create_index(cr, model._table, self.name)
        return True

    def update_db_column(self, model, column):
        """Create/update the column corresponding to ``self``.

        For creation of geo column

        :param model: an instance of the field's model
        :param column: the column's configuration (dict)
                       if it exists, or ``None``
        """
        # the column does not exist, create it
        if not column:
            create_geo_column(
                model._cr,
                model._table,
                self.name,
                self.geo_type.upper(),
                self.srid,
                self.dim,
                self.string,
            )
            return

        # the column already has the geometry type: nothing to do
        if column["udt_name"] == self.column_type[0]:
            return

        self.update_geo_db_column(model)

        if column["udt_name"] in self.column_cast_from:
            sql.convert_column(model._cr, model._table, self.name, self.column_type[1])
        else:
            # Keep the old data: move the existing column aside under the
            # first free "<name>_moved<i>" name, then create a fresh column.
            newname = (self.name + "_moved{}").format
            i = 0
            while sql.column_exists(model._cr, model._table, newname(i)):
                i += 1
            if column["is_nullable"] == "NO":
                sql.drop_not_null(model._cr, model._table, self.name)
            sql.rename_column(model._cr, model._table, self.name, newname(i))
            sql.create_column(
                model._cr, model._table, self.name, self.column_type[1], self.string
            )
class GeoLine(GeoField):
    """Field for POSTGIS geometry Line type"""

    type = "geo_line"
    geo_type = "LineString"

    @classmethod
    def from_points(cls, cr, point1, point2, srid=None):
        """Build a line joining two points, computed by the database.

        :param cr: DB cursor
        :param point1: Point (BaseGeometry), start of the line
        :param point2: Point (BaseGeometry), end of the line
        :param srid: SRID of both points; the class ``srid`` when falsy
        :return: LINESTRING object loaded from the query result
        """
        query = """
SELECT
ST_MakeLine(
ST_GeomFromText(%(wkt1)s, %(srid)s),
ST_GeomFromText(%(wkt2)s, %(srid)s)
)
"""
        params = {
            "wkt1": point1.wkt,
            "wkt2": point2.wkt,
            "srid": srid or cls.srid,
        }
        cr.execute(query, params)
        row = cr.fetchone()
        return cls.load_geo(row[0])
class GeoPoint(GeoField):
    """Field for POSTGIS geometry Point type"""

    type = "geo_point"
    geo_type = "Point"

    @classmethod
    def from_latlon(cls, cr, latitude, longitude):
        """Convert a (latitude, longitude) pair into a point in the field SRID.

        The pair is turned into ``Point(longitude, latitude)``, read by the
        database as SRID 4326 and transformed to the class ``srid``.

        :param cr: DB cursor
        :return: the transformed point loaded from the query result
        """
        source_point = Point(longitude, latitude)
        query = """
SELECT
ST_Transform(
ST_GeomFromText(%(wkt)s, 4326),
%(srid)s)
"""
        cr.execute(query, {"wkt": source_point.wkt, "srid": cls.srid})
        row = cr.fetchone()
        return cls.load_geo(row[0])

    @classmethod
    def to_latlon(cls, cr, geopoint):
        """Convert a point expressed in the class ``srid`` to SRID 4326.

        :param cr: DB cursor
        :param geopoint: a geometry object, or a JSON string that
                         ``shapely.geometry.shape`` can load
        :return: the ``(x, y)`` tuple of the transformed point. With the axis
                 order used by :meth:`from_latlon` (x = longitude,
                 y = latitude) the longitude comes first, despite the
                 method name.
        """
        # Equivalent query from the postgres command line:
        # SELECT ST_X(geom), ST_Y(geom) FROM (SELECT ST_TRANSFORM(ST_SetSRID(
        # ST_MakePoint(601179.61612, 6399375,681364), 900913), 4326) as geom) g;
        geo_point_instance = (
            geopoint
            if isinstance(geopoint, BaseGeometry)
            else shape(json.loads(geopoint))
        )
        query = """
SELECT
ST_TRANSFORM(
ST_SetSRID(
ST_MakePoint(
%(coord_x)s, %(coord_y)s
),
%(srid)s
), 4326)"""
        params = {
            "coord_x": geo_point_instance.x,
            "coord_y": geo_point_instance.y,
            "srid": cls.srid,
        }
        cr.execute(query, params)
        row = cr.fetchone()
        transformed = cls.load_geo(row[0])
        return transformed.x, transformed.y
class GeoPolygon(GeoField):
    """Geo field declared with field type ``geo_polygon`` and POSTGIS
    geometry type ``Polygon``."""

    type = "geo_polygon"
    geo_type = "Polygon"
class GeoMultiLine(GeoField):
    """Geo field declared with field type ``geo_multi_line`` and POSTGIS
    geometry type ``MultiLineString``."""

    type = "geo_multi_line"
    geo_type = "MultiLineString"
class GeoMultiPoint(GeoField):
    """Geo field declared with field type ``geo_multi_point`` and POSTGIS
    geometry type ``MultiPoint``."""

    type = "geo_multi_point"
    geo_type = "MultiPoint"
class GeoMultiPolygon(GeoField):
    """Geo field declared with field type ``geo_multi_polygon`` and POSTGIS
    geometry type ``MultiPolygon``."""

    type = "geo_multi_polygon"
    geo_type = "MultiPolygon"
# Attach every geo field class to the imported ``fields`` module under its
# own class name (fields.GeoLine, fields.GeoPoint, ...).
for _geo_field_class in (
    GeoLine,
    GeoPoint,
    GeoPolygon,
    GeoMultiLine,
    GeoMultiPoint,
    GeoMultiPolygon,
):
    setattr(fields, _geo_field_class.__name__, _geo_field_class)
# Do not leave the loop variable behind in the module namespace.
del _geo_field_class