Skip to content

Commit

Permalink
refactor: combine georeferencing and digitizing
Browse files Browse the repository at this point in the history
Combine georeferencing and digitizing into one Celery group.
This leads to not needing to ma a request UUID to two Celery task/group
ids.
  • Loading branch information
matthiasschaub committed Dec 30, 2024
1 parent e352eab commit 9413bc8
Show file tree
Hide file tree
Showing 19 changed files with 88,945 additions and 166,312 deletions.
37 changes: 2 additions & 35 deletions sketch_map_tool/database/client_flask.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json
from uuid import UUID

import psycopg2
Expand Down Expand Up @@ -34,27 +33,7 @@ def close_connection(e=None):
db_conn.close()


def _insert_id_map(uuid: str, map_: dict):
create_query = """
CREATE TABLE IF NOT EXISTS uuid_map(
uuid uuid PRIMARY KEY,
map json NOT NULL
)
"""
insert_query = "INSERT INTO uuid_map(uuid, map) VALUES (%s, %s)"
db_conn = open_connection()
with db_conn.cursor() as curs:
curs.execute(create_query)
curs.execute(insert_query, [uuid, json.dumps(map_)])


def _delete_id_map(uuid: str):
query = "DELETE FROM uuid_map WHERE uuid = %s"
db_conn = open_connection()
with db_conn.cursor() as curs:
curs.execute(query, [uuid])


# TODO: Legacy support: Delete this function after PR 515 has been deployed for 1 day
def _select_id_map(uuid) -> dict:
query = "SELECT map FROM uuid_map WHERE uuid = %s"
db_conn = open_connection()
Expand All @@ -69,6 +48,7 @@ def _select_id_map(uuid) -> dict:
)


# TODO: Legacy support: Delete this function after PR 515 has been deployed for 1 day
def get_async_result_id(request_uuid: str, request_type: REQUEST_TYPES) -> str:
"""Get the Celery Async Result IDs for a request."""
map_ = _select_id_map(request_uuid)
Expand All @@ -84,11 +64,6 @@ def get_async_result_id(request_uuid: str, request_type: REQUEST_TYPES) -> str:
) from error


def set_async_result_ids(request_uuid, map_: dict[REQUEST_TYPES, str]):
"""Set the Celery Result IDs for a request."""
_insert_id_map(request_uuid, map_)


def insert_files(
files, consent: bool
) -> tuple[list[int], list[str], list[str], list[Bbox], list[Layer]]:
Expand Down Expand Up @@ -215,11 +190,3 @@ def select_map_frame(uuid: UUID) -> tuple[bytes, str, str]:
),
{"UUID": uuid},
)


def delete_map_frame(uuid: UUID):
"""Delete map frame of the associated UUID from the database."""
query = "DELETE FROM map_frame WHERE uuid = %s"
db_conn = open_connection()
with db_conn.cursor() as curs:
curs.execute(query, [str(uuid)])
11 changes: 6 additions & 5 deletions sketch_map_tool/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,17 @@ def merge(fcs: list[FeatureCollection]) -> FeatureCollection:


def zip_(results: list[tuple[str, str, BytesIO]]) -> BytesIO:
"""ZIP the results of the Celery group of `georeference_sketch_map` tasks."""
"""ZIP the raster results of the Celery group of `upload_processing` tasks."""
buffer = BytesIO()
raw = set([r[1].replace("<br />", "\n") for r in results])
attributions = BytesIO("\n".join(raw).encode())
attributions = []
with ZipFile(buffer, "a") as zip_file:
for file_name, _, file in results:
for file_name, attribution, file in results:
stem = Path(file_name).stem
name = Path(stem).with_suffix(".geotiff")
zip_file.writestr(str(name), file.read())
zip_file.writestr("attributions.txt", attributions.read())
attributions.append(attribution.replace("<br />", "\n"))
file = BytesIO("\n".join(set(attributions)).encode())
zip_file.writestr("attributions.txt", file.read())
buffer.seek(0)
return buffer

Expand Down
144 changes: 72 additions & 72 deletions sketch_map_tool/routes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from io import BytesIO
from pathlib import Path
from uuid import UUID, uuid4
from uuid import UUID

import geojson
from celery import chord, group
Expand All @@ -15,6 +15,7 @@
send_from_directory,
url_for,
)
from psycopg2.errors import UndefinedTable
from werkzeug import Response

