StreamReceiver: real-time alpha band power#

A BCI, a neurofeedback paradigm, or any other online paradigm that needs real-time access to the signal to compute a given metric can be designed with a StreamReceiver.

# Authors: Mathieu Scheltienne <mathieu.scheltienne@fcbg.ch>
#
# License: LGPL-2.1

Warning

Both StreamPlayer and StreamRecorder create a new process to stream or record data. On Windows, multiprocessing is subject to a couple of restrictions. The entry-point of a multiprocessing program should be protected with if __name__ == '__main__': to ensure it can safely import and run the module. More information is available in the documentation for multiprocessing on Windows.
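For instance, a script running this example on Windows could be structured as in the sketch below; the function name main is arbitrary, and the StreamPlayer call mirrors the one used later in this example:

from bsl import StreamPlayer, datasets

def main():
    # start the mock stream, run the online paradigm, then stop the stream
    fif_file = datasets.eeg_resting_state.data_path()  # assumed dataset helper
    player = StreamPlayer(stream_name='StreamPlayer', fif_file=fif_file)
    player.start()
    # ... online paradigm ...
    player.stop()

if __name__ == '__main__':  # protects the multiprocessing entry-point on Windows
    main()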

This example will use a sample EEG resting-state dataset that can be retrieved with bsl.datasets. The dataset is stored in the user home directory, in the folder bsl_data (e.g. C:\Users\User\bsl_data).

import os
from pathlib import Path
import time

from matplotlib import pyplot as plt
import mne
import numpy as np

from bsl import StreamRecorder, StreamReceiver, StreamPlayer, datasets
from bsl.utils import Timer
from bsl.triggers.software import SoftwareTrigger

To simulate an actual signal coming from an LSL stream, a StreamPlayer is used with a 40-second resting-state recording.
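A minimal sketch of how the player can be started, assuming the sample recording is fetched through bsl.datasets and the stream name 'StreamPlayer' expected by the receiver below:

fif_file = datasets.eeg_resting_state.data_path()  # sample recording (assumed helper)
player = StreamPlayer(stream_name='StreamPlayer', fif_file=fif_file)
player.start()
print(player)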

Out:

<Player: StreamPlayer | ON | /Users/scheltie/bsl_data/eeg_sample/resting_state-raw.fif>

Basics of StreamReceiver#

Now that a StreamPlayer is streaming data, a StreamReceiver is defined to access it in real-time.

receiver = StreamReceiver(bufsize=2, winsize=1, stream_name='StreamPlayer')
time.sleep(2)  # wait 2 seconds to fill LSL inlet.

Note

A StreamReceiver opens an LSL inlet for each connected stream at initialization. The inlet’s buffer is empty when created and fills up as time passes. Data is pulled from the LSL inlet each time acquire is called.

Warning

If the StreamReceiver buffer/window is large and data is pulled too often from the LSL inlet, there might not be enough new samples to fill an entire window/buffer.

receiver.acquire()
data1, timestamps1 = receiver.get_window()
print(data1.shape)
time.sleep(1)
receiver.acquire()
data2, timestamps2 = receiver.get_window()
print(data2.shape)
receiver.acquire()
data3, timestamps3 = receiver.get_window()
print(data3.shape)

Out:

(512, 65)
(512, 65)
(512, 65)

The code snippet above retrieved 3 different windows of 1 second each from the LSL stream sampled at 512 Hz. The first window is retrieved 2 seconds after the StreamReceiver was created. The second window is retrieved 1 second after the first window. The third window is retrieved right after the second window.

Let’s visualize how these 3 different windows are placed on the timeline:

idx = 10  # Select one channel
f, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.plot(timestamps1, data1[:, idx], color='#1f77b4')
ax.plot(timestamps2, data2[:, idx], color='#ff7f0e')
ax.plot(timestamps3, data3[:, idx], color='#2ca02c')

Out:

[<matplotlib.lines.Line2D object at 0x134cf4700>]

As expected, the second and third windows are mostly overlapping and contain mostly the same data. To improve the visualization, each window can be shifted vertically by a fixed offset:

f, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.plot(timestamps1, data1[:, idx], color='#1f77b4')
ax.plot(timestamps2, data2[:, idx]+2, color='#ff7f0e')
ax.plot(timestamps3, data3[:, idx]+4, color='#2ca02c')

Out:

[<matplotlib.lines.Line2D object at 0x134aa00d0>]

Finally, the get_window and get_buffer methods are only getters and do not modify the buffer. Pulling new data into the buffer is only done by acquire, in a separate thread. The exact timing at which the StreamReceiver acquires new data is left to the discretion of the user.

data4, timestamps4 = receiver.get_window()
print((data4 == data3).all(), (timestamps4 == timestamps3).all())

Out:

True True

Online loop with a StreamReceiver#

The example below is a simple online loop alternating between 2 phases, each lasting 3 seconds. The loop stops once each phase has been experienced twice, thus after 12 seconds. The beginning of each phase is marked with a trigger event, (1) or (2).

As an example, the alpha band power will be computed for each phase on successive 1-second acquisition windows.

# FFT settings
winsize_in_samples = \
    receiver.streams['StreamPlayer'].sample_rate * receiver.winsize
sample_spacing = 1./receiver.streams['StreamPlayer'].sample_rate
frequencies = np.fft.rfftfreq(n=int(winsize_in_samples), d=sample_spacing)
alpha_band = np.where(np.logical_and(8<=frequencies, frequencies<=13))[0]
fft_window = np.hanning(winsize_in_samples)

# Loop settings
n_cycles = 2  # 2 alternations of phases
phase_duration = 3  # in seconds

Acquired data is saved to disk with a StreamRecorder and the beginning of each phase is marked with a trigger event. For this example, a SoftwareTrigger is used, but this example would be equally valid with a different type of trigger.

Note

SoftwareTrigger must be created after a StreamRecorder is started and closed/deleted before a StreamRecorder is stopped.

Other trigger types do not need an active StreamRecorder to be created.

record_dir = Path('~/bsl_data/examples').expanduser()
os.makedirs(record_dir, exist_ok=True)
recorder = StreamRecorder(record_dir, fname='example_real_time')
recorder.start()
print(recorder)
trigger = SoftwareTrigger(recorder, verbose=True)

Out:

<Recorder: All streams | ON | /Users/scheltie/bsl_data/examples>

The first 2 events, phase1 and phase2, are defined with their respective timings as tuples (timing, event). The timings are offset by 0.2 second to give a little headroom to the script and to avoid clipping the first phase.

The values used to mark the beginning of each phase are stored in a dict.

offset = 0.2  # offset to avoid clipping the first phase
events = [(offset, 'phase1'), (offset+phase_duration, 'phase2')]
trigger_values = {'phase1': 1, 'phase2': 2}

There are actually 2 nested online loops: one to switch between phases and one to acquire data and compute the alpha band power inside a phase.

# number of times each phase has been experienced
n = 1

# list to store results
alphas = list()
timings = list()

# timers
paradigm_timer = Timer()  # timer used to switch between phases
phase_timer = Timer()  # timer used within a phase to count the duration

next_event_timing, event = events.pop(0)
while n <= n_cycles:
    if next_event_timing <= paradigm_timer.sec():
        # schedule next similar event
        events.append((next_event_timing+2*phase_duration, event))

        # add new result list
        alphas.append([])
        timings.append([])

        # reset timer and send trigger
        phase_timer.reset()
        trigger.signal(trigger_values[event])

        while phase_timer.sec() <= phase_duration:
            # acquisition
            receiver.acquire()
            raw, samples = receiver.get_window(return_raw=True)

            if samples.shape[0] != winsize_in_samples:
                continue  # skip incomplete windows

            # processing
            raw.set_eeg_reference(ref_channels='average', projection=False)
            data = raw.pick(picks='eeg', exclude='bads').get_data()
            data = np.multiply(data, fft_window)
            fftval = np.abs(np.fft.rfft(data, axis=1) / data.shape[-1])
            alpha = np.average(np.square(fftval[:, alpha_band]).T)

            # append to result list
            alphas[-1].append(alpha)
            timings[-1].append(samples[0])

        # increment if this is the second phase
        if event == 'phase2':
            n += 1
        # Retrieve next event
        next_event_timing, event = events.pop(0)

# close the trigger and stop the recorder
trigger.close()
recorder.stop()

As you may have noticed, get_window and get_buffer return data by default as a numpy.ndarray, but the data can also be returned directly as a Raw instance if the argument return_raw is set to True.
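For instance, the same window can be retrieved in both formats; a quick sketch, using the receiver which is still connected at this point:

data, timestamps = receiver.get_window()                # numpy.ndarray of shape (samples, channels)
raw, timestamps = receiver.get_window(return_raw=True)  # mne.io.Raw instance
print(type(data), type(raw))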

Depending on the CPU, on the current CPU load, and on the processing applied, the number of acquired windows (points) may vary.

print([len(a) for a in alphas])
print([len(t) for t in timings])

f, ax = plt.subplots(1, 1, figsize=(10, 10))
for k in range(len(alphas)):
    color = '#1f77b4' if k%2 == 0 else '#ff7f0e'
    ax.plot(timings[k], alphas[k], color=color)

Out:

[132, 122, 127, 123]
[132, 122, 127, 123]

The saved Raw instance can then be loaded and analyzed.

fname = record_dir / 'fif' / 'example_real_time-StreamPlayer-raw.fif'
raw = mne.io.read_raw_fif(fname, preload=True)
print(raw)
events = mne.find_events(raw, stim_channel='TRIGGER')
print(events)

Out:

<Raw | example_real_time-StreamPlayer-raw.fif, 65 x 6304 (12.3 s), ~3.2 MB, data loaded>
[[ 120    0    1]
 [1667    0    2]
 [3204    0    1]
 [4741    0    2]]

Stop the mock LSL stream.

del receiver  # disconnects and closes the LSL inlet.
player.stop()

Total running time of the script: ( 0 minutes 21.295 seconds)
