diff --git a/src/ansys/aedt/core/desktop.py b/src/ansys/aedt/core/desktop.py index fddcb0ca821..c7df255fc9f 100644 --- a/src/ansys/aedt/core/desktop.py +++ b/src/ansys/aedt/core/desktop.py @@ -516,8 +516,8 @@ def __init__( if getattr(self, "_initialized", None) is not None and self._initialized: try: self.grpc_plugin.recreate_application(True) - except Exception: # nosec - pass + except Exception: + pyaedt_logger.debug("Failed to recreate application.") return else: self._initialized = True diff --git a/src/ansys/aedt/core/modeler/geometry_operators.py b/src/ansys/aedt/core/modeler/geometry_operators.py index 710900f2573..c6a46272f14 100644 --- a/src/ansys/aedt/core/modeler/geometry_operators.py +++ b/src/ansys/aedt/core/modeler/geometry_operators.py @@ -1535,9 +1535,8 @@ def v_angle_sign(va, vb, vn, right_handed=True): cross = GeometryOperators.v_cross(va, vb) if GeometryOperators.v_norm(cross) < tol: return math.pi - assert GeometryOperators.is_collinear(cross, vn), ( - "vn must be the normal to the " "plane containing va and vb." - ) # pragma: no cover + if not GeometryOperators.is_collinear(cross, vn): + raise ValueError("vn must be the normal to the plane containing va and vb") # pragma: no cover vnn = GeometryOperators.normalize_vector(vn) if right_handed: @@ -1762,10 +1761,11 @@ def is_segment_intersecting_polygon(a, b, polygon): float ``True`` if the segment intersect the polygon. ``False`` otherwise. """ - assert len(a) == 2, "point must be a list in the form [x, y]" - assert len(b) == 2, "point must be a list in the form [x, y]" + if len(a) != 2 or len(b) != 2: + raise ValueError("Point must be a list in the form [x, y]") pl = len(polygon[0]) - assert len(polygon[1]) == pl, "Polygon x and y lists must be the same length" + if len(polygon[1]) != pl: + raise ValueError("The two sublists in polygon must have the same length") a_in = GeometryOperators.is_point_in_polygon(a, polygon) b_in = GeometryOperators.is_point_in_polygon(b, polygon) diff --git a/src/ansys/aedt/core/modeler/modeler_3d.py b/src/ansys/aedt/core/modeler/modeler_3d.py index 4945c344536..820e74ba00b 100644 --- a/src/ansys/aedt/core/modeler/modeler_3d.py +++ b/src/ansys/aedt/core/modeler/modeler_3d.py @@ -104,7 +104,7 @@ def create_3dcomponent( is_encrypted=False, allow_edit=False, security_message="", - password=None, + password=None, # nosec edit_password=None, password_type="UserSuppliedPassword", hide_contents=False, @@ -195,8 +195,10 @@ def create_3dcomponent( name = self._app.design_name dt_string = datetime.datetime.now().strftime("%H:%M:%S %p %b %d, %Y") if password_type not in ["UserSuppliedPassword", "InternalPassword"]: + self.logger.error("Password type must be 'UserSuppliedPassword' or 'InternalPassword'") return False if component_outline not in ["BoundingBox", "None"]: + self.logger.error("Component outline must be 'BoundingBox' or 'None'") return False if password is None: password = os.getenv("PYAEDT_ENCRYPTED_PASSWORD", "") @@ -262,19 +264,16 @@ def create_3dcomponent( for el in objs: if "CreateRegion:1" in self.oeditor.GetChildObject(el).GetChildNames(): objs.remove(el) - arg.append("IncludedParts:="), arg.append(objs) - arg.append("HiddenParts:=") - if not hide_contents_flag: - arg.append([]) - else: - arg.append(hide_contents) - if coordinate_systems: - allcs = coordinate_systems - else: - allcs = self.oeditor.GetCoordinateSystems() - arg.append("IncludedCS:="), arg.append(allcs) - arg.append("ReferenceCS:="), arg.append(reference_coordinate_system) - par_description = [] + arg += [ + "IncludedParts:=", + objs, + "HiddenParts:=", + hide_contents if hide_contents_flag else [], + "IncludedCS:=", + coordinate_systems if coordinate_systems else list(self.oeditor.GetCoordinateSystems()), + "ReferenceCS:=", + reference_coordinate_system, + ] variables = [] dependent_variables = [] if variables_to_include is not None and not variables_to_include == []: @@ -289,39 +288,34 @@ def create_3dcomponent( elif variables_to_include is None: variables = self._app._variable_manager.independent_variable_names dependent_variables = self._app._variable_manager.dependent_variable_names + arg += [ + "IncludedParameters:=", + variables, + "IncludedDependentParameters:=", + dependent_variables, + "ParameterDescription:=", + [item for el in variables for item in (el + ":=", "")], + "IsLicensed:=", + False, + "LicensingDllName:=", + "", + "VendorComponentIdentifier:=", + "", + "PublicKeyFile:=", + "", + ] - for el in variables: - par_description.append(el + ":=") - par_description.append("") - arg.append("IncludedParameters:="), arg.append(variables) - - arg.append("IncludedDependentParameters:="), arg.append(dependent_variables) - for el in variables: - par_description.append(el + ":=") - par_description.append("") - arg.append("ParameterDescription:="), arg.append(par_description) - arg.append("IsLicensed:="), arg.append(False) - arg.append("LicensingDllName:="), arg.append("") - arg.append("VendorComponentIdentifier:="), arg.append("") - arg.append("PublicKeyFile:="), arg.append("") arg2 = ["NAME:DesignData"] - if boundaries is not None: - boundaries = boundaries - else: + if not boundaries: boundaries = self.get_boundaries_name() - arg2.append("Boundaries:="), arg2.append(boundaries) + if boundaries: + arg2 += ["Boundaries:=", boundaries] if self._app.design_type == "Icepak": - meshregions = [mr.name for mr in self._app.mesh.meshregions] - try: - meshregions.remove("Global") - except Exception: - pass - if meshregions: - arg2.append("MeshRegions:="), arg2.append(meshregions) + mesh_regions = [mr.name for mr in self._app.mesh.meshregions if mr.name != "Global"] + if mesh_regions: + arg2 += ["MeshRegions:=", mesh_regions] else: - if excitations is not None: - excitations = excitations - else: + if excitations is None: excitations = self._app.excitations if self._app.design_type == "HFSS": exc = self._app.get_oo_name(self._app.odesign, "Excitations") @@ -329,7 +323,7 @@ def create_3dcomponent( excitations.extend(exc) excitations = list(set([i.split(":")[0] for i in excitations])) if excitations: - arg2.append("Excitations:="), arg2.append(excitations) + arg2 += ["Excitations:=", excitations] meshops = [el.name for el in self._app.mesh.meshoperations] if meshops: used_mesh_ops = [] @@ -342,9 +336,9 @@ def create_3dcomponent( mesh_comp.append(self.objects[item].name) if all(included_obj in objs for included_obj in mesh_comp): used_mesh_ops.append(self._app.mesh.meshoperations[mesh].name) - arg2.append("MeshOperations:="), arg2.append(used_mesh_ops) + arg2 += ["MeshOperations:=", used_mesh_ops] else: - arg2.append("MeshOperations:="), arg2.append(meshops) + arg2 += ["MeshOperations:=", meshops] arg3 = ["NAME:ImageFile", "ImageFile:=", ""] if export_auxiliary: if isinstance(export_auxiliary, bool): @@ -514,13 +508,16 @@ def replace_3dcomponent( for el in objs: if "CreateRegion:1" in self.oeditor.GetChildObject(el).GetChildNames(): objs.remove(el) - arg.append("IncludedParts:="), arg.append(objs) - arg.append("HiddenParts:="), arg.append([]) - if not coordinate_systems: - coordinate_systems = list(self.oeditor.GetCoordinateSystems()) - arg.append("IncludedCS:="), arg.append(coordinate_systems) - arg.append("ReferenceCS:="), arg.append(reference_coordinate_system) - par_description = [] + arg += [ + "IncludedParts:=", + objs, + "HiddenParts:=", + [], + "IncludedCS:=", + coordinate_systems if coordinate_systems else list(self.oeditor.GetCoordinateSystems()), + "ReferenceCS:=", + reference_coordinate_system, + ] variables = [] if variables_to_include: dependent_variables = [] @@ -535,34 +532,24 @@ def replace_3dcomponent( else: variables = self._app._variable_manager.independent_variable_names dependent_variables = self._app._variable_manager.dependent_variable_names - - for el in variables: - par_description.append(el + ":=") - par_description.append("") - arg.append("IncludedParameters:="), arg.append(variables) - - arg.append("IncludedDependentParameters:="), arg.append(dependent_variables) - - for el in variables: - par_description.append(el + ":=") - par_description.append("") - arg.append("ParameterDescription:="), arg.append(par_description) + arg += [ + "IncludedParameters:=", + variables, + "IncludedDependentParameters:=", + dependent_variables, + "ParameterDescription:=", + [item for el in variables for item in (el + ":=", "")], + ] arg2 = ["NAME:DesignData"] - if boundaries: - boundaries = boundaries - else: + if not boundaries: boundaries = self.get_boundaries_name() if boundaries: - arg2.append("Boundaries:="), arg2.append(boundaries) + arg2 += ["Boundaries:=", boundaries] if self._app.design_type == "Icepak": - meshregions = [mr.name for mr in self._app.mesh.meshregions] - try: - meshregions.remove("Global") - except Exception: - pass - if meshregions: - arg2.append("MeshRegions:="), arg2.append(meshregions) + mesh_regions = [mr.name for mr in self._app.mesh.meshregions if mr.name != "Global"] + if mesh_regions: + arg2 += ["MeshRegions:=", mesh_regions] else: if excitations: excitations = excitations @@ -574,7 +561,7 @@ def replace_3dcomponent( excitations.extend(exc) excitations = list(set([i.split(":")[0] for i in excitations])) if excitations: - arg2.append("Excitations:="), arg2.append(excitations) + arg2 += ["Excitations:=", excitations] meshops = [el.name for el in self._app.mesh.meshoperations] if meshops: used_mesh_ops = [] @@ -587,9 +574,9 @@ def replace_3dcomponent( mesh_comp.append(self.objects[item].name) if all(included_obj in objs for included_obj in mesh_comp): used_mesh_ops.append(self._app.mesh.meshoperations[mesh].name) - arg2.append("MeshOperations:="), arg2.append(used_mesh_ops) + arg2 += ["MeshOperations:=", used_mesh_ops] else: - arg2.append("MeshOperations:="), arg2.append(meshops) + arg2 += ["MeshOperations:=", meshops] arg3 = ["NAME:ImageFile", "ImageFile:=", ""] old_components = self.user_defined_component_names self.oeditor.ReplaceWith3DComponent(arg, arg2, arg3) diff --git a/src/ansys/aedt/core/modeler/modeler_pcb.py b/src/ansys/aedt/core/modeler/modeler_pcb.py index 4c4a5c62a4a..4243ff77c6e 100644 --- a/src/ansys/aedt/core/modeler/modeler_pcb.py +++ b/src/ansys/aedt/core/modeler/modeler_pcb.py @@ -176,7 +176,8 @@ def model_units(self): @model_units.setter def model_units(self, units): - assert units in AEDT_UNITS["Length"], f"Invalid units string {units}." + if not units in AEDT_UNITS["Length"]: + raise ValueError(f"Invalid units '{units}'") self.oeditor.SetActiveUnits(units) self._model_units = units @@ -357,7 +358,7 @@ def merge_design(self, merged_design=None, x="0.0", y="0.0", z="0.0", rotation=" comp_name = str(i) break except Exception: - continue + self.logger.debug(f"Couldn't get component name from component {i}") if not comp_name: return False comp = ComponentsSubCircuit3DLayout(self, comp_name) @@ -648,8 +649,6 @@ def convert_to_selections(self, assignment, return_list=False): objnames.append(el) elif "name" in dir(el): objnames.append(el.name) - else: - pass if return_list: return objnames else: diff --git a/src/ansys/aedt/core/modeler/pcb/object_3d_layout.py b/src/ansys/aedt/core/modeler/pcb/object_3d_layout.py index 7850f9abc72..61ff2a36901 100644 --- a/src/ansys/aedt/core/modeler/pcb/object_3d_layout.py +++ b/src/ansys/aedt/core/modeler/pcb/object_3d_layout.py @@ -1064,12 +1064,12 @@ def name(self): def name(self, value): try: del self._primitives._lines[self.name] - vMaterial = ["NAME:Name", "Value:=", value] - self.change_property(vMaterial) + args = ["NAME:Name", "Value:=", value] + self.change_property(args) self._name = value self._primitives._lines[self._name] = self except Exception: - pass + self.logger.debug(f"Couldn't update geometry name into '{value}'.") @property def is_closed(self): diff --git a/src/ansys/aedt/core/modeler/pcb/primitives_3d_layout.py b/src/ansys/aedt/core/modeler/pcb/primitives_3d_layout.py index 14e613e554a..d0b5868bea8 100644 --- a/src/ansys/aedt/core/modeler/pcb/primitives_3d_layout.py +++ b/src/ansys/aedt/core/modeler/pcb/primitives_3d_layout.py @@ -769,7 +769,7 @@ def padstacks(self): if p[0] == "NAME:psd": props = p except Exception: - pass + self.logger.debug("Couldn't access first property.") self._padstacks[name] = Padstack(name, self.opadstackmanager, self.model_units) for prop in props: @@ -819,9 +819,8 @@ def padstacks(self): self._padstacks[name].layers[lay_name].connectiony = lay[14] self._padstacks[name].layers[lay_name].connectiondir = lay[16] i += 1 - pass except Exception: - pass + self.logger.debug(f"Exception caught when updating padstack '{name}' properties.") return self._padstacks diff --git a/src/ansys/aedt/core/modules/material_workbench.py b/src/ansys/aedt/core/modules/material_workbench.py index 7bbaca5a01f..ddeba623502 100644 --- a/src/ansys/aedt/core/modules/material_workbench.py +++ b/src/ansys/aedt/core/modules/material_workbench.py @@ -31,12 +31,12 @@ from collections import defaultdict import copy import re -from xml.etree.ElementTree import ParseError from ansys.aedt.core.aedt_logger import pyaedt_logger as logger from ansys.aedt.core.generic.data_handlers import normalize_string_format from ansys.aedt.core.modules.material import MatProperties import defusedxml +from defusedxml.ElementTree import ParseError defusedxml.defuse_stdlib() diff --git a/src/ansys/aedt/core/rpc/rpyc_services.py b/src/ansys/aedt/core/rpc/rpyc_services.py index c6e637d01db..ab58f12caa1 100644 --- a/src/ansys/aedt/core/rpc/rpyc_services.py +++ b/src/ansys/aedt/core/rpc/rpyc_services.py @@ -7,6 +7,7 @@ import signal import sys import time +from typing import List from ansys.aedt.core.generic.general_methods import generate_unique_name from ansys.aedt.core.generic.general_methods import env_path @@ -311,14 +312,12 @@ def exposed_run_script(self, script, aedt_version="2021.2", ansysem_path=None, n command.append(ng_feature) command = [exe_path, ng_feature, "-RunScriptAndExit", script_file] try: - p = subprocess.Popen(command) # nosec - p.wait() + subprocess.run(command, check=True) # nosec except subprocess.CalledProcessError as e: msg = f"Command failed with error: {e}" logger.error(msg) return msg return "Script Executed." - else: return "Ansys EM not found or wrong AEDT Version." @@ -891,7 +890,7 @@ def exposed_restore(self): sys.stdout = sys.__stdout__ @staticmethod - def aedt_grpc(port=None, beta_options=None, use_aedt_relative_path=False, non_graphical=True): + def aedt_grpc(port=None, beta_options: List[str]=None, use_aedt_relative_path=False, non_graphical=True, check_interval=2): """Start a new AEDT session on a specified gRPC port. Returns @@ -905,13 +904,13 @@ def aedt_grpc(port=None, beta_options=None, use_aedt_relative_path=False, non_gr import secrets secure_random = secrets.SystemRandom() port = check_port(secure_random.randint(18500, 20000)) - if port == 0: print("Error. No ports are available.") return False elif port in sessions: print(f"AEDT Session already opened on port {port}.") return True + ansysem_path = os.getenv("PYAEDT_SERVER_AEDT_PATH", "") if is_linux: executable = "ansysedt" @@ -919,45 +918,39 @@ def aedt_grpc(port=None, beta_options=None, use_aedt_relative_path=False, non_gr executable = "ansysedt.exe" if ansysem_path and not use_aedt_relative_path: aedt_exe = os.path.join(ansysem_path, executable) + if not is_safe_path(aedt_exe): + logger.warning("Ansys EM path not safe.") + return False else: aedt_exe = executable if non_graphical: ng_feature = "-features=SF6694_NON_GRAPHICAL_COMMAND_EXECUTION,SF159726_SCRIPTOBJECT" - if beta_options: - for option in range(beta_options.__len__()): - if beta_options[option] not in ng_feature: - ng_feature += "," + beta_options[option] - - command = [ - aedt_exe, - "-grpcsrv", - str(port), - ng_feature, - "-ng", - - ] else: ng_feature = "-features=SF159726_SCRIPTOBJECT" - if beta_options: - for option in range(beta_options.__len__()): - if beta_options[option] not in ng_feature: - ng_feature += "," + beta_options[option] - command = [aedt_exe, "-grpcsrv", str(port), ng_feature] - subprocess.Popen(command) + if beta_options: + for option in beta_options: + if option not in ng_feature: + ng_feature += f",{option}" + command = [aedt_exe, "-grpcsrv", str(port), ng_feature] + if non_graphical: + command.append("-ng") + + process = subprocess.Popen(command) # nosec timeout = 60 - s = socket.socket() - machine_name = "127.0.0.1" while timeout > 0: - try: - s.connect((machine_name, port)) - except socket.error: - timeout -= 2 - time.sleep(2) - else: - s.close() - timeout = 0 - print(f"Service has started on port {port}") - return port + with socket.socket() as s: + try: + s.connect(("127.0.0.1", port)) + logger.info(f"Service accessible on port {port}") + return port + except socket.error: + logger.debug(f"Service not available yet, new try in {check_interval}s.") + timeout -= 2 + time.sleep(check_interval) + + process.terminate() + logger.error(f"Service did not start within the timeout of {timeout} seconds.") + return False @property def aedt_port(self): @@ -1120,7 +1113,6 @@ def on_connect(self, connection): self.connection = connection self._processes = {} self._edb = [] - pass def on_disconnect(self, connection): """Finalize the service when the connection is closed.""" @@ -1148,20 +1140,21 @@ def start_service(self, port): """ try: port = check_port(port) - if os.getenv("PYAEDT_SERVER_AEDT_PATH",""): - ansysem_path = os.getenv("PYAEDT_SERVER_AEDT_PATH","") + ansysem_path = os.getenv("PYAEDT_SERVER_AEDT_PATH") + if ansysem_path and not os.path.exists(ansysem_path): + raise FileNotFoundError(f"The ANSYSEM path '{ansysem_path}' does not exist.") else: - aa = aedt_versions.list_installed_ansysem - if aa: - ansysem_path = os.environ[aa[0]] + version_list = aedt_versions.list_installed_ansysem + if version_list: + ansysem_path = os.environ[version_list[0]] else: - raise Exception("no ANSYSEM_ROOTXXX environment variable defined.") - name = os.path.normpath( + raise Exception("No ANSYSEM_ROOTXXX environment variable is defined.") + + script_path = os.path.normpath( os.path.join(os.path.abspath(os.path.dirname(__file__)), "local_server.py") ) - cmd_service = [sys.executable, name, ansysem_path, "1", str(port)] - print(cmd_service) - p = subprocess.Popen(cmd_service) + command = [sys.executable, script_path, ansysem_path, "1", str(port)] + p = subprocess.Popen(command) # nosec time.sleep(2) self._processes[port] = p return port diff --git a/src/ansys/aedt/core/visualization/advanced/touchstone_parser.py b/src/ansys/aedt/core/visualization/advanced/touchstone_parser.py index ff44c50950f..5cf6556dfab 100644 --- a/src/ansys/aedt/core/visualization/advanced/touchstone_parser.py +++ b/src/ansys/aedt/core/visualization/advanced/touchstone_parser.py @@ -26,7 +26,7 @@ import itertools import os import re -import subprocess +import subprocess # nosec import warnings from ansys.aedt.core.aedt_logger import pyaedt_logger as logger @@ -502,12 +502,12 @@ def check_touchstone_files(input_dir="", passivity=True, causality=True): """ out = {} - sNpFiles = find_touchstone_files(input_dir) - if not sNpFiles: + snp_files = find_touchstone_files(input_dir) + if not snp_files: return out aedt_install_folder = list(aedt_versions.installed_versions.values())[0] - for snpf in sNpFiles: - out[snpf] = [] + for file_name, path in snp_files.items(): + out[file_name] = [] if os.name == "nt": genequiv_path = os.path.join(aedt_install_folder, "genequiv.exe") else: @@ -518,22 +518,20 @@ def check_touchstone_files(input_dir="", passivity=True, causality=True): if causality: cmd.append("-checkcausality") - cmd.append(sNpFiles[snpf]) + cmd.append(path) my_env = os.environ.copy() - p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=my_env) # nosec - output = p.communicate() - output_str = str(output[0]) - output_lst = output_str.split("\\r\\n") + result = subprocess.run( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=my_env, text=True, check=True + ) # nosec + output_lst = result.stdout.splitlines() - if len(output_lst) == 1: - output_lst = output_str.splitlines() for line in output_lst: if "Input data" in line and passivity: msg_log = line[17:] is_passive = True if "non-passive" in msg_log: is_passive = False - out[snpf].append(["passivity", is_passive, msg_log]) + out[file_name].append(["passivity", is_passive, msg_log]) if "Maximum causality" in line and causality: msg_log = line[17:] is_causal = True @@ -544,10 +542,10 @@ def check_touchstone_files(input_dir="", passivity=True, causality=True): except Exception: is_causal = False raise Exception("Failed evaluating causality value.") - out[snpf].append(["causality", is_causal, msg_log]) + out[file_name].append(["causality", is_causal, msg_log]) if "Causality check is inconclusive" in line and causality: is_causal = False - out[snpf].append(["causality", is_causal, line[17:]]) + out[file_name].append(["causality", is_causal, line[17:]]) return out diff --git a/src/ansys/aedt/core/visualization/post/spisim.py b/src/ansys/aedt/core/visualization/post/spisim.py index 5dea72f34c1..a1a0987a826 100644 --- a/src/ansys/aedt/core/visualization/post/spisim.py +++ b/src/ansys/aedt/core/visualization/post/spisim.py @@ -73,54 +73,42 @@ def working_directory(self, val): self._working_directory = val @pyaedt_function_handler() - def _compute_spisim(self, parameter, out_file="", touchstone_file="", config_file=""): + def __compute_spisim(self, parameter, config_file, out_file=""): exec_name = "SPISimJNI_LX64.exe" if is_linux else "SPISimJNI_WIN64.exe" - spisimExe = os.path.join(self.desktop_install_dir, "spisim", "SPISim", "modules", "ext", exec_name) + spisim_exe = os.path.join(self.desktop_install_dir, "spisim", "SPISim", "modules", "ext", exec_name) + command = [spisim_exe, parameter] - cfgCmmd = "" - if touchstone_file != "": - cfgCmmd = cfgCmmd + '-i "%s"' % touchstone_file if config_file != "": - if is_linux: - cfgCmmd = "-v CFGFILE=%s" % config_file - else: - cfgCmmd = '-v CFGFILE="%s"' % config_file - if out_file: - cfgCmmd += ', -o "%s"' % out_file - command = [spisimExe, parameter, cfgCmmd] - # Debug('%s %s' % (cmdList[0], ' '.join(arguments))) - # try up to three times to be sure + command += ["-v", f"CFGFILE={config_file}"] if out_file: + command += [",", "-o", f"{out_file}"] out_processing = os.path.join(out_file, generate_unique_name("spsim_out") + ".txt") else: out_processing = os.path.join(generate_unique_folder_name(), generate_unique_name("spsim_out") + ".txt") my_env = os.environ.copy() my_env.update(settings.aedt_environment_variables) - if is_linux: # pragma: no cover if "ANSYSEM_ROOT_PATH" not in my_env: # pragma: no cover my_env["ANSYSEM_ROOT_PATH"] = self.desktop_install_dir if "SPISIM_OUTPUT_LOG" not in my_env: # pragma: no cover my_env["SPISIM_OUTPUT_LOG"] = os.path.join(out_file, generate_unique_name("spsim_out") + ".log") - with open_file(out_processing, "w") as outfile: - subprocess.Popen(command, env=my_env, stdout=outfile, stderr=outfile).wait() # nosec - else: - with open_file(out_processing, "w") as outfile: - subprocess.Popen(" ".join(command), env=my_env, stdout=outfile, stderr=outfile).wait() # nosec + + with open_file(out_processing, "w") as outfile: + subprocess.run(command, env=my_env, stdout=outfile, stderr=outfile, check=True) # nosec return out_processing @pyaedt_function_handler() - def _get_output_parameter_from_result(self, out_file, parameter_name): + def __get_output_parameter_from_result(self, out_file, parameter_name): if parameter_name == "ERL": try: with open_file(out_file, "r") as infile: lines = infile.read() - parmDat = lines.split("[ParmDat]:", 1)[1] - for keyValu in parmDat.split(","): - dataAry = keyValu.split("=") - if dataAry[0].strip().lower() == parameter_name.lower(): - return float(dataAry[1].strip().split()[0]) + parm_dat = lines.split("[ParmDat]:", 1)[1] + for key_value in parm_dat.split(","): + data_arr = key_value.split("=") + if data_arr[0].strip().lower() == parameter_name.lower(): + return float(data_arr[1].strip().split()[0]) self.logger.error( f"Failed to compute {parameter_name}. Check input parameters and retry" ) # pragma: no cover @@ -267,27 +255,22 @@ def compute_erl( cfg_dict["REFLRHO"] = permitted_reflection if permitted_reflection is not None else cfg_dict["REFLRHO"] cfg_dict["NCYCLES"] = reflections_length if reflections_length is not None else cfg_dict["NCYCLES"] - new_cfg_file = os.path.join(self.working_directory, "spisim_erl.cfg").replace("\\", "/") - with open_file(new_cfg_file, "w") as fp: + config_file = os.path.join(self.working_directory, "spisim_erl.cfg").replace("\\", "/") + with open_file(config_file, "w") as fp: for k, v in cfg_dict.items(): fp.write(f"# {k}: {k}\n") fp.write(f"{k} = {v}\n") retries = 3 if "PYTEST_CURRENT_TEST" in os.environ: retries = 10 - trynumb = 0 - while trynumb < retries: - out_processing = self._compute_spisim( - "CalcERL", - touchstone_file=self.touchstone_file, - config_file=new_cfg_file, - out_file=self.working_directory, - ) - results = self._get_output_parameter_from_result(out_processing, "ERL") + nb_retry = 0 + while nb_retry < retries: + out_processing = self.__compute_spisim("CalcERL", config_file, out_file=self.working_directory) + results = self.__get_output_parameter_from_result(out_processing, "ERL") if results: return results self.logger.warning("Failing to compute ERL, retrying...") - trynumb += 1 + nb_retry += 1 self.logger.error("Failed to compute ERL.") return False @@ -345,14 +328,14 @@ def compute_com( com_param.set_parameter("Port Order", "[1 3 2 4]" if port_order == "EvenOdd" else "[1 2 3 4]") com_param.set_parameter("RESULT_DIR", out_folder if out_folder else self.working_directory) - return self._compute_com(com_param) + return self.__compute_com(com_param) @pyaedt_function_handler - def _compute_com( + def __compute_com( self, com_parameter, ): - """Compute Channel Operating Margin. + """Compute Channel Operating Margin (COM). Parameters ---------- @@ -361,7 +344,7 @@ def _compute_com( Returns ------- - + float or list """ thru_snp = com_parameter.parameters["THRUSNP"].replace("\\", "/") fext_snp = com_parameter.parameters["FEXTARY"].replace("\\", "/") @@ -376,8 +359,8 @@ def _compute_com( cfg_file = os.path.join(com_parameter.parameters["RESULT_DIR"], "com_parameters.cfg") com_parameter.export_spisim_cfg(cfg_file) - out_processing = self._compute_spisim(parameter="COM", config_file=cfg_file) - return self._get_output_parameter_from_result(out_processing, "COM") + out_processing = self.__compute_spisim("COM", cfg_file) + return self.__get_output_parameter_from_result(out_processing, "COM") @pyaedt_function_handler def export_com_configure_file(self, file_path, standard=1):