StreamReceiver: real-time buffer filtered with a causal filter#
StreamReceiver can be used to create a data buffer on which different
operations have already been applied. For instance, a buffer where data is
filtered with a bandpass filter.
# Authors: Mathieu Scheltienne <email@example.com> # # License: LGPL-2.1
StreamRecorder create a new process
to stream or record data. On Windows, mutliprocessing suffers a couple of
restrictions. The entry-point of a multiprocessing program should be
if __name__ == '__main__': to ensure it can safely
import and run the module. More information on the
documentation for multiprocessing on Windows.
This example will use a sample EEG resting-state dataset that can be retrieve
with bsl.datasets. The dataset is stored in the user home
directory in the folder
To simulate an actual signal coming from an LSL stream, a
is used with a 40 seconds resting-state recording. This dataset is already
filtered between (1, 40) Hz.
<Player: StreamPlayer | ON | /Users/scheltie/bsl_data/eeg_sample/resting_state-raw.fif>
BP filter between: 1.0, 40.0 Hz
Data should be filtered along one dimension. For this example, a butter IIR filter is used. More information on filtering is available on the MNE documentation:
def create_bandpass_filter(low, high, fs, n): """ Create a bandpass filter using a butter filter of order n. Parameters ---------- low : float The lower pass-band edge. high : float The upper pass-band edge. fs : float Sampling rate of the data. n : int Order of the filter. Returns ------- sos : array Second-order sections representation of the IIR filter. zi_coeff : array Initial condition for sosfilt for step response steady-state. """ # Divide by the Nyquist frequency bp_low = low / (0.5 * fs) bp_high = high / (0.5 * fs) # Compute SOS output (second order sections) sos = butter(n, [bp_low, bp_high], btype='band', output='sos') # Construct initial conditions for sosfilt for step response steady-state. zi_coeff = sosfilt_zi(sos).reshape((sos.shape, 2, 1)) return sos, zi_coeff
EEG data is usually subject to a lage DC offset, which corresponds to a step
response steady-state. The initial conditions are determined by multiplying
zi_coeff with the DC offset. The DC offset value can be approximated
by taking the mean of a small window.
When creating the filtered buffer, the duration has to be define to create a numpy array of the correct shape and pre-allocate the required space.
buffer_duration = 5 # in seconds
StreamReceiver is created. But the actual buffer and window
size of the
StreamReceiver are set as small as possible. The buffer
StreamReceiver is only used to store the last samples until
they are retrieved, filtered, and added to the filtered buffer. In this
StreamReceiver buffer and window size are set to 200 ms.
StreamReceiver opens an LSL inlet for each connected stream at
initialization. The inlet’s buffer is empty when created and fills up as
time passes. Data is pulled from the LSL inlet each time
acquire is called.
The filtered buffer can be define as a class that uses the elements created
previously. The method
.update() pulls new samples from the LSL stream,
filters and add them to the buffer while removing older samples that are now
exiting the buffer.
StreamReceiver appends a
TRIGGER channel at the beginning of
the data array. For filtering, the trigger channel is not needed. Thus, the
number of channels is reduced by 1 and the first channel on the
(samples, channels) data array is ignored.
For this example, the filter is defined between 5 and 10 Hz to emphasize its effect as the dataset streamed is already filtered between (1, 40) Hz.
class Buffer: """ A buffer containing filter data and its associated timestamps. Parameters ---------- buffer_duration : float Length of the buffer in seconds. sr : bsl.StreamReceiver StreamReceiver connected to the desired data stream. """ def __init__(self, buffer_duration, sr): # Store the StreamReceiver in a class attribute self.sr = sr # Retrieve sampling rate and number of channels self.fs = int(self.sr.streams[stream_name].sample_rate) self.nb_channels = len(self.sr.streams[stream_name].ch_list) - 1 # Define duration self.buffer_duration = buffer_duration self.buffer_duration_samples = ceil(self.buffer_duration * self.fs) # Create data array self.timestamps = np.zeros(self.buffer_duration_samples) self.data = np.zeros((self.buffer_duration_samples, self.nb_channels)) # For demo purposes, let's store also the raw data self.raw_data = np.zeros((self.buffer_duration_samples, self.nb_channels)) # Create filter BP (1, 15) Hz and filter variables self.sos, self.zi_coeff = create_bandpass_filter(5., 10., self.fs, n=2) self.zi = None def update(self): """ Update the buffer with new samples from the StreamReceiver. This method should be called regularly, with a period at least smaller than the StreamReceiver buffer length. """ # Acquire new data points self.sr.acquire() data_acquired, ts_list = self.sr.get_buffer() self.sr.reset_buffer() if len(ts_list) == 0: return # break early, no new samples # Remove trigger channel data_acquired = data_acquired[:, 1:] # Filter acquired data if self.zi is None: # Initialize the initial conditions for the cascaded filter delays. self.zi = self.zi_coeff * np.mean(data_acquired, axis=0) data_filtered, self.zi = sosfilt(self.sos, data_acquired, axis=0, zi=self.zi) # Roll buffer, remove samples exiting and add new samples self.timestamps = np.roll(self.timestamps, -len(ts_list)) self.timestamps[-len(ts_list):] = ts_list self.data = np.roll(self.data, -len(ts_list), axis=0) self.data[-len(ts_list):, :] = data_filtered self.raw_data = np.roll(self.raw_data, -len(ts_list), axis=0) self.raw_data[-len(ts_list):, :] = data_acquired
Testing the filtered buffer#
The filtered buffer must be updated regularly. In this example, the
StreamReceiver buffer has been initialized at 200 ms. Thus, the
filtered buffer should be updated at most every 200 ms, else, there is a risk
that a couple of samples will be missed between 2 updates.
A 15 seconds acquisition is used to test the buffer. Every 5 seconds, the buffer is retrieved and plotted.
# Create plot f, ax = plt.subplots(2, 1, sharex=True) ax.set_title('Raw data') ax.set_title('Filtered data between (5, 10) Hz') # Create buffer buffer = Buffer(buffer_duration, sr) # Start by filling once the entire buffer (to get rid of the initialization 0) timer = Timer() while timer.sec() <= buffer_duration: buffer.update() # Acquire during 15 seconds and plot every 5 seconds idx_last_plot = 1 timer.reset() while timer.sec() <= 15: buffer.update() # check if we just passed the 5s between plot limit if timer.sec() // 5 == idx_last_plot: # average all channels to simulate an evoked response ax.plot(buffer.timestamps, np.mean(buffer.raw_data[:, 1:], axis=1)) ax.plot(buffer.timestamps, np.mean(buffer.data[:, 1:], axis=1)) idx_last_plot += 1
Stop the mock LSL stream.
del sr # disconnects and close the LSL inlet. player.stop()
Total running time of the script: ( 0 minutes 22.646 seconds)