Skip to content

Commit

Permalink
Run parsing using process, terminate when client disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
NastyBoget committed Dec 16, 2024
1 parent 5ffdb35 commit 549dd76
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 104 deletions.
8 changes: 2 additions & 6 deletions dedoc/api/cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
from anyio import create_task_group
from fastapi import Request

from dedoc.config import get_config

logger = get_config().get("logger", logging.getLogger())


@asynccontextmanager
async def cancel_on_disconnect(request: Request) -> None:
async def cancel_on_disconnect(request: Request, logger: logging.Logger) -> None:
"""
Async context manager for async code that needs to be cancelled if client disconnects prematurely.
The client disconnect is monitored through the Request object.
Expand All @@ -25,7 +21,7 @@ async def watch_disconnect() -> None:

if message["type"] == "http.disconnect":
client = f"{request.client.host}:{request.client.port}" if request.client else "-:-"
logger.info(f"{client} - `{request.method} {request.url.path}` 499 DISCONNECTED")
logger.warning(f"{client} - `{request.method} {request.url.path}` 499 DISCONNECTED")

task_group.cancel_scope.cancel()
break
Expand Down
48 changes: 8 additions & 40 deletions dedoc/api/dedoc_api.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
import asyncio
import base64
import dataclasses
import importlib
import json
import os
import tempfile
import traceback
from typing import Optional

from anyio import get_cancelled_exc_class
from fastapi import Depends, FastAPI, File, Request, Response, UploadFile
from fastapi.responses import ORJSONResponse, UJSONResponse
from fastapi.staticfiles import StaticFiles
Expand All @@ -17,25 +13,22 @@
import dedoc.version
from dedoc.api.api_args import QueryParameters
from dedoc.api.api_utils import json2collapsed_tree, json2html, json2tree, json2txt
from dedoc.api.cancellation import cancel_on_disconnect
from dedoc.api.process_handler import ProcessHandler
from dedoc.api.schema.parsed_document import ParsedDocument
from dedoc.common.exceptions.dedoc_error import DedocError
from dedoc.common.exceptions.missing_file_error import MissingFileError
from dedoc.config import get_config
from dedoc.dedoc_manager import DedocManager
from dedoc.utils.utils import save_upload_file

config = get_config()
logger = config["logger"]
PORT = config["api_port"]
static_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "web")
static_files_dirs = config.get("static_files_dirs")

app = FastAPI()
app.mount("/web", StaticFiles(directory=config.get("static_path", static_path)), name="web")

module_api_args = importlib.import_module(config["import_path_init_api_args"])
logger = config["logger"]
manager = DedocManager(config=config)
process_handler = ProcessHandler(logger=logger)


@app.get("/")
Expand Down Expand Up @@ -65,28 +58,15 @@ def _get_static_file_path(request: Request) -> str:
return os.path.abspath(os.path.join(directory, file))


def __add_base64_info_to_attachments(document_tree: ParsedDocument, attachments_dir: str) -> None:
for attachment in document_tree.attachments:
with open(os.path.join(attachments_dir, attachment.metadata.temporary_file_name), "rb") as attachment_file:
attachment.metadata.add_attribute("base64", base64.b64encode(attachment_file.read()).decode("utf-8"))


@app.post("/upload", response_model=ParsedDocument)
async def upload(request: Request, file: UploadFile = File(...), query_params: QueryParameters = Depends()) -> Response:
parameters = dataclasses.asdict(query_params)
if not file or file.filename == "":
raise MissingFileError("Error: Missing content in request_post file parameter", version=dedoc.version.__version__)

loop = asyncio.get_running_loop()
async with cancel_on_disconnect(request):
try:
future = loop.run_in_executor(None, __parse_file, parameters, file)
document_tree = await future
except get_cancelled_exc_class():
future.cancel(DedocError)
loop.stop()
loop.close()
return JSONResponse(status_code=499, content={})
document_tree = await process_handler.handle(request=request, parameters=parameters, file=file)
if document_tree is None:
return JSONResponse(status_code=499, content={})

return_format = str(parameters.get("return_format", "json")).lower()
if return_format == "html":
Expand Down Expand Up @@ -121,22 +101,11 @@ async def upload(request: Request, file: UploadFile = File(...), query_params: Q
return ORJSONResponse(content=document_tree.to_api_schema().model_dump())


def __parse_file(parameters: dict, file: UploadFile) -> ParsedDocument:
return_format = str(parameters.get("return_format", "json")).lower()
with tempfile.TemporaryDirectory() as tmpdir:
file_path = save_upload_file(file, tmpdir)
document_tree = manager.parse(file_path, parameters={**dict(parameters), "attachments_dir": tmpdir})

if return_format == "html":
__add_base64_info_to_attachments(document_tree, tmpdir)
return document_tree


@app.get("/upload_example")
async def upload_example(file_name: str, return_format: Optional[str] = None) -> Response:
async def upload_example(request: Request, file_name: str, return_format: Optional[str] = None) -> Response:
file_path = os.path.join(static_path, "examples", file_name)
parameters = {} if return_format is None else {"return_format": return_format}
document_tree = manager.parse(file_path, parameters=parameters)
document_tree = await process_handler.handle(request=request, parameters=parameters, file=file_path)

if return_format == "html":
html_page = json2html(
Expand All @@ -152,7 +121,6 @@ async def upload_example(file_name: str, return_format: Optional[str] = None) ->

@app.exception_handler(DedocError)
async def exception_handler(request: Request, exc: DedocError) -> Response:
logger.error(f"Exception {exc}\n{traceback.format_exc()}")
result = {"message": exc.msg}
if exc.filename:
result["file_name"] = exc.filename
Expand Down
111 changes: 111 additions & 0 deletions dedoc/api/process_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import asyncio
import base64
import logging
import os
import pickle
import signal
import tempfile
import traceback
from multiprocessing import Process, Queue
from typing import Optional, Union
from urllib.request import Request

from anyio import get_cancelled_exc_class
from fastapi import UploadFile

from dedoc import DedocManager
from dedoc.api.cancellation import cancel_on_disconnect
from dedoc.common.exceptions.dedoc_error import DedocError
from dedoc.config import get_config
from dedoc.data_structures import ParsedDocument
from dedoc.utils.utils import save_upload_file


class ProcessHandler:
"""
Class for file parsing by DedocManager with support for client disconnection.
If client disconnects during file parsing, the process of parsing is fully terminated and API is available to receive new connections.
Handler uses the following algorithm:
1. Master process is used for checking current connection (client disconnect)
2. Child process is working on the background and waiting for the input file in the input_queue
3. Master process calls the child process for parsing and transfers data through the input_queue
4. Child process is parsing file using DedocManager
5. The result of parsing is transferred to the master process through the output_queue
6. If client disconnects, the child process is terminated. The new child process with queues will start with the new request
"""
def __init__(self, logger: logging.Logger) -> None:
self.input_queue = Queue()
self.output_queue = Queue()
self.logger = logger
self.process = Process(target=self.__parse_file, args=[self.input_queue, self.output_queue])
self.process.start()

async def handle(self, request: Request, parameters: dict, file: Union[UploadFile, str]) -> Optional[ParsedDocument]:
"""
Handle request in a separate process.
Checks for client disconnection and terminate the child process if client disconnected.
"""
if not self.process.is_alive():
self.__init__(logger=self.logger)

self.logger.info("Putting file to the input queue")
self.input_queue.put(pickle.dumps((parameters, file)), block=True)

loop = asyncio.get_running_loop()
async with cancel_on_disconnect(request, self.logger):
try:
future = loop.run_in_executor(None, self.output_queue.get)
result = await future
except get_cancelled_exc_class():
self.logger.warning("Terminating the parsing process")
self.process.terminate()
future.cancel(DedocError)
return None

result = pickle.loads(result)
if isinstance(result, ParsedDocument):
self.logger.info("Got the result from the output queue")
return result

raise DedocError.from_dict(result)

def __parse_file(self, input_queue: Queue, output_queue: Queue) -> None:
"""
Function for file parsing in a separate (child) process.
It's a background process, i.e. it is waiting for a task in the input queue.
The result of parsing is returned in the output queue.
Operations with `signal` are used for saving master process while killing child process.
See the issue for more details: https://github.com/fastapi/fastapi/issues/1487
"""
signal.set_wakeup_fd(-1)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)

manager = DedocManager(config=get_config())
manager.logger.info("Parsing process is waiting for the task in the input queue")

while True:
try:
parameters, file = pickle.loads(input_queue.get(block=True))
manager.logger.info("Parsing process got task from the input queue")
return_format = str(parameters.get("return_format", "json")).lower()
with tempfile.TemporaryDirectory() as tmpdir:
file_path = file if isinstance(file, str) else save_upload_file(file, tmpdir)
document_tree = manager.parse(file_path, parameters={**dict(parameters), "attachments_dir": tmpdir})

if return_format == "html":
self.__add_base64_info_to_attachments(document_tree, tmpdir)

output_queue.put(pickle.dumps(document_tree), block=True)
manager.logger.info("Parsing process put task to the output queue")
except Exception as e:
tb = traceback.format_exc()
manager.logger.error(f"Exception {e}\n{tb}")
output_queue.put(pickle.dumps(e.__dict__), block=True)

def __add_base64_info_to_attachments(self, document_tree: ParsedDocument, attachments_dir: str) -> None:
for attachment in document_tree.attachments:
with open(os.path.join(attachments_dir, attachment.metadata.temporary_file_name), "rb") as attachment_file:
attachment.metadata.add_attribute("base64", base64.b64encode(attachment_file.read()).decode("utf-8"))
6 changes: 1 addition & 5 deletions dedoc/common/exceptions/bad_file_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ class BadFileFormatError(DedocError):
"""

