Foundation Models for Time Series—A Case Study Using MOMENT and Brain Signals

Sebastián Castaño - Feb 24 - Dev Community

For this post, I wanted to take a look at general-purpose time series foundation models. In particular, I am curious about what the “general” part of these models actually entails. It is not intended as a thorough benchmark, but more as an exploratory exercise.

I will start with a short intro on why foundation models for time series are in a different category compared to language or vision models, with an overview of what has been released in the last year or so.

Then, I will transition to a case study with one of these models—MOMENT—by analyzing electroencephalographic (EEG) data from a brain-computer interface experiment. Why EEG data? Well, earlier this year, Piramidal—a YC-backed company building a foundation model specifically for EEG data—raised a $6.5M seed round. That left me wondering how good general-purpose models can be if companies are raising rounds this size to train domain-specific models.

Impatient? Jump to the TLDR; at the end of the post.

Why are general time series foundation models different from other modalities?

Language and vision have fundamental structures that are largely consistent, regardless of the domain. With language, for example, models trained on a math textbook and on Shakespearean prose will still capture the same structural rules: syntax, grammar, sentence formation. Domain-specific vocabulary and context can vary, but the foundational structure of language doesn’t change. Similarly, vision models can learn shapes, edges, colors, spatial patterns, and textures that carry semantic meaning.

However, when it comes to time series, I initially had some doubts about the potential for such broad generalizability. Time series data holds information far beyond trends and seasonality: complex dynamics like phase and frequency modulation, as well as phase-amplitude coupling, can govern many processes and can vary drastically from one domain to another. And this doesn’t even account for multi-channel data, where cross-channel interactions add yet another layer of complexity.
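As a toy example of what I mean (my own illustration, not part of any benchmark), here is a signal where a slow rhythm modulates the amplitude of a fast oscillation (phase-amplitude coupling), something that simple trend-and-seasonality decompositions do not capture.

import numpy as np

# Toy phase-amplitude coupling: a slow 6 Hz rhythm modulates a fast 40 Hz oscillation
fs = 512                                        # sampling frequency in Hz
t = np.arange(0, 2, 1 / fs)
slow_phase = 2 * np.pi * 6 * t
envelope = 0.5 * (1 + np.cos(slow_phase))       # amplitude follows the slow rhythm
signal = envelope * np.sin(2 * np.pi * 40 * t)  # fast oscillation, slowly modulated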

Overview of Time Series Foundation Models

Here’s an overview of some pre-trained models for time series, along with the companies or labs behind them:

| Model | Company / University |
| --- | --- |
| TimesFM | Google |
| Lag-Llama | - |
| ForecastPFN | Abacus.ai |
| TinyTimeMixer | IBM |
| Uni2TS | Salesforce |
| MOMENT | CMU |
| MOIRAI | Salesforce |
| Toto | Datadog |
| Chronos | Amazon |
| TimeGPT | NIXTLA |

Many of these models are pre-trained on data from diverse domains—weather, medicine (e.g., ECGs), finance, and more. The pre-training approach for most of these models is similar to that used in vision models: masks are applied at various time points, and the model learns to predict the values for these masked intervals, effectively filling in the gaps.
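To make the masking idea concrete, here is a minimal sketch of a masked-reconstruction objective. This is a toy illustration of the general technique, not MOMENT's actual training code; the tensor names, the tiny encoder, and the ~30% masking ratio are my own choices.

import torch
import torch.nn as nn

# Toy masked-reconstruction objective (illustrative only)
series = torch.randn(32, 512)                   # [batch, time]
mask = (torch.rand(32, 512) > 0.3).float()      # 1 = visible, 0 = masked (~30% of points)

encoder = nn.Sequential(nn.Linear(512, 128), nn.ReLU(), nn.Linear(128, 512))

reconstruction = encoder(series * mask)         # the model only sees the unmasked values
loss = (((reconstruction - series) ** 2) * (1 - mask)).sum() / (1 - mask).sum()
loss.backward()                                 # the loss is computed on the masked positions only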

Ideally, pre-training enables these models to identify and understand informative patterns shared across domains. Beyond basic seasonality and trends, a robust foundation model should capture higher-level structures such as phase and frequency modulations and cross-channel interactions. This level of adaptability would make such models truly general-purpose, equipping them to handle the intricate dynamics present in complex time series data. However, given the vast range of domains and the diversity of signal types, achieving this seems very challenging. The differences in underlying structure across domains make it difficult for any single model to capture the full complexity of every type of time series data, which would explain the emergence of domain-specific foundation models such as the one developed by Piramidal.

Case study

As an exploration exercise, I chose MOMENT: the reported results look very good, and it is very well documented. For the data, I chose an EEG dataset corresponding to a standard classification task for a brain-computer interface. More on the data below.

The goal is to assess the zero-shot performance of the MOMENT embeddings when used in a downstream LDA classifier. The code below can also be found here.

Step: Set up general parameters

We'll be using the small version of the model and a subset of the EEG channels to simplify the analysis.

!uv pip install numpy pandas scikit-learn matplotlib tqdm torch moabb pyriemann
!uv pip install git+https://github.com/moment-timeseries-foundation-model/moment.git

TORCH_DEVICE = 'mps' # Apple Silicon GPU
MODEL_NAME = 'AutonLab/MOMENT-1-small' # small variant of MOMENT
EEG_CHANNELS = ['Fz', 'C3', 'Cz', 'C4', 'P3', 'Pz', 'P4', 'O1', 'O2'] # subset of the available channels
SAMPLING_FREQUENCY = 512 # Hz
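If you are not on Apple Silicon, a small fallback along these lines (my addition, not part of the original setup) picks whichever accelerator is available:

import torch

# Fall back gracefully if MPS is not available on this machine
if torch.backends.mps.is_available():
    TORCH_DEVICE = 'mps'
elif torch.cuda.is_available():
    TORCH_DEVICE = 'cuda'
else:
    TORCH_DEVICE = 'cpu'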

Step: Load the data

We will use one of the sessions of the ERP study published by Hübner et al. 2017, available via the Mother Of All BCI Benchmarks (MOABB). The experiment in a nutshell: participants are presented with a rapid sequence of visual stimuli, some of which they are instructed to pay attention to—target stimuli—and some of which they have to ignore—non-target stimuli. The brain response to the two classes is expected to differ.

The dataset contains data from 13 subjects, each participating in 3 sessions. A thorough benchmark is out of scope for this post, so we will limit ourselves to a single session of a single subject.

import contextlib
import io
import warnings

import numpy as np
from sklearn.preprocessing import LabelEncoder
from moabb import datasets
from moabb.paradigms import P300

# Epoch the continuous EEG into ~1 s windows after each stimulus
dataset = datasets.Huebner2017(interval=[0, .99])
dataset.download()

# The P300 paradigm selects channels, resamples, and epochs the data around each stimulus
paradigm = P300(
    resample=SAMPLING_FREQUENCY,
    baseline=None,
    channels=EEG_CHANNELS
)

stdout = io.StringIO()
with contextlib.redirect_stdout(stdout), warnings.catch_warnings(record=True) as w:
    X, y, metadata = paradigm.get_data(
        dataset=dataset,
        subjects=[1],
        return_epochs=False,
        return_raws=False,
        cache_config=None,
        postprocess_pipeline=None,
    )

# Limit to one session
session = '0'
ids_mask_session = metadata.session == session
y_encoded = LabelEncoder().fit_transform(y)

X = X[ids_mask_session]
y = y_encoded[ids_mask_session]

# Print number of classes and number of samples per class
unique_classes, counts = np.unique(y, return_counts=True)
print(f"Number of classes: {len(unique_classes)}")
for cls, count in zip(unique_classes, counts):
    print(f"Class '{cls}': {count} samples")
Number of classes: 2
Class '0': 3275 samples
Class '1': 1008 samples

Step: Explore the data

Let's take a look at the average brain activity for target and non-target stimuli, for each of the channels. I have highlighted two periods where we find archetypical responses for this type of stimulus, the so-called N100 and P300: a (N)egative deflection around 100 ms after the stimulus, and a (P)ositive deflection around 300 ms after the stimulus. The N100 is seen predominantly in electrodes over the visual cortex, i.e., the occipital channels O1 and O2, whereas the P300 is commonly seen over central-parietal electrodes, closer to the top of the skull, i.e., Cz and Pz.

import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots


ids_target = y == 1
X_target = X[ids_target]

ids_nontarget = y == 0
X_nontarget = X[ids_nontarget]

X_target_avg = X_target.mean(axis=0)
X_nontarget_avg = X_nontarget.mean(axis=0)

num_channels = X_target_avg.shape[0]

# Create a figure with subplots in a square grid
num_cols = int(np.ceil(np.sqrt(num_channels)))
num_rows = int(np.ceil(num_channels / num_cols))
fig = make_subplots(rows=num_rows, cols=num_cols, subplot_titles=EEG_CHANNELS, shared_xaxes=True, shared_yaxes=True, vertical_spacing=0.05, horizontal_spacing=0.02)

time_vector: np.ndarray = np.arange(X_target_avg.shape[1])/SAMPLING_FREQUENCY
# Plot each channel in a separate subplot
for i in range(num_channels):
    row: int = i // num_cols + 1
    col: int = i % num_cols + 1
    showlegend: bool = (i == 0)
    fig.add_trace(
        go.Scatter(
            x=time_vector, 
            y=X_target_avg[i], 
            mode='lines', 
            name='Target' if showlegend else None, 
            line=dict(color='red'),
            showlegend=showlegend
        ), 
        row=row, col=col
    )
    fig.add_trace(
        go.Scatter(
            x=time_vector, 
            y=X_nontarget_avg[i], 
            mode='lines', 
            name='NonTarget' if showlegend else None, 
            line=dict(color='blue'),
            showlegend=showlegend
        ), 
        row=row, col=col
    )
    fig.add_vrect(
        x0=60/SAMPLING_FREQUENCY, x1=80/SAMPLING_FREQUENCY, 
        fillcolor="#EFCB66", opacity=0.5, 
        layer="below", line_width=0,
        row=row, col=col
    )
    fig.add_vrect(
        x0=128/SAMPLING_FREQUENCY, x1=256/SAMPLING_FREQUENCY, 
        fillcolor="#90EE90", opacity=0.5, 
        layer="below", line_width=0,
        row=row, col=col
    )
    if row == num_rows:
        fig.update_xaxes(title_text="seconds after stimulus", row=row, col=col)

    if col == 1:
        fig.update_yaxes(title_text="µV", row=row, col=col)

# Update layout
y_axis_range: list[float] = [min(X_nontarget_avg.min(), X_target_avg.min()), max(X_nontarget_avg.max(), X_target_avg.max())]
fig.update_layout(
    height=800, 
    width=1000, 
    showlegend=True
)
for row in range(1, num_rows + 1):
    fig.update_yaxes(range=y_axis_range, row=row, col=1)
fig.show()

Step: Baseline pipeline

As a baseline, we will implement a pipeline that is commonly used with this type of data. It consists of a spatial filter, i.e., a linear combination of all channels. The spatial filters are learned from the data such that the difference between classes is maximized. This is the XDawn part of the pipeline.

The spatial filters are followed by a standard LDA classifier.
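Conceptually, a spatial filter is just a set of channel weights applied at every time point, collapsing the multi-channel signal into one (or a few) virtual channels. A minimal numpy sketch with made-up array shapes, independent of the pyriemann API:

import numpy as np

# Hypothetical epoch: 9 channels x 507 time points
epoch = np.random.randn(9, 507)

# One spatial filter = one weight per channel; XDawn learns these weights from data
# so that the filtered signal best separates target from non-target responses.
weights = np.random.randn(1, 9)      # [n_filters, n_channels]

virtual_channel = weights @ epoch    # [n_filters, n_times]: a single "virtual" channel
print(virtual_channel.shape)         # (1, 507)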

from sklearn.metrics import get_scorer
from pyriemann.spatialfilters import Xdawn
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
from mne.decoding import Vectorizer

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)


baseline_pipeline = make_pipeline(
    Xdawn(nfilter=1), 
    Vectorizer(),
    LDA(
        solver='lsqr', 
        shrinkage='auto'
    )
)
# Fit the baseline pipeline
baseline_pipeline.fit(X_train, y_train)

# Predict on the test set
y_pred = baseline_pipeline.predict(X_test)

# Calculate the test score
test_score: float = get_scorer(paradigm.scoring)(baseline_pipeline, X_test, y_test)
print(f"Test ROC-AUC XDAWN+LDA: {test_score}")
Test ROC-AUC XDAWN+LDA: 0.9788115284974094

Not bad.

Step: Data preparation for the MOMENT torch model

from sklearn.preprocessing import LabelEncoder
import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

N_INPUT_SAMPLES = 512  # MOMENT works with inputs of fixed length 512

def prepare_data(X_train, X_test, y_train, y_test):
    # Convert to torch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long)

    # pad the input to 512
    X_train_tensor = torch.nn.functional.pad(X_train_tensor, (0, N_INPUT_SAMPLES - X_train_tensor.shape[-1]))
    X_test_tensor = torch.nn.functional.pad(X_test_tensor, (0, N_INPUT_SAMPLES - X_test_tensor.shape[-1]))


    # Mark which time steps contain real data (1) vs. zero padding (0)
    input_mask: torch.Tensor = torch.cat([torch.ones(X_train.shape[-1], dtype=torch.bool), torch.zeros(N_INPUT_SAMPLES - X_train.shape[-1], dtype=torch.bool)])
    input_mask = input_mask.unsqueeze(0).repeat(X_train_tensor.shape[0], 1)


    # Create TensorDataset
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor, input_mask[:X_train_tensor.shape[0]])
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor, input_mask[:X_test_tensor.shape[0]])


    # Create DataLoader
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=False)
    test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    return train_dataloader, test_dataloader

train_dataloader, test_dataloader = prepare_data(X_train, X_test, y_train, y_test)

Just making sure that the conversion of the data to torch tensors went OK:

# Get tensors from dataloader
def collect_tensors(dataloader: DataLoader) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    X_list = []
    y_list = []
    mask_list = []
    for X, y, mask in dataloader:
        X_list.append(X)
        y_list.append(y) 
        mask_list.append(mask)
    return torch.cat(X_list), torch.cat(y_list), torch.cat(mask_list)

X_train_tensor, y_train_tensor, train_input_mask = collect_tensors(train_dataloader)
X_test_tensor, y_test_tensor, test_input_mask = collect_tensors(test_dataloader)

# Apply baseline_pipeline to X_train_tensor and X_test_tensor
baseline_pipeline.fit(X_train_tensor.cpu().numpy(), y_train_tensor.cpu().numpy())

# Predict on the test set
y_pred_tensor = baseline_pipeline.predict(X_test_tensor.cpu().numpy())

# Calculate the test score
test_score_tensor: float = get_scorer(paradigm.scoring)(baseline_pipeline, X_test_tensor.cpu().numpy(), y_test_tensor.cpu().numpy())
print(f"Test ROC-AUC XDAWN+LDA: {test_score_tensor}")
Test ROC-AUC XDAWN+LDA: 0.9788147668393783

Step: Calculate MOMENT embeddings

We calculate the embeddings for each channel individually: MOMENT is not trained to learn across channels, so we reshape the data so that each channel’s embedding is obtained separately and then concatenate these embeddings (9 channels × 512-dimensional embeddings = 4608 features per epoch). It’s important to note that this setup is inherently biased against MOMENT: the baseline pipeline extracts inter-channel information, while MOMENT does not.

from tqdm import tqdm
from momentfm import MOMENTPipeline
def get_embedding(model, dataloader):
    embeddings, labels = [], []
    with torch.no_grad():
        for batch_x, batch_labels, batch_mask in tqdm(dataloader, total=len(dataloader)):
            batch_size, n_channels, n_timesteps = batch_x.shape
            # Flatten channels into the batch dimension: one univariate series per channel
            batch_x = batch_x.reshape(batch_size * n_channels, 1, n_timesteps).to(TORCH_DEVICE)
            batch_mask = batch_mask.to(TORCH_DEVICE)  # note: the padding mask is not passed to the model here
            output = model(x_enc=batch_x) # [batch_size * n_channels x emb_dim]
            embedding = output.embeddings.cpu().reshape(batch_size, -1) # [batch_size x n_channels * emb_dim]
            embeddings.append(embedding)
            labels.append(batch_labels)        

    embeddings, labels = np.concatenate(embeddings), np.concatenate(labels)
    return embeddings, labels

model = MOMENTPipeline.from_pretrained(
    MODEL_NAME, 
    model_kwargs={
        'task_name': 'embedding',
    }
)
model.init()
model.to(TORCH_DEVICE)


train_embeddings, train_labels = get_embedding(model, train_dataloader)
test_embeddings, test_labels = get_embedding(model, test_dataloader)

print(train_embeddings.shape, train_labels.shape)
print(test_embeddings.shape, test_labels.shape)
(2998, 4608) (2998,)
(1285, 4608) (1285,)

Step: Downstream classification

We use an LDA classifier and a hyperparameter-optimized SVM (the fit_svm helper from momentfm).

from momentfm.models.statistical_classifiers import fit_svm

moment_lda_pipeline = make_pipeline(
    LDA(solver='lsqr', shrinkage='auto')
)
moment_lda_pipeline.fit(train_embeddings, train_labels)
y_pred = moment_lda_pipeline.predict(test_embeddings)
test_score = get_scorer(paradigm.scoring)(moment_lda_pipeline, test_embeddings, test_labels)
print(f"Test ROC-AUC MOMENT+LDA: {test_score}")
Test ROC-AUC MOMENT+LDA: 0.609520725388601
moment_svm_pipeline = fit_svm(train_embeddings, train_labels)
y_pred = moment_svm_pipeline.predict(test_embeddings)
test_score = get_scorer(paradigm.scoring)(moment_svm_pipeline, test_embeddings, test_labels)
print(f"Test ROC-AUC MOMENT+SVM: {test_score}")
Test ROC-AUC MOMENT+SVM: 0.549556347150259

Well, that is not very good. I suspect it has to do with the dimensionality of the input data (number of channels × embedding dimension). Let me try once again, this time applying PCA before the LDA classifier to reduce the number of input dimensions by 90%.

from sklearn.decomposition import PCA

moment_lda_pipeline = make_pipeline(
    PCA(n_components=int(train_embeddings.shape[-1] * 0.1)),
    LDA(solver='lsqr', shrinkage='auto')
)
moment_lda_pipeline.fit(train_embeddings, train_labels)
y_pred = moment_lda_pipeline.predict(test_embeddings)
test_score = get_scorer(paradigm.scoring)(moment_lda_pipeline, test_embeddings, test_labels)
print(f"Test ROC-AUC MOMENT+PCA+LDA: {test_score}")
Test ROC-AUC MOMENT+PCA+LDA: 0.6235427461139896

It improved, although not as much as I would have expected.

Step: Dimensionality reduction before embeddings

I want to try one last thing. Since MOMENT is not built to merge information across channels, we will help it a bit by spatially filtering the data before calculating the embeddings. This should substantially increase the classification score.

x_dawn = Xdawn(nfilter=1)
X_train_filtered = x_dawn.fit_transform(X_train, y_train)
X_test_filtered = x_dawn.transform(X_test)


train_dataloader_filtered, test_dataloader_filtered = prepare_data(X_train_filtered, X_test_filtered, y_train, y_test)

train_embeddings_filtered, train_labels_filtered = get_embedding(model, train_dataloader_filtered)
test_embeddings_filtered, test_labels_filtered = get_embedding(model, test_dataloader_filtered)

moment_lda_pipeline = make_pipeline(
    LDA(solver='lsqr', shrinkage='auto')
)
moment_lda_pipeline.fit(train_embeddings_filtered, train_labels_filtered)
y_pred = moment_lda_pipeline.predict(test_embeddings_filtered)
test_score = get_scorer(paradigm.scoring)(moment_lda_pipeline, test_embeddings_filtered, test_labels_filtered)
print(f"Test ROC-AUC Xdawn+MOMENT+LDA: {test_score}")
Test ROC-AUC Xdawn+MOMENT+LDA: 0.8285330310880828

Aha! The assumption of inter-channel independence in MOMENT appears to be a significant limitation. While we’re still not reaching the baseline’s 0.98 ROC-AUC, we’ve seen a clear improvement from roughly 0.62 to 0.83 by spatially mixing the channels and, thus, reducing the channel count before calculating the embeddings.

Remaining considerations for this exercise:
  1. Cross-subject, cross-session: I still wonder how well these embeddings work if we merge data across different subjects. This is still a challenging topic in the BCI community. In our case, I suspect that adding cross-subject data could even improve performance, since it seems we are still under the curse of dimensionality (see the sketch after this list).
  2. Generalizability to other experiments: It would be interesting to test whether this pipeline can adapt to different types of experiments, for instance scenarios where the discriminative features are not specific amplitude peaks (like the N100 or P300) but other characteristics, such as power increases within a specific frequency band over longer periods of time, as seen in motor imagery experiments. This is actually one of the main selling points of foundation models: decent zero-shot performance across tasks, so it would make sense to try it out.
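For the first point, a leave-one-subject-out split would be the natural test. Below is a rough, untested sketch of how it could look with the objects defined earlier (paradigm, dataset, model, prepare_data, get_embedding); the list of subjects is arbitrary.

# Rough leave-one-subject-out sketch (untested), reusing paradigm, dataset, model,
# prepare_data and get_embedding from above
subjects = [1, 2, 3]
X_all, y_all, meta_all = paradigm.get_data(dataset=dataset, subjects=subjects)
y_all = LabelEncoder().fit_transform(y_all)

for held_out in subjects:
    test_idx = (meta_all.subject == held_out).to_numpy()
    train_dl, test_dl = prepare_data(X_all[~test_idx], X_all[test_idx], y_all[~test_idx], y_all[test_idx])
    emb_train, lab_train = get_embedding(model, train_dl)
    emb_test, lab_test = get_embedding(model, test_dl)
    clf = make_pipeline(LDA(solver='lsqr', shrinkage='auto')).fit(emb_train, lab_train)
    print(f"Subject {held_out}: {get_scorer(paradigm.scoring)(clf, emb_test, lab_test)}")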

I plan to explore this further in the future, but I’ll leave it here for now.

TLDR;

Zero-shot MOMENT embeddings fed into an LDA classifier reach about 0.62 ROC-AUC on a standard P300 EEG classification task, far below the 0.98 of a classical XDawn+LDA baseline. Spatially filtering the channels before computing the embeddings closes part of the gap (0.83), suggesting that the lack of cross-channel modeling is a major limitation of general-purpose time series foundation models on EEG data.