-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdsp.py
More file actions
77 lines (63 loc) · 2.51 KB
/
dsp.py
File metadata and controls
77 lines (63 loc) · 2.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# ----------------------------------------------------
# Electromagnetic Mining Array (EMMA)
# Copyright 2017, Pieter Robyns
# ----------------------------------------------------
import numpy as np
import matplotlib.pyplot as plt
from debug import DEBUG
from scipy import signal
from sklearn.decomposition import PCA
def normalize(trace):
"""
Z-score normalize trace
"""
mean = np.mean(trace)
std = np.std(trace)
if std == 0:
raise ValueError
return (trace - mean) / std
def normalize_p2p(trace):
return (trace - trace.min(0)) / trace.ptp(0)
def butter_filter(trace, order=1, cutoff=0.01, filter_type='low'):
"""
Apply butter filter to trace
"""
b, a = signal.butter(order, cutoff, filter_type)
trace_filtered = signal.filtfilt(b, a, trace)
return trace_filtered
def align(trace, reference, cutoff=0.01, order=1, prefilter=False):
"""
Determine their offset using cross-correlation. This offset is then used to
align the original signals.
"""
# Preprocess
try:
trace = np.array(trace)
reference = np.array(reference)
if prefilter:
processed_trace = butter_filter(trace, order=order, cutoff=cutoff)
processed_reference = butter_filter(reference, order=order, cutoff=cutoff)
processed_trace = normalize_p2p(processed_trace) # normalize() seems to work pretty well too
processed_reference = normalize_p2p(processed_reference)
else:
processed_trace = normalize_p2p(trace) # normalize() seems to work pretty well too
processed_reference = normalize_p2p(reference)
except ValueError: # Something is wrong with the signal
return None
# Correlated processed traces to determine lag
result = signal.correlate(processed_trace, processed_reference, mode='valid')
lag = np.argmax(result)
# Align the original trace based on this calculation
aligned_trace = trace[lag:]
# Vertical align as well TODO add as new separate op?
#bias = np.mean(aligned_trace)
#aligned_trace -= bias
#DEBUG = True
if DEBUG:
plt.plot(range(0, len(processed_reference)), processed_reference, label="Normalized reference")
plt.plot(range(0, len(processed_trace)), processed_trace, label="Normalized trace")
plt.plot(range(0, len(result)), result, label="Correlation")
#plt.plot(range(0, len(aligned_trace)), aligned_trace, label="Aligned trace")
plt.legend()
plt.show()
return aligned_trace