Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

410 refacto repositories #412

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
###############################################################################################
# these values are used in the local docker env. You can use "localhost" hostname
# if you run the application without docker
POSTGRES_DRIVER=postgresql
POSTGRES_HOSTNAME=postgres_bloom
POSTGRES_USER=bloom_user
POSTGRES_PASSWORD=bloom
Expand Down
1 change: 1 addition & 0 deletions backend/bloom/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Settings(BaseSettings):
default=5432)

postgres_db:str = Field(min_length=1,max_length=32,pattern=r'^(?:[a-zA-Z]|_)[\w\d_]*$')
postgres_schema:str = Field(default='public')
srid: int = Field(default=4326)
spire_token:str = Field(default='')
data_folder:str=Field(default=str(Path(__file__).parent.parent.parent.joinpath('./data')))
Expand Down
1 change: 0 additions & 1 deletion backend/bloom/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class UseCases(containers.DeclarativeContainer):

vessel_repository = providers.Factory(
VesselRepository,
session_factory=db.provided.session,
)

alert_repository = providers.Factory(
Expand Down
14 changes: 14 additions & 0 deletions backend/bloom/infra/database/sql_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

class Vessel(Base):
__tablename__ = "dim_vessel"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True)
mmsi = Column("mmsi", Integer)
ship_name = Column("ship_name", String, nullable=False)
Expand All @@ -47,6 +48,7 @@ class Vessel(Base):

class Alert(Base):
__tablename__ = "alert"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True, index=True)
timestamp = Column("timestamp", DateTime)
mpa_id = Column("mpa_id", Integer)
Expand All @@ -55,6 +57,7 @@ class Alert(Base):

class Port(Base):
__tablename__ = "dim_port"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True, index=True)
name = Column("name", String, nullable=False)
locode = Column("locode", String, nullable=False)
Expand All @@ -71,6 +74,7 @@ class Port(Base):

class SpireAisData(Base):
__tablename__ = "spire_ais_data"
__table_args__ = {'schema': settings.postgres_schema}

id = Column("id", Integer, primary_key=True)
spire_update_statement = Column("spire_update_statement", DateTime(timezone=True))
Expand Down Expand Up @@ -108,6 +112,7 @@ class SpireAisData(Base):

class Zone(Base):
__tablename__ = "dim_zone"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True)
category = Column("category", String, nullable=False)
sub_category = Column("sub_category", String)
Expand All @@ -121,6 +126,7 @@ class Zone(Base):

class WhiteZone(Base):
__tablename__ = "dim_white_zone"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True)
geometry = Column("geometry", Geometry(geometry_type="GEOMETRY", srid=settings.srid))
created_at = Column("created_at", DateTime(timezone=True), server_default=func.now())
Expand All @@ -129,6 +135,7 @@ class WhiteZone(Base):

class VesselPosition(Base):
__tablename__ = "vessel_positions"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True)
timestamp = Column("timestamp", DateTime(timezone=True), nullable=False)
accuracy = Column("accuracy", String)
Expand All @@ -148,6 +155,7 @@ class VesselPosition(Base):

class VesselData(Base):
__tablename__ = "vessel_data"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True)
timestamp = Column("timestamp", DateTime(timezone=True), nullable=False)
ais_class = Column("ais_class", String)
Expand All @@ -166,6 +174,7 @@ class VesselData(Base):

class VesselVoyage(Base):
__tablename__ = "vessel_voyage"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True)
timestamp = Column("timestamp", DateTime(timezone=True), nullable=False)
destination = Column("destination", String)
Expand All @@ -177,6 +186,7 @@ class VesselVoyage(Base):

class Excursion(Base):
__tablename__ = "fct_excursion"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True)
vessel_id = Column("vessel_id", Integer, ForeignKey("dim_vessel.id"), nullable=False)
departure_port_id = Column("departure_port_id", Integer, ForeignKey("dim_port.id"))
Expand All @@ -201,6 +211,7 @@ class Excursion(Base):

class Segment(Base):
__tablename__ = "fct_segment"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, primary_key=True)
excursion_id = Column("excursion_id", Integer, ForeignKey("fct_excursion.id"), nullable=False)
timestamp_start = Column("timestamp_start", DateTime(timezone=True))
Expand All @@ -226,6 +237,7 @@ class Segment(Base):

class TaskExecution(Base):
__tablename__ = "tasks_executions"
__table_args__ = {'schema': settings.postgres_schema}
id = Column("id", Integer, Identity(), primary_key=True)
task_name = Column("task_name", String)
point_in_time = Column("point_in_time", DateTime(timezone=True))
Expand All @@ -241,6 +253,7 @@ class RelSegmentZone(Base):
__tablename__ = "rel_segment_zone"
__table_args__ = (
PrimaryKeyConstraint('segment_id', 'zone_id'),
{'schema': settings.postgres_schema}
)
segment_id = Column("segment_id", Integer, ForeignKey("fct_segment.id"), nullable=False)
zone_id = Column("zone_id", Integer, ForeignKey("dim_zone.id"), nullable=False)
Expand All @@ -260,6 +273,7 @@ class RelSegmentZone(Base):

class MetricsVesselInActivity(Base):
__table__ = vessel_in_activity_request
__table_args__ = {'schema': settings.postgres_schema}
#vessel_id: Mapped[Optional[int]]
#total_time_at_sea: Mapped[Optional[timedelta]]

Expand Down
114 changes: 50 additions & 64 deletions backend/bloom/infra/repositories/repository_port.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
from sqlalchemy.orm import Session
from bloom.domain.excursion import Excursion

from bloom.infra.repository import GenericRepository, GenericSqlRepository
from abc import ABC, abstractmethod

