Skip to content

Commit

Permalink
CellCollection: accept query without a date
Browse files Browse the repository at this point in the history
  • Loading branch information
wowtor committed Apr 6, 2024
1 parent 79f5fb5 commit e263c12
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 32 deletions.
4 changes: 2 additions & 2 deletions telcell/celldb/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .cell_database import CellDatabase
from .pgdatabase import PgDatabase
from .cell_collection import CellCollection
from .pgdatabase import PgCollection
from . import duplicate_policy
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations
from abc import abstractmethod
import datetime
from typing import Optional, Iterable
from typing import Optional, Iterable, Sized

import geopy

Expand All @@ -19,7 +19,18 @@ def __getattr__(self, item):
return Properties()


class CellDatabase(Iterable[Properties]):
class CellCollection(Iterable[Properties], Sized):
"""
Collection of cell towers in a cellular network, that may be queried by cell id (e.g. for geocoding) or by
coordinates (e.g. reverse geocoding).
The known fields for cells may vary by database and is generally a subset of:
- cell: the cell identity
- wgs84: the geolocation of the cell tower or a connected device
- accuracy: the accuracy of the location of the connected device in meters
- azimuth: the orientation of the cell tower in meters
"""

@abstractmethod
def get(self, date: datetime.datetime, ci: CellIdentity) -> Optional[Properties]:
"""
Expand All @@ -43,7 +54,7 @@ def search(
mnc: int = None,
count_limit: Optional[int] = 10000,
exclude: Optional[CellIdentity] = None,
) -> CellDatabase:
) -> CellCollection:
"""
Given a Point, find antennas that are in reach from this point sorted by the distance from the grid point.
Expand All @@ -59,8 +70,5 @@ def search(
"""
raise NotImplementedError

def limit(self, count_limit: int) -> CellDatabase:
def limit(self, count_limit: int) -> CellCollection:
return self.search(None, count_limit=count_limit)

def __len__(self):
raise NotImplementedError
6 changes: 4 additions & 2 deletions telcell/celldb/duplicate_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Optional, Sequence

from telcell.cell_identity import CellIdentity
from telcell.celldb.cell_database import Properties
from telcell.celldb.cell_collection import Properties


def get_duplicate_policy(name: str):
Expand All @@ -21,7 +21,9 @@ def warn(ci: CellIdentity, results: Sequence[Properties]) -> Optional[Properties
return results[0]


def take_first(_ci: CellIdentity, results: Sequence[Properties]) -> Optional[Properties]:
def take_first(
_ci: CellIdentity, results: Sequence[Properties]
) -> Optional[Properties]:
return results[0]


Expand Down
95 changes: 95 additions & 0 deletions telcell/celldb/google.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import datetime
from typing import Optional, Iterable

import geopy
import requests
import requests_cache

from telcell.cell_identity import (
CellIdentity,
CellGlobalIdentity,
NRCell,
EutranCellGlobalIdentity,
Radio,
)
from telcell.celldb import CellCollection
from telcell.celldb.cell_collection import Properties


def _ci_to_dict(cell: CellIdentity) -> dict[str, str | int]:
tower = {
"mobileCountryCode": cell.mcc,
"mobileNetworkCode": cell.mnc,
}
if isinstance(cell, CellGlobalIdentity):
tower["locationAreaCode"] = cell.lac
if isinstance(cell, NRCell):
tower["newRadioCellId"] = cell.eci
elif isinstance(cell, CellGlobalIdentity):
tower["cellId"] = cell.ci
elif isinstance(cell, EutranCellGlobalIdentity):
tower["cellId"] = cell.eci

return tower


class GoogleGeolocationService(CellCollection):
"""
Google geolocation service.
See: https://developers.google.com/maps/documentation/geolocation/overview
"""

def __init__(self, key: str, user_agent: str = "TestApp", cache_name: str = None):
self._url = f"https://www.googleapis.com/geolocation/v1/geolocate?key={key}"
self.headers = {"User-Agent": user_agent, "Content-Type": "application/json"}
if cache_name is None:
self._session = requests.Session()
else:
self._session = requests_cache.CachedSession(cache_name)

def get(self, date: datetime.datetime, cell: CellIdentity) -> Properties:
if cell.radio is None and isinstance(cell, EutranCellGlobalIdentity):
info = self.get(date, CellIdentity.parse(f"{Radio.LTE.value}/{cell}"))
if info is None:
info = self.get(date, CellIdentity.parse(f"{Radio.NR.value}/{cell}"))
return info

data = {}
if cell.radio:
data["radioType"] = cell.radio

data["cellTowers"] = [_ci_to_dict(cell)]

res = self._session.post(self._url, json=data).json()
if "error" in res:
raise ValueError(
f'google location service: {res["error"]["message"]}; input={data}'
)

point = geopy.Point(
latitude=res["location"]["lat"], longitude=res["location"]["lng"]
)
accuracy = res["accuracy"]

return Properties(cell=cell, wgs84=point, accuracy=accuracy)

def search(
self,
coords: geopy.Point = None,
distance_limit_m: float = None,
distance_lower_limit_m: float = None,
date: datetime.datetime = None,
radio: Optional[str | Iterable[str]] = None,
mcc: int = None,
mnc: int = None,
count_limit: Optional[int] = 10000,
exclude: Optional[CellIdentity] = None,
) -> CellCollection:
raise NotImplemented("google geolocation service does not support this feature")

def __iter__(self):
raise NotImplemented("google geolocation service does not support this feature")

def __len__(self):
raise NotImplemented("google geolocation service does not support this feature")
64 changes: 46 additions & 18 deletions telcell/celldb/pgdatabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@

import geopy

from telcell.cell_identity import Radio, CellIdentity, CellGlobalIdentity, EutranCellGlobalIdentity
from telcell.cell_identity import (
Radio,
CellIdentity,
CellGlobalIdentity,
EutranCellGlobalIdentity,
)
from . import duplicate_policy
from .cell_database import CellDatabase, Properties
from .cell_collection import CellCollection, Properties
from ..data.models import RD_TO_WGS84, WGS84_TO_RD
from ..geography import Angle

Expand All @@ -34,18 +39,23 @@ def point_to_rd(point: geopy.Point) -> Tuple[int, int]:
def _build_antenna(row):
date_start, date_end, radio, mcc, mnc, lac, ci, eci, rdx, rdy, azimuth_degrees = row
if radio == Radio.GSM.value or radio == Radio.UMTS.value:
retrieved_ci = CellIdentity.create(radio=radio, mcc=mcc, mnc=mnc, lac=lac, ci=ci)
retrieved_ci = CellIdentity.create(
radio=radio, mcc=mcc, mnc=mnc, lac=lac, ci=ci
)
elif radio == Radio.LTE.value or radio == Radio.NR.value:
retrieved_ci = CellIdentity.create(radio=radio, mcc=mcc, mnc=mnc, eci=eci)
elif radio is not None:
raise ValueError(f"unrecognized radio type: {radio}")
elif lac is not None and ci <= 0xFFFF:
retrieved_ci = CellIdentity.create(radio=radio, mcc=mcc, mnc=mnc, lac=lac, ci=ci)
retrieved_ci = CellIdentity.create(
radio=radio, mcc=mcc, mnc=mnc, lac=lac, ci=ci
)
else:
retrieved_ci = CellIdentity.create(radio=radio, mcc=mcc, mnc=mnc, eci=eci)

coords = rd_to_point(rdx, rdy)
return Properties(wgs84=coords, azimuth=Angle(degrees=azimuth_degrees), cell=retrieved_ci)
azimuth = Angle(degrees=azimuth_degrees) if azimuth_degrees is not None else None
return Properties(wgs84=coords, azimuth=azimuth, cell=retrieved_ci)


def _build_cell_identity_query(ci):
Expand Down Expand Up @@ -78,7 +88,7 @@ def _build_cell_identity_query(ci):
return " AND ".join(qwhere), qargs


class PgDatabase(CellDatabase):
class PgCollection(CellCollection):
def __init__(
self,
con,
Expand Down Expand Up @@ -126,8 +136,14 @@ def get(self, date: datetime.datetime, ci: CellIdentity) -> Optional[Properties]
if isinstance(date, datetime.date):
date = datetime.datetime.combine(date, datetime.datetime.min.time())

qwhere = self._qwhere + ["(date_start is NULL OR %s >= date_start) AND (date_end is NULL OR %s < date_end)"]
qargs = self._qargs + [date, date]
qwhere = list(self._qwhere)
qargs = list(self._qargs)

if date is not None:
qwhere = qwhere + [
"(date_start is NULL OR %s >= date_start) AND (date_end is NULL OR %s < date_end)"
]
qargs = qargs + [date, date]

add_qwhere, add_qargs = _build_cell_identity_query(ci)
qwhere.append(add_qwhere)
Expand Down Expand Up @@ -163,7 +179,7 @@ def search(
count_limit: Optional[int] = 10000,
random_order: bool = False,
exclude: Optional[List[CellIdentity]] = None,
) -> CellDatabase:
) -> CellCollection:
"""
Given a Point, find antennas that are in reach from this point sorted by the distance from the grid point.
Expand All @@ -180,12 +196,16 @@ def search(
"""
qwhere = list(self._qwhere)
qargs = list(self._qargs)
if coords is not None:
assert distance_limit_m is not None, "search for coords without distance limit"

if coords is not None and distance_limit_m is not None:
x, y = point_to_rd(coords)
qwhere.append(f"ST_DWithin(rd, 'SRID=4326;POINT({x} {y})', {distance_limit_m})")
qwhere.append(
f"ST_DWithin(rd, 'SRID=4326;POINT({x} {y})', {distance_limit_m})"
)
if distance_lower_limit_m is not None:
qwhere.append(f"NOT ST_DWithin(rd, 'SRID=4326;POINT({x} {y})', {distance_lower_limit_m})")
qwhere.append(
f"NOT ST_DWithin(rd, 'SRID=4326;POINT({x} {y})', {distance_lower_limit_m})"
)
if date is not None:
qwhere.append("(date_start is NULL OR %s >= date_start)")
qwhere.append("(date_end is NULL OR %s < date_end)")
Expand All @@ -210,28 +230,36 @@ def search(
qwhere.append(f"NOT ({add_qwhere})")
qargs.extend(add_qargs)

count_limit = count_limit if count_limit is not None else self._count_limit
qorder = self._qorder
if random_order is not None and random_order:
qorder = "ORDER BY RANDOM()"
elif coords is not None:
x, y = point_to_rd(coords)
qorder = f"ORDER BY ST_Distance(rd, 'SRID=4326;POINT({x} {y})')"

return PgDatabase(self._con, qwhere, qargs, qorder, count_limit, self._on_duplicate)
count_limit = count_limit if count_limit is not None else self._count_limit

return PgCollection(
self._con, qwhere, qargs, qorder, count_limit, self._on_duplicate
)

def close(self):
if self._cur is not None:
self._cur.close()

def __enter__(self):
self._cur = self._con.cursor()
return self

def __exit__(self, type, value, tb):
self._cur.close()
self.close()

def __iter__(self):
assert self._cur is not None, "use within context"
if self._cur is None:
self._cur = self._con.cursor()

q = f"""
SELECT date_start, date_end, radio, mcc, mnc, lac, ci, ST_X(rd), ST_Y(rd), azimuth
SELECT date_start, date_end, radio, mcc, mnc, lac, ci, eci, ST_X(rd), ST_Y(rd), azimuth
FROM antenna_light
WHERE {' AND '.join(qw for qw in self._qwhere)}
{self._qorder}
Expand Down
8 changes: 5 additions & 3 deletions telcell/utils/script_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import confidence

from telcell.celldb import PgDatabase, CellDatabase
from telcell.celldb import PgCollection, CellCollection
from telcell.utils import postgres

DEFAULT_LOGLEVEL = logging.WARNING
Expand Down Expand Up @@ -45,8 +45,10 @@ def get_database_connection(config_path: str, drop_schema: bool = False):


@contextlib.contextmanager
def get_cell_database(config_path: str, drop_schema: bool = False, **kwargs) -> CellDatabase:
yield PgDatabase(get_database_connection(config_path, drop_schema), **kwargs)
def get_cell_database(
config_path: str, drop_schema: bool = False, **kwargs
) -> CellCollection:
yield PgCollection(get_database_connection(config_path, drop_schema), **kwargs)


def setup_logging(path, verbosity_offset):
Expand Down

0 comments on commit e263c12

Please sign in to comment.