Skip to content

Commit

Permalink
add mechanism to unblock dsgsessioon poll messages method when a PyEn…
Browse files Browse the repository at this point in the history
…Sight gRPC request is incoming (#505)
  • Loading branch information
mariostieriansys authored Jan 17, 2025
1 parent 4028e44 commit b01dbfb
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 3 deletions.
9 changes: 8 additions & 1 deletion src/ansys/pyensight/core/ensight_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
import sys
import tempfile
import threading
from typing import Any, Callable, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union
import uuid

from ansys.api.pyensight.v0 import dynamic_scene_graph_pb2_grpc, ensight_pb2, ensight_pb2_grpc
import grpc

if TYPE_CHECKING:
from ansys.pyensight.core.utils.dsg_server import DSGSession


class EnSightGRPC(object):
"""Wrapper around a gRPC connection to an EnSight instance
Expand Down Expand Up @@ -60,6 +63,10 @@ def __init__(self, host: str = "127.0.0.1", port: int = 12345, secret_key: str =
self._image = None
self._image_number = 0
self._sub_service = None
self._dsg_session: Optional["DSGSession"] = None

def set_dsg_session(self, dsg_session: "DSGSession"):
self._dsg_session = dsg_session

@property
def host(self) -> str:
Expand Down
20 changes: 20 additions & 0 deletions src/ansys/pyensight/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from ansys.api.pyensight import ensight_api
from ansys.pyensight.core import enscontext, ensight_grpc, renderable
from ansys.pyensight.core.ensobj import ENSOBJ
from ansys.pyensight.core.utils.dsg_server import DSGSession


class InvalidEnSightVersion(Exception):
Expand Down Expand Up @@ -140,6 +141,7 @@ def __init__(
webui_port: Optional[int] = None,
) -> None:
# every session instance needs a unique name that can be used as a cache key
self._dsg_session: Optional["DSGSession"] = None
self._session_name = str(uuid.uuid1())
# when objects come into play, we can reuse them, so hash ID to instance here
self._ensobj_hash: Dict[int, "ENSOBJ"] = {}
Expand Down Expand Up @@ -962,8 +964,12 @@ def cmd(self, value: str, do_eval: bool = True) -> Any:
>>> print(session.cmd("10+4"))
14
"""
if self._dsg_session:
self._dsg_session._pyensight_grpc_coming = True
self._establish_connection()
ret = self._grpc.command(value, do_eval=do_eval)
if self._dsg_session:
self._dsg_session._pyensight_grpc_coming = False
if do_eval:
ret = self._convert_ctor(ret)
value = eval(ret, dict(session=self, ensobjlist=ensobjlist))
Expand Down Expand Up @@ -1820,3 +1826,17 @@ def find_remote_unused_ports(
cmd += f"ports = find_unused_ports({count}, start={start}, end={end}, avoid={avoid})"
self.cmd(cmd, do_eval=False)
return self.cmd("ports")

def set_dsg_session(self, dsg_session: "DSGSession"):
"""Set a DSG Session for the current PyEnSight session.
This is required if a DSGSession is running together with
PyEnSight and the second might send gRPC requests while the first
is blocked because the gRPC queue is full.
Parameters
----------
dsg_session: DSGSession
a DSGSession object
"""
self._dsg_session = dsg_session
19 changes: 17 additions & 2 deletions src/ansys/pyensight/core/utils/dsg_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
import sys
import threading
import time
from typing import Any, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from ansys.api.pyensight.v0 import dynamic_scene_graph_pb2
from ansys.pyensight.core import ensight_grpc
import numpy

if TYPE_CHECKING:
from ansys.pyensight.core import Session


class Part(object):
def __init__(self, session: "DSGSession"):
Expand Down Expand Up @@ -640,6 +643,7 @@ def __init__(
vrmode: bool = False,
time_scale: float = 1.0,
handler: UpdateHandler = UpdateHandler(),
session: Optional["Session"] = None,
):
"""
Manage a gRPC connection and link it to an UpdateHandler instance
Expand Down Expand Up @@ -678,6 +682,9 @@ def __init__(
"""
super().__init__()
self._grpc = ensight_grpc.EnSightGRPC(port=port, host=host, secret_key=security_code)
self._session = session
if self._session:
self._session.set_dsg_session(self)
self._callback_handler = handler
self._verbose = verbose
self._thread: Optional[threading.Thread] = None
Expand Down Expand Up @@ -705,6 +712,7 @@ def __init__(
# log any status changes to this file. external apps will be monitoring
self._status_file = os.environ.get("ANSYS_OV_SERVER_STATUS_FILENAME", "")
self._status = dict(status="idle", start_time=0.0, processed_buffers=0, total_buffers=0)
self._pyensight_grpc_coming = False

@property
def scene_bounds(self) -> Optional[List]:
Expand Down Expand Up @@ -892,6 +900,13 @@ def request_an_update(self, animation: bool = False, allow_spontaneous: bool = T
cmd.init.maximum_chunk_size = 1024 * 1024
self._dsg_queue.put(cmd) # type:ignore

def _is_queue_full(self):
if not self.max_dsg_queue_size:
return False
if self._pyensight_grpc_coming:
return False
return self._message_queue.qsize() >= self.max_dsg_queue_size

def _poll_messages(self) -> None:
"""Core interface to grab DSG events from gRPC and queue them for processing
Expand All @@ -905,7 +920,7 @@ def _poll_messages(self) -> None:
# if the queue is getting too deep, wait a bit to avoid holding too
# many messages (filling up memory)
if self.max_dsg_queue_size:
while self._message_queue.qsize() >= self.max_dsg_queue_size:
while self._is_queue_full():
time.sleep(0.001)
except Exception:
self._shutdown = True
Expand Down

0 comments on commit b01dbfb

Please sign in to comment.