class PortRepository:
def __init__(self, session_factory: Callable) -> None:
self.session_factory = session_factory
class PortRepositoryBase(GenericRepository[Port], ABC):
def get_empty_geometry_buffer_ports(self, session: Session) -> list[Port]:
raise NotImplementedError()

def get_port_by_id(self, session: Session, port_id: int) -> Union[Port, None]:
entity = session.get(sql_model.Port, port_id)
Expand All @@ -37,67 +39,51 @@ def get_empty_geometry_buffer_ports(self, session: Session) -> list[Port]:
if not q:
return []
return [PortRepository.map_to_domain(entity) for entity in q]

def get_ports_updated_created_after(self, session: Session, created_updated_after: datetime) -> list[Port]:
stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after,
sql_model.Port.updated_at >= created_updated_after))
q = session.execute(stmt).scalars()
if not q:
return []
return [PortRepository.map_to_domain(entity) for entity in q]

def update_geometry_buffer(self, session: Session, port_id: int, buffer: Polygon) -> None:
session.execute(update(sql_model.Port), [{"id": port_id, "geometry_buffer": from_shape(buffer)}])

def batch_update_geometry_buffer(self, session: Session, id_buffers: list[dict[str, Any]]) -> None:
items = [{"id": item["id"], "geometry_buffer": from_shape(item["geometry_buffer"])} for item in id_buffers]
session.execute(update(sql_model.Port), items)

def create_port(self, session: Session, port: Port) -> Port:
orm_port = PortRepository.map_to_sql(port)
session.add(orm_port)
return PortRepository.map_to_domain(orm_port)

def batch_create_port(self, session: Session, ports: list[Port]) -> list[Port]:
orm_list = [PortRepository.map_to_sql(port) for port in ports]
session.add_all(orm_list)
return [PortRepository.map_to_domain(orm) for orm in orm_list]

def find_port_by_position_in_port_buffer(self, session: Session, position: Point) -> Union[Port, None]:
stmt = select(sql_model.Port).where(
func.ST_contains(sql_model.Port.geometry_buffer, from_shape(position, srid=settings.srid)) == True)
port = session.execute(stmt).scalar()
if not port:
return None
else:
return PortRepository.map_to_domain(port)

def find_port_by_distance(self,
session: Session,
longitude: float,
latitude: float,
threshold_distance_to_port: float) -> Union[Port, None]:
position = Point(longitude, latitude)
stmt = select(sql_model.Port).where(
and_(
func.ST_within(from_shape(position, srid=settings.srid),
sql_model.Port.geometry_buffer) == True,
func.ST_distance(from_shape(position, srid=settings.srid),
sql_model.Port.geometry_point) < threshold_distance_to_port
)
).order_by(asc(func.ST_distance(from_shape(position, srid=settings.srid),
sql_model.Port.geometry_point)))
result = session.execute(stmt).scalars()
return [PortRepository.map_to_domain(e) for e in result]

def get_closest_port_in_range(self, session: Session, longitude: float, latitude: float, range: float) -> Union[
tuple[int, float], None]:
res = session.execute(text("""SELECT id,ST_Distance(ST_POINT(:longitude,:latitude, 4326)::geography, geometry_point::geography)
FROM dim_port WHERE ST_Within(ST_POINT(:longitude,:latitude, 4326),geometry_buffer) = true
AND ST_Distance(ST_POINT(:longitude,:latitude, 4326)::geography, geometry_point::geography) < :range
ORDER by ST_Distance(ST_POINT(:longitude,:latitude, 4326)::geography, geometry_point::geography) ASC LIMIT 1"""),
{"longitude": longitude, "latitude": latitude, "range": range}).first()
return res
pass

# class PortRepository:
# def __init__(self, session_factory: Callable) -> None:
# self.session_factory = session_factory

# def get_port_by_id(self, session: Session, port_id: int) -> Union[Port, None]:
# entity = session.get(sql_model.Port, port_id)
# if entity is not None:
# return PortRepository.map_to_domain(entity)
# else:
# return None

# def get_all_ports(self, session: Session) -> List[Port]:
# q = session.query(sql_model.Port)
# if not q:
# return []
# return [PortRepository.map_to_domain(entity) for entity in q]

# def get_empty_geometry_buffer_ports(self, session: Session) -> list[Port]:
# stmt = select(sql_model.Port).where(sql_model.Port.geometry_buffer.is_(None))
# q = session.execute(stmt).scalars()
# if not q:
# return []
# return [PortRepository.map_to_domain(entity) for entity in q]

# def get_ports_updated_created_after(self, session: Session, created_updated_after: datetime) -> list[Port]:
# stmt = select(sql_model.Port).where(or_(sql_model.Port.created_at >= created_updated_after,
# sql_model.Port.updated_at >= created_updated_after))
# q = session.execute(stmt).scalars()
# if not q:
# return []
# return [PortRepository.map_to_domain(entity) for entity in q]

# def update_geometry_buffer(self, session: Session, port_id: int, buffer: Polygon) -> None:
# session.execute(update(sql_model.Port), [{"id": port_id, "geometry_buffer": from_shape(buffer)}])

# def batch_update_geometry_buffer(self, session: Session, id_buffers: list[dict[str, Any]]) -> None:
# items = [{"id": item["id"], "geometry_buffer": from_shape(item["geometry_buffer"])} for item in id_buffers]
# session.execute(update(sql_model.Port), items)

# def create_port(self, session: Session, port: Port) -> Port:
# orm_port = PortRepository.map_to_sql(port)
# session.add(orm_port)
# return PortRepository.map_to_domain(orm_port)


def update_port_has_excursion(self, session : Session, port_id: int ):
Expand Down
Loading
Loading