Skip to content

Commit c38f6b4

Browse files
committed
Added ecg filtering and hr detection example
1 parent a1e3608 commit c38f6b4

12 files changed

+5736
-1
lines changed

README.md

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,37 @@
11
# ECG-filtering-py
2-
Real-time ECG signal filtering simulation in python using FIR filters
2+
Real-time casual ECG signal filtering simulation in python using FIR filters.
3+
4+
# Dependencies
5+
6+
```
7+
pip install ecg_gudb_database
8+
pip3 install ecg_gudb_database
9+
```
10+
Note: Stated dependecies are only required for the test purposes.
11+
12+
# Description
13+
14+
ECG filtering using efficient finite impulse repsonse filter implementation. In this example the FIR filter has been designed to remove DC component and noise centred around 50Hz. To obtain impulse response of the filter in time domain, the response was modelled in frequency domain and IFFT coefficients were computed using numpy’s fft library. These contained small complex components so were filtered too out to obtain real valued coefficients.
15+
16+
![Filter's Frequency Response](images/FilterFrequencyResponse.png)
17+
18+
The first half of the coefficients of impulse are in positive time domain and the second half of coefficients are in negative time domain. The impulse response must be symmetrical in order to achieve linear phase; therefore, the elements were shifted. This was done by swapping the negative and positive coefficients (time-domain wise) around the middle element of the array which is half the number of taps. Then this impulse response was multiplied with Hanning window function was applited to smoothen the transition from the edges to the middle, ultimately reducing the ripples.
19+
20+
![Effect of Hanning Window on response in time-domain](images/WindowedResponse.png)
21+
22+
# Running Instructions
23+
24+
Run ecg_filter.py - Creates an FIR filter and applies it to an example ECG signal in real-time causal fashion and the creates the templates( for matched filter) for heart rate detector
25+
26+
Run hr_detect.py - Computes the heart rate from the ECG signal, to use the templated created by the ecg_filter use optional argument '--shortecg' otherwise the script will create use filtered Einthoven II recording slice as template.
27+
28+
29+
## Results
30+
31+
![Unfiltered(top) vs Filtered(bot) ECG Signal](images/ECGFiltering.png)
32+
33+
![Unfiltered(top) vs Filtered(bot) ECG Slice](images/FilteredECGSlice.png)
34+
35+
![Unfiltered ECG Signal Spectrum](images/UnfilteredECGspectrum.png)
36+
37+
![Filtered ECG Signal Spectrum](images/FilteredECGspectrum.png)

ecg_filter.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Created on Wed Oct 21 09:08:58 2020
4+
5+
@author: Kamil
6+
"""
7+
8+
import numpy as np
9+
from matplotlib import pyplot as plt
10+
import fir_filter
11+
12+
def PlotWaveform(title, ycords):
13+
plt.figure()
14+
plt.title(title)
15+
plt.xlabel('Time')
16+
plt.ylabel('Amplitude')
17+
plt.plot(ycords)
18+
19+
20+
def PlotECGWaveforms(title, yAxisFirstGraph, yAxisSecondGraph):
21+
fig, axs = plt.subplots(2)
22+
fig.suptitle(title)
23+
axs[0].plot(yAxisFirstGraph)
24+
axs[1].plot(yAxisSecondGraph)
25+
26+
27+
""" Plot frequency vs amplitude graph using two sets of values """
28+
def PlotFrequency(title, samplingFrequency, fftCoefficients):
29+
# Scale x-axis to sampling frequency
30+
frequencyAxis = np.linspace(0, samplingFrequency, len(fftCoefficients) )
31+
32+
plt.figure()
33+
# Plot Frequency spectrum
34+
plt.plot( frequencyAxis, fftCoefficients)
35+
36+
# Set labels for the graph
37+
plt.title(title)
38+
plt.xlabel('Frequency [Hz]')
39+
plt.ylabel('Amplitude')
40+
41+
42+
def LoadSamples(filepath):
43+
# Load data into an array and return it
44+
return np.loadtxt( filepath )
45+
46+
47+
def GenerateFIRCoefficientsBandStop(frequency1, frequency2, samplingFrequency, nTaps):
48+
# Normalise Frequencies and sclae to n taps
49+
k1 = int( (frequency1 / samplingFrequency ) * nTaps)
50+
k2 = int( (frequency2 / samplingFrequency ) * nTaps)
51+
52+
# Define fft coefficients array values of 1
53+
fftCoefficients = np.ones(nTaps)
54+
55+
# Fitler unwanted frequencies by setting coeffients to zero at appropriate index values
56+
# 50Hz Hum
57+
fftCoefficients[ k1 : k2+1 ] = 0
58+
fftCoefficients[ nTaps-k2 : nTaps-k1 + 1 ] = 0
59+
# DC
60+
DCResolution = 2
61+
fftCoefficients[0 : DCResolution] = 0
62+
fftCoefficients[( (nTaps-1) - DCResolution ) : nTaps] = 0
63+
64+
# Plot the frequency response of the filter
65+
PlotFrequency("FFT Coefficients - Filter", samplingFrequency, fftCoefficients)
66+
67+
# Perform inverse Fourier transform
68+
ifftCoefficients = np.fft.ifft(fftCoefficients)
69+
ifftCoefficients = np.real(ifftCoefficients)
70+
71+
# Swap the +ve and -ve time around M/2
72+
impulseResponse = np.zeros(nTaps)
73+
impulseResponse[ 0 : int(nTaps/2) ] = ifftCoefficients[ int(nTaps/2) : nTaps ]
74+
impulseResponse[ int(nTaps/2) : nTaps ] = ifftCoefficients[ 0 : int(nTaps/2) ]
75+
76+
# Create window function and multiply the coffecients by function to improve filter's performance
77+
filterCoefficients = impulseResponse * np.hanning(nTaps)
78+
79+
# Plot Impulse Response & Windowed Impulse Response
80+
PlotECGWaveforms('Filter Impulse Response(Bottom *Windowed', impulseResponse, filterCoefficients)
81+
82+
return filterCoefficients
83+
84+
if __name__ == "__main__":
85+
86+
# Load Samples form .dat file and define sampling parameters
87+
samples = LoadSamples("shortecg.dat")
88+
samplingFrequency = 250
89+
nSamples = len(samples)
90+
filteredSmaples = np.zeros(nSamples)
91+
92+
# Calculate FFT coefficients of ECG signal
93+
ecgSpectrum = np.fft.fft(samples)
94+
ecgSpectrum = abs(ecgSpectrum)
95+
# Display frequency spectrum of unfiltered ECG Signal
96+
PlotFrequency('Unfiltered ECG - Frequency Spectrum', samplingFrequency, ecgSpectrum)
97+
98+
# Calculate FIR filter coefficients
99+
firCoefficients = GenerateFIRCoefficientsBandStop(45,55, samplingFrequency, (2*samplingFrequency))
100+
101+
# Initialize FIR object
102+
fir = fir_filter.FIR_filter(firCoefficients)
103+
104+
# Simulate causal system by filtering signal sample by sample
105+
for x in range(nSamples):
106+
filteredSmaples[x] = fir.dofilter(samples[x])
107+
108+
# Plot original & filtered Signal
109+
PlotECGWaveforms('ECG Signal (Bottom is Filtered)', samples, filteredSmaples)
110+
111+
# Calculate FFT coefficients of filtered ECG signal
112+
ecgSpectrum = np.fft.fft(filteredSmaples)
113+
ecgSpectrum = abs(ecgSpectrum)
114+
PlotFrequency('Filtered ECG - Frequency Spectrum', samplingFrequency, ecgSpectrum)
115+
116+
# Take a slice of ecg to compare
117+
unfilteredSlice = samples[860:1005]
118+
template = filteredSmaples[860:1005]
119+
PlotECGWaveforms('ECG Slice (Bottom is Filtered)', unfilteredSlice, template)
120+
121+
# Display the graphs
122+
plt.show()
123+
124+
# Create a Template for hr_detect.py
125+
# Scale to max value
126+
scailing = max(filteredSmaples)
127+
filteredSmaples *= (1/scailing)
128+
# Flip the slice
129+
template = np.flip(template)
130+
# Display Template
131+
PlotWaveform('Template for hr_detect.py',template)
132+
np.savetxt('template.txt', np.c_[template])
133+

fir_filter.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Created on Wed Oct 21 09:10:11 2020
4+
5+
@author: Kamil
6+
"""
7+
8+
import numpy as np
9+
10+
class FIR_filter:
11+
12+
def __init__( self, _coefficients ):
13+
self.coeffFIR = _coefficients
14+
self.nTaps = len(_coefficients)
15+
self.ringbuffer = np.zeros(self.nTaps)
16+
self.ringBufferOffset = 0
17+
18+
def dofilter( self, inputValue ):
19+
# Store new value at the offset
20+
self.ringbuffer[self.ringBufferOffset] = inputValue
21+
22+
# Set offset variables
23+
offset = self.ringBufferOffset
24+
coeffOffset = 0
25+
26+
# Initialize output to zero
27+
output = 0
28+
29+
# Multiply values with coefficients until it reaches the beginning of the ring buffer
30+
while( offset >= 0 ):
31+
# Calculate tap value and add it to a sum
32+
output += self.ringbuffer[offset] * self.coeffFIR[coeffOffset]
33+
# Move offsets
34+
offset -= 1
35+
coeffOffset += 1
36+
37+
# Set the offset to end of the array
38+
offset = self.nTaps - 1
39+
40+
# Multiply coefficients until it reaches the start of the ring buffer
41+
while( self.ringBufferOffset < offset ):
42+
# Calculate tap value and add it to a sum
43+
output += self.ringbuffer[offset] * self.coeffFIR[coeffOffset]
44+
# Move offsets
45+
offset -= 1
46+
coeffOffset += 1
47+
48+
# Check if the next inputValue would be placed outside of the boundary of ring buffer and prevent this by wraping to the index of first element
49+
if( (offset + 1) >= self.nTaps ):
50+
self.ringBufferOffset = 0
51+
# The next offset value doesn't exceed the boundary
52+
else:
53+
self.ringBufferOffset += 1
54+
55+
return output
56+
57+
58+
def ResetFilter( self ):
59+
# Reset the current offset and clear ringbuffer
60+
self.ringBufferOffset = 0
61+
self.ringbuffer = np.zeros(self.nTaps)
62+
63+
64+
def unittest():
65+
# Define test sample vector, enough elements to test the wrap around
66+
testSamples = [1,2,3,4,5,6]
67+
nSample = 6
68+
# Define test FIR filter coefficients vector
69+
firCoefficients = [1,2,3,4,5]
70+
nFilterTaps = 5
71+
72+
# Define ring buffer expected vector
73+
expectedRingBufferItteration = [ [1,0,0,0,0],
74+
[1,2,0,0,0],
75+
[1,2,3,0,0],
76+
[1,2,3,4,0],
77+
[1,2,3,4,5],
78+
[6,2,3,4,5] ]
79+
80+
expectedRingBufferOffset = [1, 2, 3, 4, 0, 1]
81+
82+
# Define output for each sample processed in testsample vector
83+
expectedFilteroutput = [ 1, 4, 10, 20, 35, 50 ]
84+
85+
86+
print("INITLIALIZING FIR OBJECT!")
87+
# Initialize filter object
88+
fir = FIR_filter(firCoefficients)
89+
90+
print("TESTING FIR FILTER!\n")
91+
if(fir.nTaps != nFilterTaps ):
92+
print("FIR FILTER IS BROKEN! - FILTER'S NUMBER OF TAPS IS INCORRECT!")
93+
return
94+
95+
for x in range(nSample):
96+
output = fir.dofilter(testSamples[x])
97+
98+
# Verify output
99+
if (output != expectedFilteroutput[x]):
100+
print("FIR FILTER IS BROKEN! - FILTER OUTPUT IS INCORRECT!")
101+
print("Itteration Index: " + str(x) )
102+
print("Output: " + str(output) + " Expected: " + str(expectedFilteroutput[x]) )
103+
return
104+
105+
# Verify offset itterates correctly and witin boundaries
106+
if (fir.ringBufferOffset != expectedRingBufferOffset[x]):
107+
print("FIR FILTER IS BROKEN! - FILTER BUFFER OFFSET IS INCORRECT!")
108+
print("Itteration Index: " + str(x) )
109+
print("Output: " + str(fir.ringBufferOffset) + " Expected: " + str(expectedRingBufferOffset[x]) )
110+
return
111+
112+
# Verify contents of ring buffer
113+
for i in range(nFilterTaps):
114+
if ( fir.ringbuffer[i] != expectedRingBufferItteration[x][i] ):
115+
print("FIR FILTER IS BROKEN! - FILTER BUFFER CONTENT IS INCORRECT!")
116+
print("Itteration Index: " + str(x) + " Element: " + str(i) )
117+
print("Output: " + str(fir.ringBufferOffset) + " Expected: " + str(expectedRingBufferOffset[x]) )
118+
return
119+
120+
print("\nFIR FILTER IS OPERATING CORRECTLY!")
121+
return
122+
123+
if __name__ == "__main__":
124+
125+
unittest()
126+
127+
"""
128+
Numerical Explanation for the expected value
129+
130+
(offset >= 0) - RESULT: 1.0 Ring Buffer Element = 1.0 Tap Value = 1
131+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 2
132+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 3
133+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 4
134+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 5
135+
Sum of products is: 1.0
136+
137+
138+
(offset >= 0) - RESULT: 2.0 Ring Buffer Element = 2.0 Tap Value = 1
139+
(offset >= 0) - RESULT: 2.0 Ring Buffer Element = 1.0 Tap Value = 2
140+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 3
141+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 4
142+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 5
143+
Sum of products is: 4.0
144+
145+
146+
(offset >= 0) - RESULT: 3.0 Ring Buffer Element = 3.0 Tap Value = 1
147+
(offset >= 0) - RESULT: 4.0 Ring Buffer Element = 2.0 Tap Value = 2
148+
(offset >= 0) - RESULT: 3.0 Ring Buffer Element = 1.0 Tap Value = 3
149+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 4
150+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 5
151+
Sum of products is: 10.0
152+
153+
154+
(offset >= 0) - RESULT: 4.0 Ring Buffer Element = 4.0 Tap Value = 1
155+
(offset >= 0) - RESULT: 6.0 Ring Buffer Element = 3.0 Tap Value = 2
156+
(offset >= 0) - RESULT: 6.0 Ring Buffer Element = 2.0 Tap Value = 3
157+
(offset >= 0) - RESULT: 4.0 Ring Buffer Element = 1.0 Tap Value = 4
158+
(self.ringBufferOffset < offset) - RESULT: 0.0 Ring Buffer Element = 0.0 Tap Value = 5
159+
Sum of products is: 20.0
160+
161+
162+
(offset >= 0) - RESULT: 5.0 Ring Buffer Element = 5.0 Tap Value = 1
163+
(offset >= 0) - RESULT: 8.0 Ring Buffer Element = 4.0 Tap Value = 2
164+
(offset >= 0) - RESULT: 9.0 Ring Buffer Element = 3.0 Tap Value = 3
165+
(offset >= 0) - RESULT: 8.0 Ring Buffer Element = 2.0 Tap Value = 4
166+
(offset >= 0) - RESULT: 5.0 Ring Buffer Element = 1.0 Tap Value = 5
167+
Sum of products is: 35.0
168+
169+
170+
(offset >= 0) - RESULT: 6.0 Ring Buffer Element = 6.0 Tap Value = 1
171+
(self.ringBufferOffset < offset) - RESULT: 10.0 Ring Buffer Element = 5.0 Tap Value = 2
172+
(self.ringBufferOffset < offset) - RESULT: 12.0 Ring Buffer Element = 4.0 Tap Value = 3
173+
(self.ringBufferOffset < offset) - RESULT: 12.0 Ring Buffer Element = 3.0 Tap Value = 4
174+
(self.ringBufferOffset < offset) - RESULT: 10.0 Ring Buffer Element = 2.0 Tap Value = 5
175+
Sum of products is: 50.0
176+
"""
177+

0 commit comments

Comments
 (0)