diff --git a/packs/core/core_utils.py b/packs/core/core_utils.py index 4bb9ea2..3ac1767 100644 --- a/packs/core/core_utils.py +++ b/packs/core/core_utils.py @@ -12,3 +12,19 @@ def check_test(file): return True else: return False + + +# THIS SHOULD BE MOVED ELSEWHERE +class MalformedHeaderError(Exception): + ''' + Exception created for when two headers don't match up consecutively. + Created initially for WD1 processing, but should be back-ported for WD2 + ''' + + def __init__(self, header1, header2): + self.header1 = header1 + self.header2 = header2 + + def __str__(self): + return f"MalformedHeaderError: Headers don't output expected result. Ensure the .dat file provided is formatted correctly.\nFirst Header {self.header1}\nSecond Header {self.header2}" + diff --git a/packs/core/io.py b/packs/core/io.py index 99dbe3e..e69008b 100644 --- a/packs/core/io.py +++ b/packs/core/io.py @@ -1,7 +1,7 @@ import os import pandas as pd -import numpy as np +import numpy as np import h5py import ast @@ -98,7 +98,7 @@ def read_config_file(file_path : str) -> dict: if not os.path.exists(file_path): raise FileNotFoundError(2, 'No such config file', file_path) - + # read in arguments, require the required ones config.read(file_path) @@ -127,12 +127,12 @@ def writer(path : str, path (str) : File path group (str) : Group within the h5 file overwrite(bool) : Boolean for overwriting previous dataset (OPTIONAL) - + Returns ------- write (func) : write function described in write() - - + + Fixed size is for when you know the size of the output file, so you set the size of the df beforehand, saving precious IO operation. The input then becomes a tuple of (True, DF_SIZE, INDEX), otherwise its false. @@ -153,10 +153,10 @@ def write(dataset : str, fixed_size : Optional[Union[False, Tuple[True, int, int]]] = False) -> None: ''' Writes ndarray to dataset within group defined in writer(). - Fixed size used to speed up writing, if True will - create a dataset of a fixed size rather than + Fixed size used to speed up writing, if True will + create a dataset of a fixed size rather than increasing the size iteratively. - + Parameters ---------- dataset (str) : Dataset name to write to @@ -164,7 +164,7 @@ def write(dataset : str, fixed_size (Union[Bool, Tuple[Bool, int, int]]) : Method that's either enable or disabled. False (disabled) -> Iteratively increases size of dataframe at runtime - True (enabled) -> Requires Tuple containing + True (enabled) -> Requires Tuple containing (True, number of events, index to write to) This method is best seen in action in `process_bin_WD1()`. * Data should be in a numpy structured array format, as can be seen in WD1 and WD2 processing @@ -203,7 +203,7 @@ def reader(path : str, dataset : str) -> Generator: ''' A lazy h5 reader that will iteratively read from a dataset, with the formatting: - + FILE.H5 -> GROUP/DATASET Parameters ---------- @@ -220,4 +220,4 @@ def reader(path : str, dset = gr[dataset] for row in dset: - yield row \ No newline at end of file + yield row diff --git a/packs/proc/proc.py b/packs/proc/proc.py index ec95240..6f4520c 100644 --- a/packs/proc/proc.py +++ b/packs/proc/proc.py @@ -2,6 +2,7 @@ from packs.core.io import read_config_file from packs.proc.processing_utils import process_bin_WD2 +from packs.proc.processing_utils import process_bin_WD1 from packs.core.core_utils import check_test def proc(config_file): @@ -22,6 +23,8 @@ def proc(config_file): case 'decode': if conf_dict['wavedump_edition'] == 2: process_bin_WD2(**arg_dict) + elif conf_dict['wavedump_edition'] == 1: + process_bin_WD1(**arg_dict) else: raise RuntimeError(f"wavedump edition {conf_dict['wavedump_edition']} decoding isn't currently implemented.") case default: diff --git a/packs/proc/processing_utils.py b/packs/proc/processing_utils.py index 7102b49..e54bc4e 100644 --- a/packs/proc/processing_utils.py +++ b/packs/proc/processing_utils.py @@ -14,7 +14,9 @@ # imports start from MULE/ from packs.core.core_utils import flatten -from packs.types import types +from packs.core.core_utils import MalformedHeaderError +from packs.core.io import writer +from packs.types import types """ Processing utilities @@ -369,13 +371,130 @@ def check_save_path(save_path : str, return save_path +def process_event_lazy_WD1(file_object : BinaryIO, + sample_size : int): + + ''' + WAVEDUMP 1: Generator that outputs each event iteratively from an opened binary file + Parameters + ---------- + file_object (obj) : Opened file object + sample_size (int) : Time difference between each sample in waveform (2ns for V1730B digitiser) + Returns + ------- + data (generator) : Generator object containing one event's worth of data + across each event + ''' + + # read first header + header = np.fromfile(file_object, dtype = 'i', count = 6) + + # header to check against + sanity_header = header.copy() + + # continue only if data exists + while len(header) > 0: + + # alter header to match expected size + header[0] = header[0] - 24 + event_size = header[0] // sample_size + + # collect waveform, no of samples and timestamp + yield (np.fromfile(file_object, dtype = np.dtype(' sanity_header[5] # timestamp increases + ]): + sanity_header = None + else: + raise MalformedHeaderError(sanity_header, header) + else: + raise MalformedHeaderError(sanity_header, header) + print("Processing Finished!") + + +def process_bin_WD1(file_path : str, + save_path : str, + sample_size : int, + overwrite : Optional[bool] = False, + print_mod : Optional[int] = -1): + + ''' + WAVEDUMP 1: Takes a binary file and outputs the containing information in a h5 file. + This only works for individual channels at the moment, as wavedump 1 saves each channel + as a separate file. + For particularly large waveforms/number of events. You can 'chunk' the data such that + each dataset holds `counts` events. + # Makeup of the header (header[n]) where n is: + # 0 - event size (ns in our case, with extra 24 samples) + # 1 - board ID + # 2 - pattern (not sure exactly what this means) + # 3 - board channel + # 4 - event counter + # 5 - Time-tag for the trigger + # Each of which is a signed 4byte integer + Parameters + ---------- + file_path (str) : Path to binary file + save_path (str) : Path to saved file + sample_size (int) : Size of each sample in an event (2 ns in the case of V1730B digitiser) + overwrite (bool) : Boolean for overwriting pre-existing files + print_mod (int) : Readout frequency for number of events, -1 implies no readout + Returns + ------- + None + ''' + + + # lets build it here first and break it up later + # destroy the group within the file if you're overwriting + save_path = check_save_path(save_path, overwrite) + print(save_path) + + + # open file for reading + with open(file_path, 'rb') as file: + + # open writer object + with writer(save_path, 'RAW', overwrite) as write: + + for i, (waveform, samples, timestamp) in enumerate(process_event_lazy_WD1(file, sample_size)): + + if (i % print_mod == 0) and (print_mod != -1): + print(f"Event {i}") + + # enforce stucture upon data + e_dtype = types.event_info_type + wf_dtype = types.rwf_type_WD1(samples) + + event_info = np.array((i, timestamp, samples, sample_size, 1), dtype = e_dtype) + waveforms = np.array((i, 0, waveform), dtype = wf_dtype) + + # first run-through, collect the header information to extract table size + if i == 0: + file_size = os.path.getsize(file_path) + waveform_size = (samples * 2) + (4*6) + num_of_events = int(file_size / waveform_size) + + # add data to df lazily + write('event_info', event_info, (True, num_of_events, i)) + write('rwf', waveforms, (True, num_of_events, i)) + + def process_bin_WD2(file_path : str, save_path : str, overwrite : Optional[bool] = False, counts : Optional[int] = -1): ''' - Takes a binary file and outputs the containing waveform information in a h5 file. + WAVEDUMP 2: Takes a binary file and outputs the containing waveform information in a h5 file. For particularly large waveforms/number of events. You can 'chunk' the data such that each dataset holds `counts` events. diff --git a/packs/tests/data/configs/process_WD1.conf b/packs/tests/data/configs/process_WD1.conf new file mode 100644 index 0000000..5a8fc25 --- /dev/null +++ b/packs/tests/data/configs/process_WD1.conf @@ -0,0 +1,11 @@ +[required] + +process = 'decode' +wavedump_edition = 1 +file_path = '/path/to/file.bin' +save_path = '/path/to/file.h5' + +[optional] + +overwrite = True +print_mod = 100 diff --git a/packs/tests/data/configs/process_WD1_1channel.conf b/packs/tests/data/configs/process_WD1_1channel.conf new file mode 100644 index 0000000..17f33a6 --- /dev/null +++ b/packs/tests/data/configs/process_WD1_1channel.conf @@ -0,0 +1,11 @@ +[required] +process = 'decode' +wavedump_edition = 1 +file_path = '/home/casper/Documents/MULE/packs/tests/data/one_channel_WD1.dat' +save_path = '/home/casper/Documents/MULE/packs/tests/data/one_channel_WD1_tmp.h5' +sample_size = 2 + +[optional] +overwrite = True +print_mod = 100 + diff --git a/packs/tests/data/malformed_short_header.bin b/packs/tests/data/malformed_short_header.bin new file mode 100644 index 0000000..29753ae Binary files /dev/null and b/packs/tests/data/malformed_short_header.bin differ diff --git a/packs/tests/data/one_channel_WD1.dat b/packs/tests/data/one_channel_WD1.dat new file mode 100644 index 0000000..2e2fd2a Binary files /dev/null and b/packs/tests/data/one_channel_WD1.dat differ diff --git a/packs/tests/data/one_channel_WD1.h5 b/packs/tests/data/one_channel_WD1.h5 new file mode 100644 index 0000000..19f4b5a Binary files /dev/null and b/packs/tests/data/one_channel_WD1.h5 differ diff --git a/packs/tests/processing_test.py b/packs/tests/processing_test.py index 57af0db..72ec392 100644 --- a/packs/tests/processing_test.py +++ b/packs/tests/processing_test.py @@ -1,3 +1,4 @@ +import os import sys import numpy as np @@ -11,6 +12,8 @@ from pytest import warns from pytest import fixture +from packs.proc.processing_utils import process_event_lazy_WD1 +from packs.proc.processing_utils import process_bin_WD1 from packs.proc.processing_utils import read_defaults_WD2 from packs.proc.processing_utils import process_header from packs.proc.processing_utils import read_binary @@ -22,8 +25,11 @@ from packs.types.types import rwf_type from packs.types.types import event_info_type +from packs.core.core_utils import MalformedHeaderError + from packs.core.io import load_rwf_info from packs.core.io import load_evt_info +from packs.core.io import reader from packs.types import types from hypothesis import given @@ -181,3 +187,64 @@ def test_decode_produces_expected_output(config, inpt, output, comparison, MULE_ assert load_evt_info(save_path).equals(load_evt_info(comparison_path)) assert load_rwf_info(save_path, samples).equals(load_rwf_info(comparison_path, samples)) + +@mark.parametrize("config, inpt, output, comparison", [("process_WD1_1channel.conf", "one_channel_WD1.dat", "one_channel_WD1_tmp.h5", "one_channel_WD1.h5")]) +def test_WD1_decode_produces_expected_output(config, inpt, output, comparison, MULE_dir, data_dir, tmp_path): + ''' + This test will be merged with test_decode_produces_expected_output() + once WD2 processing has been updated to match lazy method of WD1 + ''' + + # ensure path is correct + file_path = data_dir + inpt + save_path = tmp_path / output # PosixPaths behave differently + comparison_path = data_dir + comparison + config_path = data_dir + "configs/" + config + + # rewrite paths to files + cnfg = configparser.ConfigParser() + cnfg.read(config_path) + cnfg.set('required', 'file_path', "'" + file_path + "'") # need to add comments around for config reasons + cnfg.set('required', 'save_path', f"'{save_path}'") # PosixPaths behave differently + + with open(config_path, 'w') as cfgfile: + cnfg.write(cfgfile) + + # run processing pack decode + run_pack = ['python3', MULE_dir + "/bin/mule", "proc", config_path] + subprocess.run(run_pack) + + # the event info can be read out like a normal h5, the RWF cannot due to how they're structured + assert pd.read_hdf(save_path, 'RAW/event_info').equals(pd.read_hdf(comparison_path, 'RAW/event_info')) + assert [x for x in reader(save_path, 'RAW', 'rwf')] == [x for x in reader(comparison_path, 'RAW', 'rwf')] + + +def test_lazy_loading_malformed_data_WD1(MULE_dir): + ''' + Test that a file you pass through with no appropriate header is flagged if it's + not functioning correctly. + ATM the check for this is: + - event number goes up +1 events + - number of samples stays the same across two events + - timestamp increases between events + These may not always hold, but will ensure the test works as expected + ''' + + data_path = MULE_dir + "/packs/tests/data/malformed_data.bin" + + with raises(MalformedHeaderError): + with open(data_path, 'rb') as file: + a = process_event_lazy_WD1(file, sample_size = 2) + next(a) + next(a) + +def test_lazy_loading_short_header_WD1(MULE_dir): + ''' + Test a file that contains only 4 components in its header, + should return a MalformedHeaderError + ''' + + data_path = MULE_dir + "/packs/tests/data/malformed_short_header.bin" + with open(data_path, 'rb') as file: + a = process_event_lazy_WD1(file, sample_size = 2) + next(a) diff --git a/packs/types/types.py b/packs/types/types.py index 4722041..d859318 100644 --- a/packs/types/types.py +++ b/packs/types/types.py @@ -31,6 +31,15 @@ def rwf_type(samples : int) -> np.dtype: ('rwf', np.float32, (samples,)) ]) +def rwf_type_WD1(samples : int) -> np.dtype: + ''' + WAVEDUMP 1: Generates the data-type for raw waveforms + ''' + + return np.dtype([('event_number', int), + ('channels', int), + ('rwf', np.uint16, (samples))]) + def generate_wfdtype(channels, samples): '''