Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packs/core/core_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,19 @@ def check_test(file):
return True
else:
return False


# THIS SHOULD BE MOVED ELSEWHERE
class MalformedHeaderError(Exception):
    '''
    Exception raised when two consecutive event headers don't match up.

    Created initially for WD1 processing, but should be back-ported for WD2.

    Parameters
    ----------
    header1 : Reference header (the one checked against)
    header2 : Following header that failed the consistency check
    '''

    def __init__(self, header1, header2):
        # Forward both headers to Exception.__init__ so e.args is populated
        # and the exception pickles/reprs correctly (the original skipped this).
        super().__init__(header1, header2)
        self.header1 = header1
        self.header2 = header2

    def __str__(self):
        return f"MalformedHeaderError: Headers don't output expected result. Ensure the .dat file provided is formatted correctly.\nFirst Header {self.header1}\nSecond Header {self.header2}"

22 changes: 11 additions & 11 deletions packs/core/io.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os

import pandas as pd
import numpy as np
import numpy as np

import h5py
import ast
Expand Down Expand Up @@ -98,7 +98,7 @@ def read_config_file(file_path : str) -> dict:

if not os.path.exists(file_path):
raise FileNotFoundError(2, 'No such config file', file_path)


# read in arguments, require the required ones
config.read(file_path)
Expand Down Expand Up @@ -127,12 +127,12 @@ def writer(path : str,
path (str) : File path
group (str) : Group within the h5 file
overwrite(bool) : Boolean for overwriting previous dataset (OPTIONAL)

Returns
-------
write (func) : write function described in write()


Fixed size is for when you know the size of the output file, so you set the size
of the df beforehand, saving precious IO operation. The input then becomes a tuple
of (True, DF_SIZE, INDEX), otherwise its false.
Expand All @@ -153,18 +153,18 @@ def write(dataset : str,
fixed_size : Optional[Union[False, Tuple[True, int, int]]] = False) -> None:
'''
Writes ndarray to dataset within group defined in writer().
Fixed size used to speed up writing, if True will
create a dataset of a fixed size rather than
Fixed size used to speed up writing, if True will
create a dataset of a fixed size rather than
increasing the size iteratively.

Parameters
----------
dataset (str) : Dataset name to write to
data (ndarray) : Data to write*
fixed_size (Union[Bool, Tuple[Bool, int, int]])
: Method that's either enable or disabled.
False (disabled) -> Iteratively increases size of dataframe at runtime
True (enabled) -> Requires Tuple containing
True (enabled) -> Requires Tuple containing
(True, number of events, index to write to)
This method is best seen in action in `process_bin_WD1()`.
* Data should be in a numpy structured array format, as can be seen in WD1 and WD2 processing
Expand Down Expand Up @@ -203,7 +203,7 @@ def reader(path : str,
dataset : str) -> Generator:
'''
A lazy h5 reader that will iteratively read from a dataset, with the formatting:

FILE.H5 -> GROUP/DATASET
Parameters
----------
Expand All @@ -220,4 +220,4 @@ def reader(path : str,
dset = gr[dataset]

for row in dset:
yield row
yield row
3 changes: 3 additions & 0 deletions packs/proc/proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from packs.core.io import read_config_file
from packs.proc.processing_utils import process_bin_WD2
from packs.proc.processing_utils import process_bin_WD1
from packs.core.core_utils import check_test

def proc(config_file):
Expand All @@ -22,6 +23,8 @@ def proc(config_file):
case 'decode':
if conf_dict['wavedump_edition'] == 2:
process_bin_WD2(**arg_dict)
elif conf_dict['wavedump_edition'] == 1:
process_bin_WD1(**arg_dict)
Comment thread
jwaiton marked this conversation as resolved.
else:
raise RuntimeError(f"wavedump edition {conf_dict['wavedump_edition']} decoding isn't currently implemented.")
case default:
Expand Down
123 changes: 121 additions & 2 deletions packs/proc/processing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

# imports start from MULE/
from packs.core.core_utils import flatten
from packs.types import types
from packs.core.core_utils import MalformedHeaderError
from packs.core.io import writer
from packs.types import types

"""
Processing utilities
Expand Down Expand Up @@ -369,13 +371,130 @@ def check_save_path(save_path : str,
return save_path


def process_event_lazy_WD1(file_object : BinaryIO,
                           sample_size : int):

    '''
    WAVEDUMP 1: Generator that outputs each event iteratively from an opened binary file.

    Each event is a 6-int header followed by the waveform samples as
    unsigned 16-bit little-endian values. header[0] is the event size in
    bytes including the 24-byte header, so it is shrunk by 24 before use.

    Parameters
    ----------
    file_object (obj) : Opened file object (must be opened in binary mode)
    sample_size (int) : Time difference between each sample in waveform (2ns for V1730B digitiser)

    Returns
    -------
    data (generator) : Generator object yielding one event's worth of data
                       per iteration as (waveform, event_size, timestamp)

    Raises
    ------
    MalformedHeaderError : If the second header read is not 6 elements long,
                           or its fields are inconsistent with the first header.
    '''

    # read first header: six signed 4-byte integers
    header = np.fromfile(file_object, dtype = 'i', count = 6)

    # keep a copy of the first header; the consistency check below runs
    # only while this is not None (i.e. once, against the second header)
    sanity_header = header.copy()

    # continue only if data exists
    while len(header) > 0:

        # header[0] includes the 24-byte header itself; subtract it so the
        # remainder is the waveform payload size in bytes
        header[0] = header[0] - 24
        event_size = header[0] // sample_size

        # collect waveform, no of samples and timestamp (header[-1] is the trigger time-tag)
        yield (np.fromfile(file_object, dtype = np.dtype('<H'), count = event_size), event_size, header[-1])

        # collect next header
        header = np.fromfile(file_object, dtype = 'i', count = 6)

        # check if header has correct number of elements and correct information ONCE.
        # NOTE(review): if the file contains exactly one valid event, this read
        # hits EOF (len(header) == 0) before the check is disarmed, and a
        # MalformedHeaderError is raised — confirm a minimum of two events is
        # an intended requirement. The short-header test relies on this path.
        if sanity_header is not None:
            if len(header) == 6:
                if all([header[0] == sanity_header[0], # event size unchanged
                        header[4] == sanity_header[4] + 1, # event number +1
                        header[5] > sanity_header[5] # timestamp increases
                        ]):
                    # headers are consistent; disarm the check for the rest of the file
                    sanity_header = None
                else:
                    raise MalformedHeaderError(sanity_header, header)
            else:
                raise MalformedHeaderError(sanity_header, header)
    print("Processing Finished!")


def process_bin_WD1(file_path : str,
                    save_path : str,
                    sample_size : int,
                    overwrite : Optional[bool] = False,
                    print_mod : Optional[int] = -1):

    '''
    WAVEDUMP 1: Takes a binary file and outputs the containing information in a h5 file.

    This only works for individual channels at the moment, as wavedump 1 saves each channel
    as a separate file.

    Makeup of the header (header[n]) where n is:
        0 - event size (ns in our case, with extra 24 samples)
        1 - board ID
        2 - pattern (not sure exactly what this means)
        3 - board channel
        4 - event counter
        5 - Time-tag for the trigger
    Each of which is a signed 4-byte integer.

    Parameters
    ----------
    file_path (str) : Path to binary file
    save_path (str) : Path to saved file
    sample_size (int) : Size of each sample in an event (2 ns in the case of V1730B digitiser)
    overwrite (bool) : Boolean for overwriting pre-existing files
    print_mod (int) : Readout frequency for number of events, -1 implies no readout

    Returns
    -------
    None
    '''

    # destroy the group within the file if you're overwriting
    # (leftover debug print of save_path removed)
    save_path = check_save_path(save_path, overwrite)

    # event-info dtype is loop-invariant; build it once instead of per event
    e_dtype = types.event_info_type

    # open file for reading
    with open(file_path, 'rb') as file:

        # open writer object
        with writer(save_path, 'RAW', overwrite) as write:

            for i, (waveform, samples, timestamp) in enumerate(process_event_lazy_WD1(file, sample_size)):

                # guard first so `i % -1` is never evaluated when readout is disabled
                if (print_mod != -1) and (i % print_mod == 0):
                    print(f"Event {i}")

                # waveform dtype depends on the per-event sample count
                wf_dtype = types.rwf_type_WD1(samples)

                # enforce structure upon data
                event_info = np.array((i, timestamp, samples, sample_size, 1), dtype = e_dtype)
                waveforms  = np.array((i, 0, waveform), dtype = wf_dtype)

                # first run-through: use the file size and the (assumed constant)
                # event size to pre-compute the table length for fixed-size writes
                if i == 0:
                    file_size = os.path.getsize(file_path)
                    # samples * 2 bytes per uint16 sample + 6 * 4-byte header ints
                    waveform_size = (samples * 2) + (4 * 6)
                    num_of_events = file_size // waveform_size

                # add data to df lazily
                write('event_info', event_info, (True, num_of_events, i))
                write('rwf', waveforms, (True, num_of_events, i))


def process_bin_WD2(file_path : str,
save_path : str,
overwrite : Optional[bool] = False,
counts : Optional[int] = -1):

'''
Takes a binary file and outputs the containing waveform information in a h5 file.
WAVEDUMP 2: Takes a binary file and outputs the containing waveform information in a h5 file.

For particularly large waveforms/number of events. You can 'chunk' the data such that
each dataset holds `counts` events.
Expand Down
11 changes: 11 additions & 0 deletions packs/tests/data/configs/process_WD1.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[required]

process = 'decode'
wavedump_edition = 1
file_path = '/path/to/file.bin'
save_path = '/path/to/file.h5'
sample_size = 2

[optional]

overwrite = True
print_mod = 100
11 changes: 11 additions & 0 deletions packs/tests/data/configs/process_WD1_1channel.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[required]
process = 'decode'
wavedump_edition = 1
file_path = '/home/casper/Documents/MULE/packs/tests/data/one_channel_WD1.dat'
save_path = '/home/casper/Documents/MULE/packs/tests/data/one_channel_WD1_tmp.h5'
Comment thread
bpalmeiro marked this conversation as resolved.
sample_size = 2

[optional]
overwrite = True
print_mod = 100

Binary file added packs/tests/data/malformed_short_header.bin
Binary file not shown.
Binary file added packs/tests/data/one_channel_WD1.dat
Binary file not shown.
Binary file added packs/tests/data/one_channel_WD1.h5
Binary file not shown.
67 changes: 67 additions & 0 deletions packs/tests/processing_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import sys

import numpy as np
Expand All @@ -11,6 +12,8 @@
from pytest import warns
from pytest import fixture

from packs.proc.processing_utils import process_event_lazy_WD1
from packs.proc.processing_utils import process_bin_WD1
from packs.proc.processing_utils import read_defaults_WD2
from packs.proc.processing_utils import process_header
from packs.proc.processing_utils import read_binary
Expand All @@ -22,8 +25,11 @@
from packs.types.types import rwf_type
from packs.types.types import event_info_type

from packs.core.core_utils import MalformedHeaderError

from packs.core.io import load_rwf_info
from packs.core.io import load_evt_info
from packs.core.io import reader

from packs.types import types
from hypothesis import given
Expand Down Expand Up @@ -181,3 +187,64 @@ def test_decode_produces_expected_output(config, inpt, output, comparison, MULE_
assert load_evt_info(save_path).equals(load_evt_info(comparison_path))
assert load_rwf_info(save_path, samples).equals(load_rwf_info(comparison_path, samples))


@mark.parametrize("config, inpt, output, comparison", [("process_WD1_1channel.conf", "one_channel_WD1.dat", "one_channel_WD1_tmp.h5", "one_channel_WD1.h5")])
def test_WD1_decode_produces_expected_output(config, inpt, output, comparison, MULE_dir, data_dir, tmp_path):
    '''
    Decode a known WD1 binary via the `mule proc` CLI and compare the
    resulting h5 against a reference file.

    This test will be merged with test_decode_produces_expected_output()
    once WD2 processing has been updated to match lazy method of WD1
    '''

    # ensure path is correct
    file_path = data_dir + inpt
    save_path = tmp_path / output # PosixPaths behave differently
    comparison_path = data_dir + comparison
    config_path = data_dir + "configs/" + config

    # rewrite paths into a COPY of the config under tmp_path, so the
    # repo's config fixture is never mutated by a test run
    cnfg = configparser.ConfigParser()
    cnfg.read(config_path)
    cnfg.set('required', 'file_path', "'" + file_path + "'") # need to add quotes around for config reasons
    cnfg.set('required', 'save_path', f"'{save_path}'") # PosixPaths behave differently

    tmp_config_path = str(tmp_path / config)
    with open(tmp_config_path, 'w') as cfgfile:
        cnfg.write(cfgfile)

    # run processing pack decode; check=True so a crashed CLI fails the
    # test immediately instead of surfacing as a confusing missing-file assert
    run_pack = ['python3', MULE_dir + "/bin/mule", "proc", tmp_config_path]
    subprocess.run(run_pack, check = True)

    # the event info can be read out like a normal h5, the RWF cannot due to how they're structured
    assert pd.read_hdf(save_path, 'RAW/event_info').equals(pd.read_hdf(comparison_path, 'RAW/event_info'))
    assert [x for x in reader(save_path, 'RAW', 'rwf')] == [x for x in reader(comparison_path, 'RAW', 'rwf')]


def test_lazy_loading_malformed_data_WD1(MULE_dir):
    '''
    Check that a file whose headers fail the consistency check raises
    MalformedHeaderError.

    The generator validates the second header against the first, so the
    current checks are:
        - event number goes up +1 events
        - number of samples stays the same across two events
        - timestamp increases between events
    These may not always hold, but will ensure the test works as expected
    '''

    data_path = MULE_dir + "/packs/tests/data/malformed_data.bin"

    # the check only fires once the generator reads the SECOND header,
    # hence two advances
    with open(data_path, 'rb') as handle, raises(MalformedHeaderError):
        events = process_event_lazy_WD1(handle, sample_size = 2)
        next(events)
        next(events)

def test_lazy_loading_short_header_WD1(MULE_dir):
    '''
    Test a file that contains only 4 components in its header:
    should raise a MalformedHeaderError.

    The original test neither wrapped the calls in raises(...) nor advanced
    the generator past the first yield — the sanity check only runs when
    the generator reads the SECOND header, so the error could never fire
    and, had it fired, it would have FAILED the test rather than passed it.
    '''

    data_path = MULE_dir + "/packs/tests/data/malformed_short_header.bin"
    with open(data_path, 'rb') as file:
        a = process_event_lazy_WD1(file, sample_size = 2)
        with raises(MalformedHeaderError):
            next(a)
            next(a)
9 changes: 9 additions & 0 deletions packs/types/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ def rwf_type(samples : int) -> np.dtype:
('rwf', np.float32, (samples,))
])

def rwf_type_WD1(samples : int) -> np.dtype:
    '''
    WAVEDUMP 1: Generates the data-type for raw waveforms.

    Parameters
    ----------
    samples (int) : Number of uint16 samples in each raw waveform

    Returns
    -------
    np.dtype : Structured dtype with fields
               ('event_number', 'channels', 'rwf'),
               where 'rwf' is a (samples,) subarray of uint16.
    '''

    # (samples,) is an explicit shape tuple — the original's bare (samples)
    # was just a parenthesised int, inconsistent with sibling rwf_type()
    return np.dtype([('event_number', int),
                     ('channels', int),
                     ('rwf', np.uint16, (samples,))])


def generate_wfdtype(channels, samples):
'''
Expand Down