Skip to content

Commit

Permalink
Refactors Probe in pyWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
lmdiazangulo committed Jan 1, 2025
1 parent 2e45f4f commit 6ad160d
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 94 deletions.
202 changes: 114 additions & 88 deletions src_pyWrapper/pyWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,103 +10,126 @@
DEFAULT_SEMBA_FDTD_PATH = '/build/bin/semba-fdtd'

class Probe():
MTLN_PROBE_TAGS = ['_V_','_I_']
CURRENT_PROBE_TAGS = ['_Wx_', '_Wy_', '_Wz_']
POINT_PROBE_TAGS = ['_Ex_', '_Ey_', '_Ez_', '_Hx_', '_Hy_', '_Hz_']
FAR_FIELD_TAG = ['_FF_']
MOVIE_TAGS = ['_ExC_', '_EyC_', '_EzC_', '_HxC_', '_HyC_', '_HzC_', '_ME_', '_MH_']

@staticmethod
def _positionStrToCell(pos_str):
pos = pos_str.split('_')
return np.array([int(pos[0]), int(pos[1]), int(pos[2])])

ALL_TAGS = MTLN_PROBE_TAGS \
+ CURRENT_PROBE_TAGS \
+ POINT_PROBE_TAGS \
+ FAR_FIELD_TAG \
+ MOVIE_TAGS

def __init__(self, probe_filename):
# This initializatoin tries to infer all probe properties from the filename.
assert os.path.isfile(probe_filename)

# This initialization tries to infer all probe properties from the filename.
self.filename = probe_filename

mtln_probe_tags = ['_V_','_I_']
current_probe_tags = ['_Wx_', '_Wy_', '_Wz_']
point_probe_tags = ['_Ex_', '_Ey_', '_Ez_', '_Hx_', '_Hy_', '_Hz_']
far_field_tag = ['_FF_']
movie_tags = ['_ExC_', '_EyC_', '_EzC_', '_HxC_', '_HyC_', '_HzC_', '_ME_', '_MH_']

all_tags = mtln_probe_tags \
+ current_probe_tags \
+ point_probe_tags \
+ far_field_tag \
+ movie_tags

basename = os.path.basename(self.filename)
if '.fdtd_' in basename:
self.case_name, basename_with_no_case_name = basename.split('.fdtd_')
self.case_name = self._getCaseNameFromFilename(probe_filename)
self.name = self._getProbeNameFromFilename(probe_filename)
self.domainType = self._getDomainTypeFromFilename(probe_filename)

tag = self._getTagFromFilename(probe_filename)
position_str = self._getPositionStrFromFilename(probe_filename)
if tag in Probe.CURRENT_PROBE_TAGS:
self.type = 'wire'
self.cell = self._positionStrToCell(position_str)
self.segment = int(position_str.split('_s')[1])
self.df = pd.read_csv(self.filename, sep='\s+')
if self.domainType == 'time':
self.df = self.df.rename(columns={\
't': 'time',
self.df.columns[1]: 'current'\
})
elif self.domainType == 'frequency':
self.df = self.df.rename( columns={\
self.df.columns[0]: 'frequency', \
self.df.columns[1]: 'magnitude', \
self.df.columns[2]: 'phase'\
})
elif tag in Probe.POINT_PROBE_TAGS:
self.type = 'point'
self.cell = self._positionStrToCell(position_str)
self.field = tag[1]
self.direction = tag[2]
self.df = pd.read_csv(self.filename, sep='\s+')
if self.domainType == 'time':
self.df = self.df.rename(columns = {
't': 'time',
self.df.columns[1]: 'field',
self.df.columns[2]: 'incident'
})
elif tag in Probe.FAR_FIELD_TAG:
self.type = 'farField'
init_str, end_str = position_str.split('__')
self.cell_init = self._positionStrToCell(init_str)
self.cell_end = self._positionStrToCell(end_str)
self.df = pd.read_csv(self.filename, sep='\s+')
elif tag in Probe.MOVIE_TAGS:
self.type = 'movie'
init_str, end_str = position_str.split('__')
self.cell_init = self._positionStrToCell(init_str)
self.cell_end = self._positionStrToCell(end_str)
elif tag in Probe.MTLN_PROBE_TAGS:
self.type ='mtln'
self.cell = self._positionStrToCell(position_str)
self.df = pd.read_csv(self.filename, sep='\s+')
else:
for tag in all_tags:
if tag in basename:
self.case_name, basename_with_no_case_name = basename.split(tag)
break
basename_with_no_case_name = os.path.splitext(basename_with_no_case_name)[0]

if '_df.' in basename:
self.domainType = 'time'
else:
self.domainType = 'frequency'