def __init__(self, msg: str, msg_api: Optional[str] = None, filename: Optional[str] = None, version: Optional[str] = None) -> None:
super(BadFileFormatError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version)
super(BadFileFormatError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version, code=415)

def __str__(self) -> str:
return f"BadFileFormatError({self.msg})"

@property
def code(self) -> int:
return 415
4 changes: 0 additions & 4 deletions dedoc/common/exceptions/bad_parameters_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,3 @@ def __init__(self, msg: str, msg_api: Optional[str] = None, filename: Optional[s

def __str__(self) -> str:
return f"BadParametersError({self.msg})"

@property
def code(self) -> int:
return 400
6 changes: 1 addition & 5 deletions dedoc/common/exceptions/conversion_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ class ConversionError(DedocError):
"""

def __init__(self, msg: str, msg_api: Optional[str] = None, filename: Optional[str] = None, version: Optional[str] = None) -> None:
super(ConversionError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version)
super(ConversionError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version, code=415)

def __str__(self) -> str:
return f"ConversionError({self.msg})"

@property
def code(self) -> int:
return 415
17 changes: 15 additions & 2 deletions dedoc/common/exceptions/dedoc_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,30 @@ def __init__(self,
msg_api: Optional[str] = None,
filename: Optional[str] = None,
version: Optional[str] = None,
metadata: Optional[dict] = None) -> None:
metadata: Optional[dict] = None,
code: Optional[int] = None) -> None:
super(DedocError, self).__init__()
self.msg = msg
self.msg_api = msg if msg_api is None else msg_api
self.filename = filename
self.version = version if version is not None else dedoc.version.__version__
self.metadata = metadata
self._code = 400 if code is None else code

def __str__(self) -> str:
return f"DedocError({self.msg})"

@property
def code(self) -> int:
return 400
return self._code

@staticmethod
def from_dict(error_dict: dict) -> "DedocError":
return DedocError(
msg=error_dict.get("msg", ""),
msg_api=error_dict.get("msg_api", ""),
filename=error_dict.get("filename", ""),
version=error_dict.get("version", dedoc.version.__version__),
metadata=error_dict.get("metadata", {}),
code=error_dict.get("_code", 500)
)
6 changes: 1 addition & 5 deletions dedoc/common/exceptions/java_not_found_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ class JavaNotFoundError(DedocError):
"""

def __init__(self, msg: str, msg_api: Optional[str] = None, filename: Optional[str] = None, version: Optional[str] = None) -> None:
super(JavaNotFoundError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version)
super(JavaNotFoundError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version, code=500)

def __str__(self) -> str:
return f"JavaNotFoundError({self.msg})"

@property
def code(self) -> int:
return 500
19 changes: 0 additions & 19 deletions dedoc/common/exceptions/minio_error.py

This file was deleted.

4 changes: 0 additions & 4 deletions dedoc/common/exceptions/missing_file_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,3 @@ def __init__(self, msg: str, msg_api: Optional[str] = None, filename: Optional[s

def __str__(self) -> str:
return f"MissingFileError({self.msg})"

@property
def code(self) -> int:
return 400
6 changes: 1 addition & 5 deletions dedoc/common/exceptions/recognize_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@
class RecognizeError(DedocError):

def __init__(self, msg: str, msg_api: Optional[str] = None, filename: Optional[str] = None, version: Optional[str] = None) -> None:
super(RecognizeError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version)
super(RecognizeError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version, code=500)

def __str__(self) -> str:
return f"RecognizeError({self.msg})"

@property
def code(self) -> int:
return 500
4 changes: 0 additions & 4 deletions dedoc/common/exceptions/structure_extractor_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,3 @@ def __init__(self, msg: str, msg_api: Optional[str] = None, filename: Optional[s

def __str__(self) -> str:
return f"StructureExtractorError({self.msg})"

@property
def code(self) -> int:
return 400
6 changes: 1 addition & 5 deletions dedoc/common/exceptions/tabby_pdf_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ class TabbyPdfError(DedocError):
"""

def __init__(self, msg: str, msg_api: Optional[str] = None, filename: Optional[str] = None, version: Optional[str] = None) -> None:
super(TabbyPdfError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version)
super(TabbyPdfError, self).__init__(msg_api=msg_api, msg=msg, filename=filename, version=version, code=500)

def __str__(self) -> str:
return f"TabbyPdfError({self.msg})"

@property
def code(self) -> int:
return 500

0 comments on commit 549dd76

Please sign in to comment.