From a537603ee25fc860caae77ff09c50ecdcf4c166e Mon Sep 17 00:00:00 2001 From: Brian Gow Date: Mon, 21 Oct 2024 11:13:39 -0400 Subject: [PATCH] catch open once read many exception --- waveform_benchmark/benchmark.py | 610 +++++++++++++++-------------- waveform_benchmark/formats/base.py | 3 - 2 files changed, 318 insertions(+), 295 deletions(-) diff --git a/waveform_benchmark/benchmark.py b/waveform_benchmark/benchmark.py index 9731920..553bff6 100755 --- a/waveform_benchmark/benchmark.py +++ b/waveform_benchmark/benchmark.py @@ -29,62 +29,69 @@ def append_result(format_name, waveform_name, test_name, result, format_list, wa return format_list, waveform_list, test_list, result_list -def __read(fmt, path, total_length, all_channels, block_length, block_count, rng, test1 = False): - reader = fmt() + +def __read(fmt, path, total_length, all_channels, block_length, block_count, rng, test1=False): + reader = fmt() for j in range(block_count): t0 = rng.random() * (total_length - block_length) t1 = t0 + block_length if test1: c = rng.choice(all_channels) - reader.read_waveforms(path, t0, t1, [c]) + reader.read_waveforms(path, t0, t1, [c]) else: - reader.read_waveforms(path, t0, t1, all_channels) - -# reads random channel and random block, opening file each time. + reader.read_waveforms(path, t0, t1, all_channels) + + # reads random channel and random block, opening file each time. + + def _run_read_test(fmt, path, total_length, all_channels, block_length, block_count, - test_min_dur = 10, test_min_iter = 3, test1 = False, mem_profile = False): + test_min_dur=10, test_min_iter=3, test1=False, mem_profile=False): r = random.Random(12345) counters = [] mem_usages = [] for i in repeat_test(test_min_dur, test_min_iter): - with PerformanceCounter(mem_profile = mem_profile) as pc: + with PerformanceCounter(mem_profile=mem_profile) as pc: if (mem_profile): mem_usage = memory_usage( (__read, (fmt, path, total_length, all_channels, block_length, block_count, r), {"test1": test1}), - include_children = True, max_usage = True) + include_children=True, max_usage=True) mem_usages.append(mem_usage) else: __read(fmt, path, total_length, all_channels, block_length, block_count, r, test1=test1) counters.append(pc) return counters, mem_usages - + # kwargs parameters are for any additional parameters that might be useful for optimization. def __open(fmt, path, channels, **kwargs): reader = fmt() opened = reader.open_waveforms(path, channels, **kwargs) - + return (reader, opened) + def __close(reader, opened): reader.close_waveforms(opened) - -def __read_rand_channel(reader, opened, total_length, channels, block_length, block_count, rng, rand_channel:bool = False): - # TODO test: - # reader = fmt() + + +def __read_rand_channel(reader, opened, total_length, channels, block_length, block_count, rng, + rand_channel: bool = False): + # TODO test: + # reader = fmt() for j in range(block_count): t0 = rng.random() * (total_length - block_length) t1 = t0 + block_length if rand_channel: c = rng.choice(channels) - reader.read_opened_waveforms(opened, t0, t1, [c]) + reader.read_opened_waveforms(opened, t0, t1, [c]) else: reader.read_opened_waveforms(opened, t0, t1, channels) + # reads random channel and random block, opening file(s) once. def _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, - test_min_dur = 10, test_min_iter = 3, - rand_channel:bool = True, test1ch:bool = False, mem_profile:bool = False): + test_min_dur=10, test_min_iter=3, + rand_channel: bool = True, test1ch: bool = False, mem_profile: bool = False): r = random.Random(12345) open_counters = [] read_counters = [] @@ -92,40 +99,43 @@ def _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, bl open_mem_usages = [] read_mem_usages = [] close_mem_usages = [] - + if (not rand_channel) and test1ch: channels = [r.choice(all_channels)] else: channels = all_channels rand_channel = test1ch and rand_channel - + for i in repeat_test(test_min_dur, test_min_iter): # open - with PerformanceCounter(mem_profile = mem_profile) as pc: + with PerformanceCounter(mem_profile=mem_profile) as pc: if (mem_profile): mem_usage, (reader, opened) = memory_usage( - (__open, (fmt, path, channels), + (__open, (fmt, path, channels), {'total_length': total_length, 'block_length': block_length, 'block_count': block_count}), - include_children = True, max_usage = True, retval = True) + include_children=True, max_usage=True, retval=True) open_mem_usages.append(mem_usage) else: - (reader, opened) = __open(fmt, path, channels, total_length = total_length, - block_length = block_length, block_count = block_count) + (reader, opened) = __open(fmt, path, channels, total_length=total_length, + block_length=block_length, block_count=block_count) open_counters.append(pc) - # read - with PerformanceCounter(mem_profile = mem_profile) as pc: + # read + with PerformanceCounter(mem_profile=mem_profile) as pc: if (mem_profile): - mem_usage = memory_usage((__read_rand_channel, (reader, opened, total_length, channels, block_length, block_count, r), {"rand_channel": rand_channel}), include_children = True, max_usage = True) + mem_usage = memory_usage((__read_rand_channel, + (reader, opened, total_length, channels, block_length, block_count, r), + {"rand_channel": rand_channel}), include_children=True, max_usage=True) read_mem_usages.append(mem_usage) else: - __read_rand_channel(reader, opened, total_length, channels, block_length, block_count, r, rand_channel=rand_channel) + __read_rand_channel(reader, opened, total_length, channels, block_length, block_count, r, + rand_channel=rand_channel) read_counters.append(pc) - + # close - with PerformanceCounter(mem_profile = mem_profile) as pc: + with PerformanceCounter(mem_profile=mem_profile) as pc: if (mem_profile): - mem_usage = memory_usage((__close, (reader, opened), {}), include_children = True, max_usage = True) + mem_usage = memory_usage((__close, (reader, opened), {}), include_children=True, max_usage=True) close_mem_usages.append(mem_usage) else: __close(reader, opened) @@ -143,15 +153,15 @@ def compute_snr(reference_signal, output_signal): output_signal = np.asarray(output_signal) # Check that the signals have the same dimensions and all finite values. - assert(np.array_equal(np.shape(reference_signal), np.shape(output_signal))) - assert(np.all(np.isfinite(reference_signal)) and np.all(np.isfinite(output_signal))) + assert (np.array_equal(np.shape(reference_signal), np.shape(output_signal))) + assert (np.all(np.isfinite(reference_signal)) and np.all(np.isfinite(output_signal))) # Compute the noise in the signal. noise_signal = output_signal - reference_signal # Compute the SNR with special handling for edge cases. - x = np.sum(reference_signal**2) - y = np.sum(noise_signal**2) + x = np.sum(reference_signal ** 2) + y = np.sum(noise_signal ** 2) if x > 0 and y > 0: snr = 10 * np.log10(x / y) @@ -162,47 +172,49 @@ def compute_snr(reference_signal, output_signal): return snr -def print_header(mem_profile:bool = False, with_ops:bool = False): + +def print_header(mem_profile: bool = False, with_ops: bool = False): ops = "OP " if with_ops else "" if (mem_profile): print('%s #seek #read KiB CPU(s) Wall(s) Mem(MB)(used/maxrss/malloced) [N]' % ops) else: print('%s #seek #read KiB CPU(s) Wall(s) [N]' % ops) - -def print_results(op_str:str, channels:str, counters, mem_usages, block_count, block_length, mem_profile:bool = False): - + + +def print_results(op_str: str, channels: str, counters, mem_usages, block_count, block_length, + mem_profile: bool = False): if (mem_profile): print('%s%6.0f %6.0f %8.0f %8.4f %8.4f %8.4f/%8.4f/%8.4f %6s open %d x %.0fs, %s' - % (op_str, - median_attr(counters, 'n_seek_calls'), - median_attr(counters, 'n_read_calls'), - median_attr(counters, 'n_bytes_read') / 1024, - median_attr(counters, 'cpu_seconds'), - median_attr(counters, 'walltime'), - # walltime / len(counters), - np.median(mem_usages), - median_attr(counters, 'max_rss'), - median_attr(counters, 'malloced'), - '[%d]' % len(counters), - block_count, - block_length, - channels)) + % (op_str, + median_attr(counters, 'n_seek_calls'), + median_attr(counters, 'n_read_calls'), + median_attr(counters, 'n_bytes_read') / 1024, + median_attr(counters, 'cpu_seconds'), + median_attr(counters, 'walltime'), + # walltime / len(counters), + np.median(mem_usages), + median_attr(counters, 'max_rss'), + median_attr(counters, 'malloced'), + '[%d]' % len(counters), + block_count, + block_length, + channels)) else: print('%s%6.0f %6.0f %8.0f %8.4f %8.4f %6s open %d x %.0fs, %s' - % (op_str, - median_attr(counters, 'n_seek_calls'), - median_attr(counters, 'n_read_calls'), - median_attr(counters, 'n_bytes_read') / 1024, - median_attr(counters, 'cpu_seconds'), - median_attr(counters, 'walltime'), - '[%d]' % len(counters), - block_count, - block_length, - channels)) + % (op_str, + median_attr(counters, 'n_seek_calls'), + median_attr(counters, 'n_read_calls'), + median_attr(counters, 'n_bytes_read') / 1024, + median_attr(counters, 'cpu_seconds'), + median_attr(counters, 'walltime'), + '[%d]' % len(counters), + block_count, + block_length, + channels)) -def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, waveform_list=None, test_list=None, - result_list=None, mem_profile = False, test_only = False): +def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, waveform_list=None, test_list=None, + result_list=None, mem_profile=False, test_only=False): # Load the class we will be testing module_name, class_name = format_class.rsplit('.', 1) module = importlib.import_module(module_name) @@ -229,15 +241,15 @@ def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, wa for chunk in waveform['chunks']) # Collect summary information about each channel in the waveform - inv_gain = 1/waveform['chunks'][-1]['gain'] + inv_gain = 1 / waveform['chunks'][-1]['gain'] res_rounded = f'{float(f"{inv_gain:.3g}"):g}' resolution = f"{res_rounded}({waveforms[name]['units']})" precise_channel_length += sum(chunk['end_time'] - chunk['start_time'] - for chunk in waveform['chunks']) + for chunk in waveform['chunks']) waveform_characterizations[name] = { - 'fs': waveform['samples_per_second'], - 'bit_resolution': resolution, - 'channel_length': precise_channel_length + 'fs': waveform['samples_per_second'], + 'bit_resolution': resolution, + 'channel_length': precise_channel_length } total_timepoints = total_length * timepoints_per_second @@ -248,9 +260,9 @@ def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, wa TEST_BLOCK_LENGTHS = [ [total_length, 1], - [500, 5], # 5 random blocks of 500 seconds - [50, 50], # 50 random blocks of 50 seconds - [5, 500], # 500 random blocks of 5 seconds + [500, 5], # 5 random blocks of 500 seconds + [50, 50], # 50 random blocks of 50 seconds + [5, 500], # 500 random blocks of 5 seconds ] TEST_MIN_DURATION = 10 @@ -285,14 +297,14 @@ def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, wa # Write the example data to a file or files. # time1 = time.time() - with PerformanceCounter(mem_profile = mem_profile) as pc_write: + with PerformanceCounter(mem_profile=mem_profile) as pc_write: if mem_profile: mem_usage = memory_usage((fmt().write_waveforms, (path, waveforms), {}), - include_children = True, max_usage = True) + include_children=True, max_usage=True) else: fmt().write_waveforms(path, waveforms) # wall_time = time.time() - time1 - + # Calculate total size of the file(s). output_size = 0 for subdir, dirs, files in os.walk(tempdir): @@ -304,12 +316,12 @@ def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, wa print('CPU time: %.4f sec' % pc_write.cpu_seconds) # print('Wall Time: %.0f s' % wall_time) print('Wall Time: %.4f s' % pc_write.walltime) - + if (mem_profile): print('Memory Used (memory_profiler): %.0f MiB' % mem_usage) print('Maximum Memory Used (max_rss): %.0f MiB' % pc_write.max_rss) print('Memory Malloced (tracemalloc): %.0f MiB' % pc_write.malloced) - + print('_' * 64) if format_list is not None: @@ -330,7 +342,7 @@ def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, wa print("Chunk\t\t\tNumeric Samples\t\t\t\t NaN Samples") print(f"\t# Errors / Total\t{'% Eq':^8}\t{'SNR':^8}\tNaN Values Match") - for channel,waveform in waveforms.items(): + for channel, waveform in waveforms.items(): print(f"Signal: {channel}") # Loop over chunks # print("Chunk\t\t Numeric Samples\t\t NaN Samples") @@ -366,15 +378,16 @@ def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, wa filedata_nonan = filedata[~np.isnan(data)] # use numpy's isclose to determine floating point equality - isgood = np.isclose(filedata_nonan, data_nonan, atol=0.5/chunk['gain']) + isgood = np.isclose(filedata_nonan, data_nonan, atol=0.5 / chunk['gain']) numgood = np.sum(isgood) - fpeq_rel = numgood/len(data_nonan) if len(data_nonan) != 0 else np.nan + fpeq_rel = numgood / len(data_nonan) if len(data_nonan) != 0 else np.nan # compute SNR to quantify signal fidelity snr = compute_snr(data_nonan, filedata_nonan) # print to table - print(f"{i_ch:^5}\t{len(data_nonan)-numgood:10}/{len(data_nonan):10}\t{fpeq_rel*100:^6.3f}\t\t{snr:^6.1f}\t\t{numnanstr:^16}") + print( + f"{i_ch:^5}\t{len(data_nonan) - numgood:10}/{len(data_nonan):10}\t{fpeq_rel * 100:^6.3f}\t\t{snr:^6.1f}\t\t{numnanstr:^16}") # print up to 10 bad values if not all equal if numgood != len(data_nonan): @@ -384,236 +397,249 @@ def run_benchmarks(input_record, format_class, pn_dir=None, format_list=None, wa print(filedata_nonan[~isgood][:10]) print(f"(Gain: {chunk['gain']})") # print('_' * 64) - + # Fidelity Check 2 # Loop over each waveform - print('_' * 64) - print("Fidelity check via open/read/close:") - print() - print("Chunk\t\t\tNumeric Samples\t\t\t\t NaN Samples") - print(f"\t# Errors / Total\t{'% Eq':^8}\t{'SNR':^8}\tNaN Values Match") - for channel,waveform in waveforms.items(): - print(f"Signal: {channel}") - # Loop over chunks - # print("Chunk\t\t Numeric Samples\t\t NaN Samples") - # print(f"\t# Errors / Total\t{'% Eq':^8}\tNaN Values Match") - - for i_ch, chunk in enumerate(waveform["chunks"]): - st = chunk["start_time"] - et = chunk["end_time"] - data = chunk["samples"] - - # read chunk from file - filedata = fmt().open_read_close_waveforms(path, st, et, [channel]) - filedata = filedata[channel] - - # compare values - - # check arrays are same size - if data.shape != filedata.shape: - print(f"{i_ch:^5}\t --- Different shapes (input: {data.shape}, file: {filedata.shape}) ---") - continue - - # check for nans in correct location - NANdiff = np.sum(np.isnan(data) != np.isnan(filedata)) - numnan = np.sum(np.isnan(data)) - numnanstr = f"{'N' if NANdiff else 'Y'} ({numnan})" - - # remove nans for equality check - data_nonan = data[~np.isnan(data)] - filedata_nonan = filedata[~np.isnan(data)] - - # use numpy's isclose to determine floating point equality - isgood = np.isclose(filedata_nonan, data_nonan, atol=0.5/chunk['gain']) - numgood = np.sum(isgood) - fpeq_rel = numgood/len(data_nonan) if len(data_nonan) != 0 else np.nan - - # compute SNR to quantify signal fidelity - snr = compute_snr(data_nonan, filedata_nonan) - - # print to table - print(f"{i_ch:^5}\t{len(data_nonan)-numgood:10}/{len(data_nonan):10}\t{fpeq_rel*100:^6.3f}\t\t{snr:^6.1f}\t\t{numnanstr:^16}") + try: + print('_' * 64) + print("Fidelity check via open/read/close:") + print() + print("Chunk\t\t\tNumeric Samples\t\t\t\t NaN Samples") + print(f"\t# Errors / Total\t{'% Eq':^8}\t{'SNR':^8}\tNaN Values Match") + + for channel,waveform in waveforms.items(): + print(f"Signal: {channel}") + # Loop over chunks + # print("Chunk\t\t Numeric Samples\t\t NaN Samples") + # print(f"\t# Errors / Total\t{'% Eq':^8}\tNaN Values Match") + + for i_ch, chunk in enumerate(waveform["chunks"]): + st = chunk["start_time"] + et = chunk["end_time"] + data = chunk["samples"] + + # read chunk from file + filedata = fmt().open_read_close_waveforms(path, st, et, [channel]) + filedata = filedata[channel] + + # compare values + + # check arrays are same size + if data.shape != filedata.shape: + print(f"{i_ch:^5}\t --- Different shapes (input: {data.shape}, file: {filedata.shape}) ---") + continue + + # check for nans in correct location + NANdiff = np.sum(np.isnan(data) != np.isnan(filedata)) + numnan = np.sum(np.isnan(data)) + numnanstr = f"{'N' if NANdiff else 'Y'} ({numnan})" + + # remove nans for equality check + data_nonan = data[~np.isnan(data)] + filedata_nonan = filedata[~np.isnan(data)] + + # use numpy's isclose to determine floating point equality + isgood = np.isclose(filedata_nonan, data_nonan, atol=0.5/chunk['gain']) + numgood = np.sum(isgood) + fpeq_rel = numgood/len(data_nonan) if len(data_nonan) != 0 else np.nan + + # compute SNR to quantify signal fidelity + snr = compute_snr(data_nonan, filedata_nonan) + + # print to table + print(f"{i_ch:^5}\t{len(data_nonan)-numgood:10}/{len(data_nonan):10}\t{fpeq_rel*100:^6.3f}\t\t{snr:^6.1f}\t\t{numnanstr:^16}") + + # print up to 10 bad values if not all equal + if numgood != len(data_nonan): + print("Subset of unuequal numeric data from input:") + print(data_nonan[~isgood][:10]) + print("Subset of unuequal numeric data from formatted file:") + print(filedata_nonan[~isgood][:10]) + print(f"(Gain: {chunk['gain']})") + # print('_' * 64) + except NotImplementedError: + print("Skipping Fidelity check open/read/close - open once, read many - this format has not implemented the required functions.") - # print up to 10 bad values if not all equal - if numgood != len(data_nonan): - print("Subset of unuequal numeric data from input:") - print(data_nonan[~isgood][:10]) - print("Subset of unuequal numeric data from formatted file:") - print(filedata_nonan[~isgood][:10]) - print(f"(Gain: {chunk['gain']})") - # print('_' * 64) - if not test_only: - print('_' * 64) - print('Open Once Read Many performance (median of N trials):') - print_header(mem_profile=mem_profile, with_ops=True) - - for block_length, block_count in TEST_BLOCK_LENGTHS: - - # time1 = time.time() - open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages = \ - _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, - test_min_dur = TEST_MIN_DURATION, test_min_iter = TEST_MIN_ITERATIONS, - rand_channel = True, test1ch = False, mem_profile = mem_profile) - # walltime = time.time() - time1 - - print_results("open ", "all channels", open_counters, open_mem_usages, - block_count, block_length, mem_profile = mem_profile) - - if format_list is not None: - # Append open time result - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_all_open', - median_attr(open_counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) - - print_results("read ", "all channels", - read_counters, read_mem_usages, block_count, block_length, mem_profile = mem_profile) + try: + print('_' * 64) + print('Open Once Read Many performance (median of N trials):') + print_header(mem_profile=mem_profile, with_ops=True) + + for block_length, block_count in TEST_BLOCK_LENGTHS: + + # time1 = time.time() + open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages = \ + _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur = TEST_MIN_DURATION, test_min_iter = TEST_MIN_ITERATIONS, + rand_channel = True, test1ch = False, mem_profile = mem_profile) + # walltime = time.time() - time1 + + print_results("open ", "all channels", open_counters, open_mem_usages, + block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append open time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_all_open', + median_attr(open_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("read ", "all channels", + read_counters, read_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append read time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_all_read', + median_attr(read_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("close ", "all channels", + close_counters, close_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append close time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_all_close', + median_attr(close_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + for block_length, block_count in TEST_BLOCK_LENGTHS: + open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages = \ + _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur = TEST_MIN_DURATION, test_min_iter = TEST_MIN_ITERATIONS, + rand_channel = True, test1ch = True, mem_profile = mem_profile) + + print_results("open ", "rand channel", + open_counters, open_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list: + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_open_rand', + median_attr(open_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("read ", "rand channel", + read_counters, read_mem_usages, block_count, block_length, mem_profile = mem_profile) + if format_list: + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_read_rand', + median_attr(read_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("close ", "rand channel", close_counters, close_mem_usages, block_count, block_length, mem_profile = mem_profile) + if format_list: + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_close_rand', + median_attr(close_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + + for block_length, block_count in TEST_BLOCK_LENGTHS: + + # time1 = time.time() + open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages = \ + _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, + test_min_dur = TEST_MIN_DURATION, test_min_iter = TEST_MIN_ITERATIONS, + rand_channel = False, test1ch = True, mem_profile = mem_profile) + # walltime = time.time() - time1 + + print_results("open ", "one channel", open_counters, open_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append open time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_open_fixed', + median_attr(open_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("read ", "one channel", read_counters, read_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append read time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_read_fixed', + median_attr(read_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + print_results("close ", "one channel", close_counters, close_mem_usages, block_count, block_length, mem_profile = mem_profile) + + if format_list is not None: + # Append close time result + format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, + f'{block_count}_one_close_fixed', + median_attr(close_counters, 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) + + except NotImplementedError: + print( + "Skipping Open Once Read Many performance test since this format has not implemented the required functions.") - if format_list is not None: - # Append read time result - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_all_read', - median_attr(read_counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) - - print_results("close ", "all channels", - close_counters, close_mem_usages, block_count, block_length, mem_profile = mem_profile) + print('_' * 64) + print('Read performance (median of N trials):') + print_header(mem_profile=mem_profile, with_ops=False) - if format_list is not None: - # Append close time result - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_all_close', - median_attr(close_counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) for block_length, block_count in TEST_BLOCK_LENGTHS: - open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages = \ - _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, - test_min_dur = TEST_MIN_DURATION, test_min_iter = TEST_MIN_ITERATIONS, - rand_channel = True, test1ch = True, mem_profile = mem_profile) - - print_results("open ", "rand channel", - open_counters, open_mem_usages, block_count, block_length, mem_profile = mem_profile) - - if format_list: - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_one_open_rand', - median_attr(open_counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) - - print_results("read ", "rand channel", - read_counters, read_mem_usages, block_count, block_length, mem_profile = mem_profile) - if format_list: - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_one_read_rand', - median_attr(read_counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) - print_results("close ", "rand channel", close_counters, close_mem_usages, block_count, block_length, mem_profile = mem_profile) - if format_list: - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_one_close_rand', - median_attr(close_counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) - - - for block_length, block_count in TEST_BLOCK_LENGTHS: - # time1 = time.time() - open_counters, open_mem_usages, read_counters, read_mem_usages, close_counters, close_mem_usages = \ - _run_read_test_opened_rand_channel(fmt, path, total_length, all_channels, block_length, block_count, - test_min_dur = TEST_MIN_DURATION, test_min_iter = TEST_MIN_ITERATIONS, - rand_channel = False, test1ch = True, mem_profile = mem_profile) + counters, mem_usages = _run_read_test(fmt, path, total_length, all_channels, + block_length, block_count, + test_min_dur=TEST_MIN_DURATION, + test_min_iter=TEST_MIN_ITERATIONS, + test1=False, mem_profile=mem_profile) # walltime = time.time() - time1 - - print_results("open ", "one channel", open_counters, open_mem_usages, block_count, block_length, mem_profile = mem_profile) - - if format_list is not None: - # Append open time result - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_one_open_fixed', - median_attr(open_counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) - - print_results("read ", "one channel", read_counters, read_mem_usages, block_count, block_length, mem_profile = mem_profile) - if format_list is not None: - # Append read time result - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_one_read_fixed', - median_attr(read_counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) - - print_results("close ", "one channel", close_counters, close_mem_usages, block_count, block_length, mem_profile = mem_profile) + print_results("", "all channels", counters, mem_usages, block_count, block_length, + mem_profile=mem_profile) - if format_list is not None: - # Append close time result - format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_one_close_fixed', - median_attr(close_counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) - - print('_' * 64) - print('Read performance (median of N trials):') - print_header(mem_profile=mem_profile, with_ops=False) - - for block_length, block_count in TEST_BLOCK_LENGTHS: - - # time1 = time.time() - counters, mem_usages = _run_read_test(fmt, path, total_length, all_channels, - block_length, block_count, - test_min_dur = TEST_MIN_DURATION, - test_min_iter = TEST_MIN_ITERATIONS, - test1 = False, mem_profile = mem_profile) - # walltime = time.time() - time1 - - print_results("", "all channels", counters, mem_usages, block_count, block_length, mem_profile = mem_profile) - if format_list is not None: # Append read time result format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_all', - median_attr(counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) + f'{block_count}_all', + median_attr(counters, + 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) for block_length, block_count in TEST_BLOCK_LENGTHS: - counters, mem_usages = _run_read_test(fmt, path, total_length, all_channels, - block_length, block_count, - test_min_dur = TEST_MIN_DURATION, - test_min_iter = TEST_MIN_ITERATIONS, - test1=True, mem_profile = mem_profile) + counters, mem_usages = _run_read_test(fmt, path, total_length, all_channels, + block_length, block_count, + test_min_dur=TEST_MIN_DURATION, + test_min_iter=TEST_MIN_ITERATIONS, + test1=True, mem_profile=mem_profile) # walltime = time.time() - time1 - - print_results("", "one channel", counters, mem_usages, block_count, block_length, mem_profile = mem_profile) - + + print_results("", "one channel", counters, mem_usages, block_count, block_length, + mem_profile=mem_profile) + if format_list: format_list, waveform_list, test_list, result_list = append_result(format_class, input_record, - f'{block_count}_one', - median_attr(counters, 'cpu_seconds'), - format_list, - waveform_list, test_list, - result_list) + f'{block_count}_one', + median_attr(counters, + 'cpu_seconds'), + format_list, + waveform_list, test_list, + result_list) print('_' * 64) diff --git a/waveform_benchmark/formats/base.py b/waveform_benchmark/formats/base.py index 18454ec..e8bd2f6 100644 --- a/waveform_benchmark/formats/base.py +++ b/waveform_benchmark/formats/base.py @@ -27,16 +27,13 @@ def read_waveforms(self, path: str, start_time: float, end_time: float, # kwargs is a dictionary that can be used to pass additional arguments to the format # replaces the total_length, block_length, and block_size params which are either test specific or intrinsic to the format. - @abc.abstractmethod def open_waveforms(self, path: str, signal_names:list, **kwargs): raise NotImplementedError - @abc.abstractmethod def read_opened_waveforms(self, opened_files: dict, start_time: float, end_time: float, signal_names: list): raise NotImplementedError - @abc.abstractmethod def close_waveforms(self, opened_files: dict): raise NotImplementedError