StreamReceiver: real-time alpha band power#
BCI, Neurofeedback, or any online paradigm that needs access to real-time
signals to compute a given metric can be designed with a
# Authors: Mathieu Scheltienne <firstname.lastname@example.org> # # License: LGPL-2.1
StreamRecorder create a new process
to stream or record data. On Windows, mutliprocessing suffers a couple of
restrictions. The entry-point of a multiprocessing program should be
if __name__ == '__main__': to ensure it can safely
import and run the module. More information on the
documentation for multiprocessing on Windows.
This example will use a sample EEG resting-state dataset that can be retrieve
with bsl.datasets. The dataset is stored in the user home
directory in the folder
To simulate an actual signal coming from an LSL stream, a
is used with a 40 seconds resting-state recording.
<Player: StreamPlayer | ON | /Users/scheltie/bsl_data/eeg_sample/resting_state-raw.fif>
Basics of StreamReceiver#
StreamReceiver opens an LSL inlet for each connected stream at
initialization. The inlet’s buffer is empty when created and fills up as
time passes. Data is pulled from the LSL inlet each time
acquire is called.
StreamReceiver buffer/window is large and data is pulled
too often from the LSL inlet, there might not be enough new samples to
pull an entire window/buffer length.
(512, 65) (512, 65) (512, 65)
The code snippet above retrieved 3 different windows of 1 second each from
the LSL stream sampled @ 512 Hz. The first window is retrieved 2 seconds
StreamReceiver was created. The second window is retrieved 1
second after the second window. The third window is retrieved right after the
Let’s visualize how this 3 different window are placed on the timeline:
[<matplotlib.lines.Line2D object at 0x134cf4700>]
As expected, the second and third window are mostly overlapping and contains mostly the same data. To improve visualization, each window can be shifted vertically with a fix offset:
[<matplotlib.lines.Line2D object at 0x134aa00d0>]
get_buffer methods are only getters and do not modify
the buffer. Pulling new data in the buffer is only done in a separate Thread
acquire. The exact timings at which the
StreamReceiver acquires new data is left to the discretion of the
Online loop with a StreamReceiver#
The example below is a simple online loop shifting between 2 phases lasting each 3 seconds. The loop will stop once each phase has been experienced twice, thus after 12 seconds. The beginning of each phase is marked with a trigger event, (1) or (2).
As an example, the alpha band power will be computed for each phase on 1 second successive acquisition window.
# FFT settings winsize_in_samples = \ receiver.streams['StreamPlayer'].sample_rate * receiver.winsize sample_spacing = 1./receiver.streams['StreamPlayer'].sample_rate frequencies = np.fft.rfftfreq(n=int(winsize_in_samples), d=sample_spacing) alpha_band = np.where(np.logical_and(8<=frequencies, frequencies<=13)) fft_window = np.hanning(winsize_in_samples) # Loop settings n_cycles = 2 # 2 alternation of phases phase_duration = 3 # in seconds
Acquired data is saved to disk with a
StreamRecorder and the beginning
of each phase is marked with a trigger event. For this example, a
SoftwareTrigger is used, but this example would be
equally valid with a different type of trigger.
All triggers do not need an active
StreamRecorder to be created.
<Recorder: All streams | ON | /Users/scheltie/bsl_data/examples>
The 2 first events,
phase2 are defined with their
respective timings as tuples
(timing, event). The timing are offset by 0.2
to give a little headroom to the script and to avoid clipping the first
The values used to mark the beginning of each phase are stored in a
There is actually 2 nested online loops: one to switch between phases and one to acquire data and compute the alpha band power inside a phase.
# number of time each phase has been experienced n = 1 # list to store results alphas = list() timings = list() # timers paradigm_timer = Timer() # timer used to switch between phases phase_timer = Timer() # timer used within a phase to count the duration next_event_timing, event = events.pop(0) while n <= n_cycles: if next_event_timing <= paradigm_timer.sec(): # schedule next similar event events.append((next_event_timing+2*phase_duration, event)) # add new result list alphas.append() timings.append() # reset timer and send trigger phase_timer.reset() trigger.signal(trigger_values[event]) while phase_timer.sec() <= phase_duration: # acquisition receiver.acquire() raw, samples = receiver.get_window(return_raw=True) if samples.shape != winsize_in_samples: continue # skip incomplete windows # processing raw.set_eeg_reference(ref_channels='average', projection=False) data = raw.pick(picks='eeg', exclude='bads').get_data() data = np.multiply(data, fft_window) fftval = np.abs(np.fft.rfft(data, axis=1) / data.shape[-1]) alpha = np.average(np.square(fftval[:, alpha_band]).T) # append to result list alphas[-1].append(alpha) timings[-1].append(samples) # increment if this is the second phase if event == 'phase2': n += 1 # Retrieve next event next_event_timing, event = events.pop(0) # close the trigger and stop the recorder trigger.close() recorder.stop()
Depending on the CPU, on the current CPU load, and on the processing applied, the number of acquired window (points) may vary.
[132, 122, 127, 123] [132, 122, 127, 123]
Raw instance can then be loaded and analyzed.
<Raw | example_real_time-StreamPlayer-raw.fif, 65 x 6304 (12.3 s), ~3.2 MB, data loaded> [[ 120 0 1] [1667 0 2] [3204 0 1] [4741 0 2]]
Stop the mock LSL stream.
del receiver # disconnects and close the LSL inlet. player.stop()
Total running time of the script: ( 0 minutes 21.295 seconds)