from sketch_map_tool import celery_app, config, definitions, tasks
Expand All @@ -28,12 +29,11 @@
UploadLimitsExceededError,
UUIDNotFoundError,
)
from sketch_map_tool.helpers import extract_errors, merge, to_array, zip_
from sketch_map_tool.helpers import N_, extract_errors, merge, to_array, zip_
from sketch_map_tool.models import Bbox, Layer, PaperFormat, Size
from sketch_map_tool.tasks import (
cleanup_blobs,
digitize_sketches,
georeference_sketch_map,
upload_processing,
)
from sketch_map_tool.validators import (
validate_type,
Expand Down Expand Up @@ -114,9 +114,7 @@ def create_results_post(lang="en") -> Response:
"""Create the sketch map"""
# Request parameters
bbox_raw = json.loads(request.form["bbox"])
bbox_wgs84_raw = json.loads(request.form["bboxWGS84"])
bbox = Bbox(*bbox_raw)
bbox_wgs84 = Bbox(*bbox_wgs84_raw)
format_raw = request.form["format"]
format_: PaperFormat = getattr(definitions, format_raw.upper())
orientation = request.form["orientation"]
Expand All @@ -125,30 +123,33 @@ def create_results_post(lang="en") -> Response:
scale = float(request.form["scale"])
layer = Layer(request.form["layer"].replace(":", "-").replace("_", "-").lower())

# feature flag for enabling aruco markers
# Feature flag for enabling aruco markers
if request.args.get("aruco") is None:
aruco = False
else:
aruco = True

# Unique id for current request
uuid = str(uuid4())

# Tasks
task_sketch_map = tasks.generate_sketch_map.apply_async(
args=(uuid, bbox, format_, orientation, size, scale, layer, aruco)
)
task_quality_report = tasks.generate_quality_report.apply_async(
args=tuple([bbox_wgs84])
args=(bbox, format_, orientation, size, scale, layer, aruco)
)
return redirect(url_for("create_results_get", lang=lang, uuid=task_sketch_map.id))

# Map of request type to multiple Async Result IDs
map_ = {
"sketch-map": str(task_sketch_map.id),
"quality-report": str(task_quality_report.id),
}
db_client_flask.set_async_result_ids(uuid, map_)
return redirect(url_for("create_results_get", lang=lang, uuid=uuid))

def get_async_result_id(uuid: str, type_: REQUEST_TYPES):
"""Get Celery Async or Group Result UUID for given request UUID.
Try to get Celery UUID for given request from datastore.
If no Celery UUID has been found the request UUID is the same as the Celery UUID.
This function exists only for legacy support.
"""
# TODO: Legacy support: Delete this function after PR 515 has been deployed
# for 1 day
try:
return db_client_flask.get_async_result_id(uuid, type_)
except (UUIDNotFoundError, UndefinedTable):
return uuid


@app.get("/create/results")
Expand All @@ -160,8 +161,8 @@ def create_results_get(lang="en", uuid: str | None = None) -> Response | str:
return redirect(url_for("create", lang=lang))
validate_uuid(uuid)
# Check if celery tasks for UUID exists
_ = db_client_flask.get_async_result_id(uuid, "sketch-map")
_ = db_client_flask.get_async_result_id(uuid, "quality-report")
id_ = get_async_result_id(uuid, "sketch-map")
_ = get_async_result(id_, "sketch-map")
return render_template("create-results.html", lang=lang)


Expand Down Expand Up @@ -205,11 +206,10 @@ def digitize_results_post(lang="en") -> Response:
bboxes_[uuid] = bbox
layers_[uuid] = layer

tasks_vector = []
tasks_raster = []
tasks = []
for file_id, file_name, uuid in zip(file_ids, file_names, uuids):
tasks_vector.append(
digitize_sketches.signature(
tasks.append(
upload_processing.signature(
(
file_id,
file_name,
Expand All @@ -219,40 +219,24 @@ def digitize_results_post(lang="en") -> Response:
)
)
)
tasks_raster.append(
georeference_sketch_map.signature(
(
file_id,
file_name,
map_frames[uuid],
layers_[uuid],
bboxes_[uuid],
)
)
)
async_result_raster = group(tasks_raster).apply_async()
c = chord(
group(tasks_vector),
chord_ = chord(
group(tasks),
cleanup_blobs.signature(
kwargs={"file_ids": list(set(file_ids))},
immutable=True,
),
).apply_async()
async_result_vector = c.parent
async_group_result = chord_.parent

# group results have to be saved for them to be able to be restored later
async_result_raster.save()
async_result_vector.save()

# Unique id for current request
uuid = str(uuid4())
# Mapping of request id to multiple tasks id's
map_ = {
"raster-results": str(async_result_raster.id),
"vector-results": str(async_result_vector.id),
}
db_client_flask.set_async_result_ids(uuid, map_)
return redirect(url_for("digitize_results_get", lang=lang, uuid=uuid))
async_group_result.save()
return redirect(
url_for(
"digitize_results_get",
lang=lang,
uuid=async_group_result.id,
)
)


@app.get("/digitize/results")
Expand All @@ -266,19 +250,32 @@ def digitize_results_get(lang="en", uuid: str | None = None) -> Response | str:
return render_template("digitize-results.html", lang=lang)


def get_async_result(uuid: str, type_: REQUEST_TYPES) -> AsyncResult | GroupResult:
"""Get Celery `AsyncResult` or restore `GroupResult` for given Celery UUID."""
if type_ in ("sketch-map", "quality-report"):
async_result = celery_app.AsyncResult(uuid)
elif type_ in ("vector-results", "raster-results"):
async_result = celery_app.GroupResult.restore(uuid)
else:
raise TypeError()

if async_result is None:
raise UUIDNotFoundError(
N_("There are no tasks for UUID {UUID}"),
{"UUID": uuid},
)
else:
return async_result


@app.get("/api/status/<uuid>/<type_>")
@app.get("/<lang>/api/status/<uuid>/<type_>")
def status(uuid: str, type_: REQUEST_TYPES, lang="en") -> Response:
validate_uuid(uuid)
validate_type(type_)

id_ = db_client_flask.get_async_result_id(uuid, type_)

# due to legacy support it is not possible to check only `type_`
# (in the past every Celery result was of type `AsyncResult`)
async_result = celery_app.GroupResult.restore(id_)
if async_result is None:
async_result = celery_app.AsyncResult(id_)
id_ = get_async_result_id(uuid, type_)
async_result = get_async_result(id_, type_)

href = ""
info = ""
Expand Down Expand Up @@ -336,18 +333,18 @@ def download(uuid: str, type_: REQUEST_TYPES, lang="en") -> Response:
validate_uuid(uuid)
validate_type(type_)

id_ = db_client_flask.get_async_result_id(uuid, type_)
id_ = get_async_result_id(uuid, type_)
async_result = get_async_result(id_, type_)

# due to legacy support it is not possible to check only `type_`
# (in the past every Celery result was of type `AsyncResult`)
async_result = celery_app.GroupResult.restore(id_)
if async_result is None:
async_result = celery_app.AsyncResult(id_)
if not async_result.ready() or async_result.failed():
# Abort if result not ready or failed.
# No nice error message here because user should first check /api/status.
if isinstance(async_result, GroupResult):
if not async_result.ready() or all([r.failed() for r in async_result.results]):
abort(500)
else:
if not async_result.ready() or all([r.failed() for r in async_result.results]):
if not async_result.ready() or async_result.failed():
abort(500)

match type_:
case "quality-report":
mimetype = "application/pdf"
Expand All @@ -361,16 +358,19 @@ def download(uuid: str, type_: REQUEST_TYPES, lang="en") -> Response:
mimetype = "application/zip"
download_name = type_ + ".zip"
if isinstance(async_result, GroupResult):
file: BytesIO = zip_(async_result.get(propagate=False))
results = async_result.get(propagate=False)
raster_results = [r[:-1] for r in results]
file: BytesIO = zip_(raster_results)
else:
# support legacy results
file: BytesIO = async_result.get()
case "vector-results":
mimetype = "application/geo+json"
download_name = type_ + ".geojson"
if isinstance(async_result, GroupResult):
result: list = async_result.get(propagate=False)
raw = geojson.dumps(merge(result))
results = async_result.get(propagate=False)
vector_results = [r[-1] for r in results]
raw = geojson.dumps(merge(vector_results))
file: BytesIO = BytesIO(raw.encode("utf-8"))
else:
# support legacy results
Expand Down
Loading

0 comments on commit 9413bc8

Please sign in to comment.