for tag in all_tags:
ids = [m.start() for m in re.finditer(tag, basename_with_no_case_name)]
if len(ids) == 0:
continue
elif len(ids) == 1:
if tag in current_probe_tags:
self.type = 'wire'
self.name, position_str = basename_with_no_case_name.split(tag)
self.cell = self._positionStrToCell(position_str)
self.segment_tag = int(position_str.split('_s')[1])
self.df = pd.read_csv(self.filename, sep='\s+')
if self.domainType == 'time':
self.df = self.df.rename(columns={self.df.columns[1]: 'current'})
elif self.domainType == 'frequency':
self.df = self.df.rename( columns={\
self.df.columns[1]: 'magnitude', \
self.df.columns[2]: 'phase'}, \
)
elif tag in point_probe_tags:
self.type = 'point'
self.name, position_str = basename_with_no_case_name.split(tag)
self.cell = self._positionStrToCell(position_str)
self.field = tag[1]
self.direction = tag[2]
self.df = pd.read_csv(self.filename, sep='\s+')
self.df = self.df.rename(columns = {
self.df.columns[1]: 'field',
self.df.columns[2]: 'incident'
})
elif tag in far_field_tag:
self.type = 'farField'
self.name, positions_str = basename_with_no_case_name.split(tag)
init_str, end_str = positions_str.split('__')
self.cell_init = self._positionStrToCell(init_str)
self.cell_end = self._positionStrToCell(end_str)
self.df = pd.read_csv(self.filename, sep='\s+')
elif tag in movie_tags:
self.type = 'movie'
self.name, positions_str = basename_with_no_case_name.split(tag)
init_str, end_str = positions_str.split('__')
self.cell_init = self._positionStrToCell(init_str)
self.cell_end = self._positionStrToCell(end_str)
elif tag in mtln_probe_tags:
self.type ='mtln'
self.name, position_str = basename_with_no_case_name.split(tag)
self.cell = self._positionStrToCell(position_str)
self.df = pd.read_csv(self.filename, sep='\s+')
else:
raise ValueError("Unable to determine probe name or type for a probe with name:" + basename)
try:
self.type
self.name
except:
raise ValueError('Unable to determine type for probe' + basename)

raise ValueError("Unable to determine probe name or type for a probe with name:" + basename)

def __getitem__(self, key):
return self.df[key]

@staticmethod
def _getCaseNameFromFilename(fn):
bn = os.path.basename(fn)
if '.fdtd_' in bn:
return bn.split('.fdtd_')[0]
else:
for tag in Probe.ALL_TAGS:
if tag in bn:
return bn.split(tag)[0]

@staticmethod
def _getProbeNameFromFilename(fn):
if '.fdtd_' in fn:
tag = Probe._getTagFromFilename(fn)
bn_without_case_name = fn.split('.fdtd_')[1]
probe_name = bn_without_case_name.split(tag)[0]
return probe_name
else:
return Probe._getCaseNameFromFilename(fn)

@staticmethod
def _getTagFromFilename(fn):
for tag in Probe.ALL_TAGS:
if tag in fn:
return tag
raise ValueError("Unable to determine probe tag")

@staticmethod
def _getPositionStrFromFilename(fn):
bn_no_ext = os.path.splitext(os.path.basename(fn))[0]
tag = Probe._getTagFromFilename(fn)
position_str = bn_no_ext.split(tag)[1]
return position_str

@staticmethod
def _getDomainTypeFromFilename(fn):
if '_df.' in os.path.basename(fn):
return 'frequency'
else:
return 'time'

@staticmethod
def _positionStrToCell(pos_str):
pos = pos_str.split('_')
return np.array([int(pos[0]), int(pos[1]), int(pos[2])])



class FDTD():
def __init__(self, input_filename, path_to_exe=None, flags = [], run_in_folder = None):
self._setFilename(input_filename)
Expand Down Expand Up @@ -184,6 +207,9 @@ def readJsonDict(self):
with open(self._filename) as input_file:
return json.load(input_file)

def __getitem__(self, key):
return self.input[key]

def cleanUp(self):
folder = self.getFolder()
extensions = ('*.dat', '*.pl', '*.txt', '*.xdmf', '*.bin', '*.h5')
Expand Down
12 changes: 7 additions & 5 deletions test/pyWrapper/test_pyWrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@ def test_read_wire_probe():
assert p.type == 'wire'
assert p.domainType == 'time'
assert np.all(p.cell == np.array([11, 11, 12]))
assert p.segment_tag == 2
assert p.segment == 2

assert len(p['t']) == 1001
assert p['t'][0] == 0.0
assert p['t'].iat[-1] == 0.2999999901276417E-007
assert len(p['time']) == 1001
assert p['time'][0] == 0.0
assert p['time'].iat[-1] == 0.2999999901276417E-007

assert len(p['current']) == 1001
assert p['current'][0] == 0.0
assert p['current'].iat[-1] == -0.513576742E-004

def test_read_probe_with_no_dot_fdtd_name():
p = Probe(OUTPUT_FOLDER + 'fakeCurrentProbe_mid_point_Wz_11_11_12_s2.dat')
p = Probe(OUTPUT_FOLDER + 'fakeCurrentProbe_mid_point_Wz_11_11_11_s2.dat')

assert p.type == 'wire'

def test_read_point_probe():
Expand All @@ -29,6 +30,7 @@ def test_read_point_probe():
assert p.case_name == 'shieldingEffectiveness'
assert p.name == 'front'
assert p.type == 'point'
assert p.domainType == 'time'
assert p.direction == 'x'
assert p.field == 'E'
assert np.all(p.cell == np.array([1, 1, 1]))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
t faceCurrentProbe_mid_point_Wz_11_11_12_s2.dat -E*dl Vplus Vminus Vplus-Vminus
t fakeCurrentProbe_mid_point_Wz_11_11_12_s2.dat -E*dl Vplus Vminus Vplus-Vminus
0.00000000000000000E+000 -0.000000000E+000 -0.000000000E+000 -0.000000000E+000 -0.000000000E+000 0.000000000E+000
0.29999999012764178E-010 -0.000000000E+000 -0.000000000E+000 -0.000000000E+000 -0.000000000E+000 0.000000000E+000
0.59999998025528356E-010 -0.000000000E+000 -0.000000000E+000 -0.000000000E+000 -0.000000000E+000 0.000000000E+000

0 comments on commit 6ad160d

Please sign in to comment.