diff --git a/src/ansys/pyensight/core/ensight_grpc.py b/src/ansys/pyensight/core/ensight_grpc.py index dce5efb4927..105343ffaea 100644 --- a/src/ansys/pyensight/core/ensight_grpc.py +++ b/src/ansys/pyensight/core/ensight_grpc.py @@ -10,12 +10,15 @@ import sys import tempfile import threading -from typing import Any, Callable, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Union import uuid from ansys.api.pyensight.v0 import dynamic_scene_graph_pb2_grpc, ensight_pb2, ensight_pb2_grpc import grpc +if TYPE_CHECKING: + from ansys.pyensight.core.utils.dsg_server import DSGSession + class EnSightGRPC(object): """Wrapper around a gRPC connection to an EnSight instance @@ -60,6 +63,10 @@ def __init__(self, host: str = "127.0.0.1", port: int = 12345, secret_key: str = self._image = None self._image_number = 0 self._sub_service = None + self._dsg_session: Optional["DSGSession"] = None + + def set_dsg_session(self, dsg_session: "DSGSession"): + self._dsg_session = dsg_session @property def host(self) -> str: diff --git a/src/ansys/pyensight/core/session.py b/src/ansys/pyensight/core/session.py index 4a1a153bdb0..e87ca12277a 100644 --- a/src/ansys/pyensight/core/session.py +++ b/src/ansys/pyensight/core/session.py @@ -45,6 +45,7 @@ from ansys.api.pyensight import ensight_api from ansys.pyensight.core import enscontext, ensight_grpc, renderable from ansys.pyensight.core.ensobj import ENSOBJ + from ansys.pyensight.core.utils.dsg_server import DSGSession class InvalidEnSightVersion(Exception): @@ -140,6 +141,7 @@ def __init__( webui_port: Optional[int] = None, ) -> None: # every session instance needs a unique name that can be used as a cache key + self._dsg_session: Optional["DSGSession"] = None self._session_name = str(uuid.uuid1()) # when objects come into play, we can reuse them, so hash ID to instance here self._ensobj_hash: Dict[int, "ENSOBJ"] = {} @@ -962,8 +964,12 @@ def cmd(self, value: str, do_eval: bool = True) -> Any: >>> print(session.cmd("10+4")) 14 """ + if self._dsg_session: + self._dsg_session._pyensight_grpc_coming = True self._establish_connection() ret = self._grpc.command(value, do_eval=do_eval) + if self._dsg_session: + self._dsg_session._pyensight_grpc_coming = False if do_eval: ret = self._convert_ctor(ret) value = eval(ret, dict(session=self, ensobjlist=ensobjlist)) @@ -1820,3 +1826,17 @@ def find_remote_unused_ports( cmd += f"ports = find_unused_ports({count}, start={start}, end={end}, avoid={avoid})" self.cmd(cmd, do_eval=False) return self.cmd("ports") + + def set_dsg_session(self, dsg_session: "DSGSession"): + """Set a DSG Session for the current PyEnSight session. + + This is required if a DSGSession is running together with + PyEnSight and the second might send gRPC requests while the first + is blocked because the gRPC queue is full. + + Parameters + ---------- + dsg_session: DSGSession + a DSGSession object + """ + self._dsg_session = dsg_session diff --git a/src/ansys/pyensight/core/utils/dsg_server.py b/src/ansys/pyensight/core/utils/dsg_server.py index b7f5f48ec60..0875ce37aa7 100644 --- a/src/ansys/pyensight/core/utils/dsg_server.py +++ b/src/ansys/pyensight/core/utils/dsg_server.py @@ -6,12 +6,15 @@ import sys import threading import time -from typing import Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional from ansys.api.pyensight.v0 import dynamic_scene_graph_pb2 from ansys.pyensight.core import ensight_grpc import numpy +if TYPE_CHECKING: + from ansys.pyensight.core import Session + class Part(object): def __init__(self, session: "DSGSession"): @@ -640,6 +643,7 @@ def __init__( vrmode: bool = False, time_scale: float = 1.0, handler: UpdateHandler = UpdateHandler(), + session: Optional["Session"] = None, ): """ Manage a gRPC connection and link it to an UpdateHandler instance @@ -678,6 +682,9 @@ def __init__( """ super().__init__() self._grpc = ensight_grpc.EnSightGRPC(port=port, host=host, secret_key=security_code) + self._session = session + if self._session: + self._session.set_dsg_session(self) self._callback_handler = handler self._verbose = verbose self._thread: Optional[threading.Thread] = None @@ -705,6 +712,7 @@ def __init__( # log any status changes to this file. external apps will be monitoring self._status_file = os.environ.get("ANSYS_OV_SERVER_STATUS_FILENAME", "") self._status = dict(status="idle", start_time=0.0, processed_buffers=0, total_buffers=0) + self._pyensight_grpc_coming = False @property def scene_bounds(self) -> Optional[List]: @@ -892,6 +900,13 @@ def request_an_update(self, animation: bool = False, allow_spontaneous: bool = T cmd.init.maximum_chunk_size = 1024 * 1024 self._dsg_queue.put(cmd) # type:ignore + def _is_queue_full(self): + if not self.max_dsg_queue_size: + return False + if self._pyensight_grpc_coming: + return False + return self._message_queue.qsize() >= self.max_dsg_queue_size + def _poll_messages(self) -> None: """Core interface to grab DSG events from gRPC and queue them for processing @@ -905,7 +920,7 @@ def _poll_messages(self) -> None: # if the queue is getting too deep, wait a bit to avoid holding too # many messages (filling up memory) if self.max_dsg_queue_size: - while self._message_queue.qsize() >= self.max_dsg_queue_size: + while self._is_queue_full(): time.sleep(0.001) except Exception: self._shutdown = True