Table of Content
Electrocardiogram (ECG)⚓︎
Electrocardiogram (ECG) measures the electrical activity in our heart. ECG machine is commonly used in hospital to monitor patient's heart rate. Nowadays, a few consumer-grade device has integrated the ECG sensor into their smartwatch, for example Apple Watch and Fitbit.
ECG Morphology⚓︎
ECG morphology can be described by 5 waves: P-wave, Q-wave, R-wave, S-wave, and T-wave.
- P-wave reflects atrial depolarization. It is a small wave due to the small muscle mass made by atria.
- T-wave reflects the rapid repolarization of contractile cell. Normal T-wave is slightly asymmetric, with a steeper downward slope.
- Q-wave, R-wave, S-wave are knowns as QRS complex in general. The QRS complex represents the depolarization of the ventricles. A QRS complex with a short duration implies a rapid depolarization, proving that the conduction system functions properly. A QRS complex with a long duration implies a slow depolarization, showing a high likelihood the dysfunction in conduction system.
Determine the heart rate from ECG⚓︎
The heart rate can be determined by measuring the duration of RR interval between two successive QRS complexes. Once the RR interval is known, the heart rate can be computed as follows:
where HR is heart rate is bpm, and \(d_{RR}\) is the duration of RR interval.
Hence, the first problem to heart rate estimation is to locate the position of R-peak.
R-peak Detection⚓︎
We will use a sample ECG data to demonstrate the R-peak detection, estimating the RR interval, and calculating the heart rate.
This sample ECG data is generated artificially using neurokit2. A 5 second data is generated with 100Hz sampling rate.
The data is saved in csv file so we can use pandas to read the data. We can visualize the sample data using matplotlib.
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
fs = 100
data = pd.read_csv('sample-ecg-data-70.csv')
plt.figure(figsize=(12,3))
plt.plot(data.index*(1/fs), data.ECG)
plt.xlabel('Time (s)')
plt.ylabel('ECG Amplitude')
plt.show()
Output:
From the plot, we can clearly identify the P-wave, T-wave, and the QRS complex. The goal is to locate all the R-peaks given the 5 second ECG signal. There are a number of ways to R-peaks detection, one of them is described in Halmiton algorithm
Here, we will use a simple moving average approach to R-peak detection. The key steps of this approach is summarized as follows:
- compute the moving average with a window that is at least half the sampling frequency
- mark the region of interest (RoI) for those signals above the moving average
- locate the peak value and peak location for all RoIs
- calculate the average peak value of all peaks, and reject those peaks with peak value less than the average
- compute the duration between each RR-interval, and use the mean duration to compute the heart rate.
Step 1: Moving average⚓︎
The sampling rate is 100Hz, so half the sampling rate is 50Hz, which means 50 data points per one second.
# 1. compute the moving average with window of 50 data points
window = 50
# use the convolve function by numpy to compute the moving average
data['ECG_ma'] = np.convolve(data.ECG, np.ones(window), 'same') /window
plt.figure(figsize=(12,3))
plt.plot(data.index*(1/fs), data.ECG, alpha=0.5, label = 'raw ECG data')
plt.plot(data.index*(1/fs), data.ECG_ma, label = 'moving average')
plt.xlabel('Time (s)')
plt.ylabel('ECG Amplitude')
plt.legend(framealpha=0.6)
plt.show()
Output:
Step 2: RoIs Identification⚓︎
From the above plot, we can see that some data points are above the moving average line, and some are below. We can mark those above the moving average line as RoI.
# 2. find all the RoIs
data['roi'] = np.zeros(len(data))
track_start = True
for index, row in data.iterrows():
# check if the point is above the moving average point
if (row.ECG > row.ECG_ma):
if (track_start):
k = int(np.max(data.roi) + 1)
print(f'[{k}] RoI detected...')
track_start = False
data.roi.iloc[index] = k
else:
track_start = True
Output:
[1] RoI detected...
[2] RoI detected...
[3] RoI detected...
[4] RoI detected...
[5] RoI detected...
[6] RoI detected...
[7] RoI detected...
[8] RoI detected...
[9] RoI detected...
[10] RoI detected...
[11] RoI detected...
[12] RoI detected...
[13] RoI detected...
[14] RoI detected...
[15] RoI detected...
[16] RoI detected...
[17] RoI detected...
[18] RoI detected...
Step 3: Peak Detection⚓︎
For each RoI, we can compute the peak value and peak position by taking the maximum value.
# 3. locate the peak for all RoIs
groups = data.groupby(['roi'])
columns = ['roi', 'value', 'position']
peak = pd.DataFrame(columns = columns)
for k,(roi, group) in enumerate(groups):
if (roi != 0):
peak = peak.append(pd.Series({
'roi': roi,
'value': np.max(group.ECG),
'position': group.index[np.argmax(group.ECG)]
}, name = k-1))
plt.figure(figsize=(12,3))
plt.plot(data.index*(1/fs), data.ECG, alpha=0.5, label = 'raw ECG data')
plt.plot(data.index*(1/fs), data.ECG_ma, label = 'moving average')
plt.scatter(peak.position*(1/fs), peak.value, marker='x', color='red', s=70, label="R-peak")
plt.xlabel('Time (s)')
plt.ylabel('ECG Amplitude')
plt.legend(framealpha=0.6)
plt.show()
Output:
Step 4: Peak Validation⚓︎
From the above plot, it shows that the peak of QRS complex is detected, as well as the peak from T-wave and P-wave. However, we just want to keep the peak from the QRS complex. We can validate the peak to see if the peak is the one we would like to keep or to reject. The simplest way is to compare each peak value with the average peak value and then keep only those peaks with value greater than the average peak value.
# compute the average peak values
average_peak_value = peak.value.mean()
# keep only the peaks that are greater than the average peak value
val_peak = peak.loc[peak.value>average_peak_value, :]
plt.figure(figsize=(12,3))
plt.plot(data.index*(1/fs), data.ECG, alpha=0.5, label = 'raw ECG data')
plt.plot(data.index*(1/fs), data.ECG_ma, label = 'moving average')
plt.scatter(val_peak.position*(1/fs), val_peak.value, marker='x', color='red', s=70, label="Validated R-peak")
plt.xlabel('Time (s)')
plt.ylabel('ECG Amplitude')
plt.legend(framealpha=0.6)
plt.show()
Output:
Step 5: Heart Rate Computation⚓︎
First, compute the duration between each RR interval, and then use the duration to compute for the heart rate, i.e.,
RR_duration = (val_peak.position[1:].values - val_peak.position[:-1].values) * 1/fs
HR = 60/np.mean(RR_duration)
print(f'Heart rate: {HR} bpm')
Output:
Heart rate: 70.09345794392523 bpm
Wrap Up the R-peak Detection⚓︎
We can group those five steps into a single function that accepts the ECG data and sampling rate as input argument and return a list of tuple containing R-peak values and their corresponding positions. The benefit of using function is that we can reuse the function for different ECG data without rewriting the same code. Let's realize this function implementation for R-peak detection.
def moving_average(ecg, window):
return np.convolve(ecg, np.ones(window), 'same') /window
def rpeak_detection(ecg, fs):
data = pd.DataFrame(ecg, columns = ['ECG'])
# 1. moving average
data['ECG_ma'] = moving_average(data.ECG, int(fs/2))
# 2. find all the RoIs
data['roi'] = np.zeros(len(data))
track_start = True
for index, row in data.iterrows():
# check if the point is above the moving average point
if (row.ECG > row.ECG_ma):
if (track_start):
k = int(np.max(data.roi) + 1)
track_start = False
data.roi.iloc[index] = k
else:
track_start = True
# 3. locate the peak for all RoIs
groups = data.groupby(['roi'])
columns = ['roi', 'value', 'position']
peak = pd.DataFrame(columns = columns)
for k,(roi, group) in enumerate(groups):
if (roi != 0):
peak = peak.append(pd.Series({
'roi': roi,
'value': np.max(group.ECG),
'position': group.index[np.argmax(group.ECG)]
}, name = k-1))
# 4. validate the peak
average_peak_value = peak.value.mean()
val_peak = peak.loc[peak.value>average_peak_value, :]
return (val_peak.position, val_peak.value)
Create a function to compute the heart rate given the list of R-peak positions as input argument.
def cal_HR(rpeak_position):
RR_duration = (rpeak_position[1:].values - rpeak_position[:-1].values) * 1/fs
return 60/np.mean(RR_duration)
Create a plotting function to visualize the detected R-peaks with respect to the ECG data.
def plot_rpeak(fs, ecg, peak_position, peak_value):
plt.plot(np.arange(0, len(ecg))*(1/fs), ecg, alpha=0.5, label = 'raw ECG data')
plt.scatter(peak_position*(1/fs), peak_value, marker='x', color='red', s=70, label="Validated R-peak")
plt.xlabel('Time (s)')
plt.ylabel('ECG Amplitude')
plt.legend(framealpha=0.6)
plt.show()
We can now call the function by supplying the ECG data and the sampling rate. Besides trying with the ECG data we have previously, we can also try with another two ECG samples: sample-ecg-data-75, sample-ecg-data-85.
Both samples are of 5 second duration with sampling rate equals to 100Hz. Note that the rpeak_detection
function takes in a numpy array as input argument, so we got to be careful when supplying the ecg input to the function.
# try with sample-ecg-data-70
fs = 100
data = pd.read_csv('sample-ecg-data-70.csv')
rpeaks = rpeak_detection(data.ECG, fs)
plt.figure(figsize=(12,3))
plot_rpeak(100, data.ECG, rpeaks[0], rpeaks[1])
# compute the heart rate
HR = cal_HR(rpeaks[0])
print(f'Heart rate: {HR} bpm')
Output:
Output:
Heart rate: 70.09345794392523 bpm
# try with sample-ecg-data-75
fs = 100
data = pd.read_csv('sample-ecg-data-75.csv')
rpeaks = rpeak_detection(data.ECG, fs)
plt.figure(figsize=(12,3))
plot_rpeak(100, data.ECG, rpeaks[0], rpeaks[1])
# compute the heart rate
HR = cal_HR(rpeaks[0])
print(f'Heart rate: {HR} bpm')
Output:
Output:
Heart rate: 75.0 bpm
# try with sample-ecg-data-85
fs = 100
data = pd.read_csv('sample-ecg-data-85.csv')
rpeaks = rpeak_detection(data.ECG, fs)
plt.figure(figsize=(12,3))
plot_rpeak(100, data.ECG, rpeaks[0], rpeaks[1])
# compute the heart rate
HR = cal_HR(rpeaks[0])
print(f'Heart rate: {HR} bpm')
Output:
Output:
Heart rate: 85.02024291497976 bpm
Resources⚓︎
The complete code to the above implementation is available here.
Many algorithms have been developed to compute the R-peak detection. Here you can find a collection of 7 ECG heartbeat detection algorithms implemented in Python. You can try out those peak detection algorithms by installing the package using pip install py-ecg-detectors
.