Lab session: predicting time series, discrete state space

Alphabet lab
Author
Affiliation

Per Unneberg

NBIS

Published

May 6, 2026

Preparation

Configuration

Execute the following code blocks to configure the session and import relevant modules.

Code
# Render inline figures at high ("retina") resolution.
%config InlineBackend.figure_format ='retina'
# Automatically reload imported modules before executing code, so edits to
# local modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
# Show matplotlib figures inline in the notebook output.
%matplotlib inline
Code
import os
import sys
import math
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence

For reproducibility we can set seeds:

Code
# Use one seed for every random number generator the lab relies on:
# Python's `random`, NumPy, and PyTorch (CPU and all CUDA devices).
SEED = 22
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

Utility classes and functions

We add utility classes and functions for plotting and training.

Code
from typing import Optional
import plotly.graph_objects as go
from plotly.subplots import make_subplots

import torchmetrics

class LivePlot():
    """Live-updating plotly figure with a primary and a secondary y-axis.

    Every named series is a scatter trace. `report` appends a value to a
    series at the current x position, and `tick` advances that position.
    """

    def __init__(self, left_label="Loss", right_label="Accuracy"):
        """Build the figure widget, title both y-axes and display it."""
        widget = go.FigureWidget(
            make_subplots(specs=[[{"secondary_y": True}]])
        )
        widget.update_yaxes(title_text=left_label, secondary_y=False)
        widget.update_yaxes(title_text=right_label, secondary_y=True)
        self.fig = widget

        # Trace position in `fig.data` and chosen y-axis, keyed by series name.
        self.plot_indices = {}
        self.trace_secondary = {}
        display(self.fig)
        # [lower, upper] x-axis range, and the x value used by `report`.
        self.limits = [0, 0]
        self.current_x = 0

    def report(self, name: str, value: float, secondary_y: bool = False):
        """Append `value` to the series `name`, creating the trace on first use.

        `secondary_y` is only consulted when the trace is created.
        """
        if name not in self.plot_indices:
            new_index = len(self.fig.data)
            self.fig.add_scatter(
                x=[], y=[], name=name,
                secondary_y=secondary_y
            )
            self.plot_indices[name] = new_index
            self.trace_secondary[name] = secondary_y
        trace = self.fig.data[self.plot_indices[name]]
        trace.y += (value,)
        trace.x += (self.current_x,)

    def increment(self, n_ticks: int):
        """Extend the upper x-axis limit by `n_ticks`."""
        self.limits[1] += n_ticks
        self.fig.update_layout(xaxis_range=self.limits)

    def set_limit(self, n_ticks: int):
        """Set the upper x-axis limit to `n_ticks`."""
        self.limits[1] = n_ticks
        self.fig.update_layout(xaxis_range=self.limits)

    def tick(self, n_ticks: Optional[int] = None):
        """Advance the current x position by `n_ticks` (one step if omitted)."""
        self.current_x += 1 if n_ticks is None else n_ticks

def _batch_loss_total(criterion, loss_value, batch_size):
    """Return the loss summed over one batch, given the criterion's output.

    Criteria with ``reduction='sum'`` already return the batch total. For
    any other setting (PyTorch's default is ``'mean'``) the value is treated
    as a per-example mean and scaled by the batch size.
    """
    if getattr(criterion, "reduction", "mean") == "sum":
        return loss_value
    return loss_value * batch_size


def train(*,
          model: torch.nn.Module,
          train_loader: DataLoader,
          dev_loader: DataLoader,
          optimizer: torch.optim.Optimizer,
          criterion: torch.nn.Module,
          max_epochs: int,
          metric: Optional["torchmetrics.Metric"] = None,
          device: Optional[torch.device] = None,
          liveplot: Optional["LivePlot"] = None):
    """Train `model` and report per-epoch losses (and a metric) to a live plot.

    Args:
        model: module whose forward pass returns a ``(output, state)`` pair;
            only the output is used here.
        train_loader: yields ``(x_batch, y_batch)`` pairs used for optimisation.
        dev_loader: yields ``(x_batch, y_batch)`` pairs used for evaluation only.
        optimizer: optimizer that updates the parameters of `model`.
        criterion: loss called as ``criterion(output, y_batch)``.
        max_epochs: number of passes over `train_loader`.
        metric: optional callable ``metric(predicted_classes, y_batch)``
            evaluated on CPU tensors; its per-batch values are averaged.
        device: device to run on; defaults to CUDA when available, else CPU.
        liveplot: optional plot that receives the "Training loss",
            "Development loss" and "Development accuracy" series.

    Losses are reported as the average per example. A series is skipped for
    an epoch in which its loader produced no batches.
    """
    if device is None:
        device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    model.to(device)

    for _epoch in range(max_epochs):
        train_loss = 0.0
        train_examples = 0
        model.train()

        for x_batch, y_batch in train_loader:
            optimizer.zero_grad()

            x_batch = x_batch.to(device)
            y_hat, _state = model(x_batch)

            loss = criterion(y_hat, y_batch.to(device))
            loss.backward()

            optimizer.step()
            batch_size = x_batch.size(0)
            train_loss += _batch_loss_total(criterion, loss.item(), batch_size)
            train_examples += batch_size

        model.eval()
        dev_loss = 0.0
        dev_examples = 0
        dev_metric = 0.0
        dev_batches = 0
        with torch.no_grad():
            for x_batch, y_batch in dev_loader:
                x_batch = x_batch.to(device)
                y_hat, _state = model(x_batch)
                batch_size = x_batch.size(0)
                batch_loss = criterion(y_hat, y_batch.to(device)).item()
                dev_loss += _batch_loss_total(criterion, batch_loss, batch_size)
                dev_examples += batch_size
                # Count batches explicitly instead of reusing a loop index
                # that may be stale or unbound when the loader is empty.
                dev_batches += 1
                if metric is not None:
                    # The metric is evaluated on CPU against the labels as
                    # delivered by the loader.
                    dev_metric += float(metric(torch.argmax(y_hat, -1).cpu(), y_batch))

        if liveplot is not None:
            liveplot.tick()  # Update the liveplot time
            if train_examples:
                liveplot.report("Training loss", train_loss / train_examples)
            if dev_examples:
                liveplot.report("Development loss", dev_loss / dev_examples)
            if metric is not None and dev_batches:
                liveplot.report("Development accuracy", dev_metric / dev_batches, secondary_y=True)

Aims

In this lab the aim is to predict a character in the alphabet given a short subsequence. Basically, the network learns to output the probability distribution of a character conditional on a sequence of input characters. Since the state space is discrete, you need to think about what output activation and loss function to use.

To help you along the way, some of the steps have been prepared in advance, but in most cases, your task is to complete missing code. Don’t hesitate to change parameter settings and experiment with the model architectures.

A simple LSTM model

In this section we will model input sequences as floating point values in the range [0.0, 1.0] and feed them directly to an LSTM layer.

Prepare data

We will work with the English alphabet, which consists of 26 characters (states). The predictions will be based on alphabet substrings: given the input “CDE” the model should output “F”, given “STUV” it should output “W”, and so on.

Code
# The 26 upper-case letters of the English alphabet; each letter is one state.
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

Since a neural network cannot deal directly with characters, we map each individual letter to an integer (integer encoding), where A maps to 1, B to 2, and so on. We reserve 0 to be a padding index, whose use will become clear further down, and set the vocabulary size to the number of letters in the alphabet plus the padding index.

Code
# Index 0 is reserved for padding, so letters are numbered from 1
# (A -> 1, B -> 2, ...).
PAD_IDX = 0
char_to_int = {letter: code for code, letter in enumerate(alphabet, start=1)}
int_to_char = {code: letter for letter, code in char_to_int.items()}
int_to_char[PAD_IDX] = '<PAD>'
# One class per letter plus the padding index.
VOCAB_SIZE = len(alphabet) + 1

Training data will be generated by selecting n-tuple (3 ≤ n ≤ 6) slices from the alphabet, where the output is the last character of a slice and the input is the characters preceding it. The following function generates training data.

Code
def make_training_data(num_inputs=200, max_length=5):
    """Sample random alphabet slices as (input sequence, next letter) pairs.

    Each sample is a contiguous run of letters from the module-level
    ``alphabet``. The run is the input and the letter that follows it is
    the target. Input runs are between 2 and ``max_length`` letters long.

    Args:
        num_inputs (int): number of training samples to generate
        max_length (int): maximum length of an input sequence

    Returns:
        tuple: ``(dataX, dataY)``, where ``dataX`` is a list of 1-D float32
        tensors holding the integer-encoded input letters and ``dataY`` is
        a list of the integer-encoded target letters.
    """
    # Last position an input may end on, so that a target letter still follows.
    last_input_pos = len(alphabet) - 2
    inputs, targets = [], []
    for _ in range(num_inputs):
        first = np.random.randint(len(alphabet) - 3)
        last = min(first + np.random.randint(1, max_length), last_input_pos)
        codes = [char_to_int[letter] for letter in alphabet[first:last + 1]]
        inputs.append(torch.tensor(codes, dtype=torch.float32))
        targets.append(char_to_int[alphabet[last + 1]])
    return inputs, targets
Code
# Input sequences are at most `max_length` letters long; the default number
# of samples (200) is generated.
max_length = 5
dataX, dataY = make_training_data(max_length=max_length)

Take a minute to inspect the dataX inputs. As you will see, the lengths of the entries differ. Prior to training, we need to pad input sequences shorter than five characters. Can you think of why this is necessary?

Code
def pad_input_sequences(data, max_length, padding_value=None):
    """Left-pad variable-length sequences to exactly ``max_length`` steps.

    Every sequence is padded individually, so the result has ``max_length``
    time steps even when no sequence in ``data`` has the full length.
    (Padding only up to the longest sequence present would make the reshape
    to ``max_length`` fail in that case.)

    Args:
        data: non-empty list of 1-D tensors, one per sample.
        max_length (int): number of time steps in the padded output.
        padding_value: value inserted on the left; defaults to ``PAD_IDX``.

    Returns:
        Tensor of shape ``[samples, max_length, 1]`` with the dtype of the
        input sequences.

    Raises:
        ValueError: if ``data`` is empty or a sequence is longer than
            ``max_length``.
    """
    if padding_value is None:
        padding_value = PAD_IDX
    if len(data) == 0:
        raise ValueError("data must contain at least one sequence")

    rows = []
    for seq in data:
        seq = seq.reshape(-1)
        missing = max_length - seq.numel()
        if missing < 0:
            raise ValueError(
                f"sequence of length {seq.numel()} exceeds max_length={max_length}"
            )
        # (missing, 0) pads on the left only.
        rows.append(F.pad(seq, (missing, 0), value=padding_value))

    # Stack to [samples, max_length] and add the trailing feature axis.
    return torch.stack(rows).unsqueeze(-1)

X = pad_input_sequences(dataX, max_length)
# One-hot encode the target indices; without an explicit num_classes the
# width is inferred from the largest index present in dataY.
y = nn.functional.one_hot(torch.tensor(dataY))

We now have a tensor of features X and a tensor of labels y for training. We follow the pytorch idiom of constructing a custom Dataset class to hold features and labels, and initialize a dataset object that holds all data.

Code
class AlphabetDataset(Dataset):
    """Dataset pairing input features with their target class.

    Labels are stored one-hot encoded and converted to class indices when
    a sample is accessed.
    """

    def __init__(self, X, y):
        # Feature tensor and one-hot label tensor, indexed in parallel.
        self.X, self.y = X, y

    def __len__(self):
        """Return the number of samples (the length of the features)."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return ``(features, class_index)`` for sample ``idx``."""
        # Pytorch gotcha: the one-hot encoded label has to be handed out as
        # a class index, so it is collapsed with argmax.
        return self.X[idx], self.y[idx].argmax()
Code
# Wrap all generated samples in a single dataset object.
dataset = AlphabetDataset(X, y)

For evaluation purposes, we also want to split the data into a training and test dataset. This can be done based on the indices of the data, but make sure to shuffle before splitting to ensure independent and representative datasets. Let’s set aside 10% of the data for testing using the following code:

Code
n_samples = len(dataset)
n_test = int(n_samples * 0.1)  # hold out 10% of the samples for testing
indices = list(range(n_samples))
# Shuffle the indices so the split does not follow the generation order.
random.shuffle(indices)
test_dataset = Subset(dataset, indices[:n_test])
train_dataset = Subset(dataset, indices[n_test:])
# Only the training batches are reshuffled; the test order stays fixed.
test_dataloader = DataLoader(test_dataset, batch_size=10, shuffle=False)
train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True)

The model

We now define the neural network model by subclassing nn.Module, where the layers are initialized in __init__. The forward function implements an operation on the input data and feeds it forward to the next layer. See the pytorch LSTM documentation for information on more parameter settings.

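First, see if you can complete the model below so that it includes an LSTM layer and a classifier (dense) layer that turns the final hidden state into logits. The parts marked `...` are the ones you need to fill in.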

Code
class AlphabetRNN(nn.Module):
    """Exercise skeleton: LSTM followed by a classifier layer.

    The parts marked ``...`` are intentionally left out and must be filled
    in before the class can be used.
    """

    def __init__(self, input_size, hidden_size=..., vocab_size=..., num_layers=1, dropout=0.0):
        super().__init__()
        self.num_classes = ...
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )
        self.classifier = nn...

    def indices_to_floats(self, x):
        """Scale class indices to floats by dividing by ``num_classes - 1``."""
        floats = x.float() / (self.num_classes - 1)
        return floats  # same shape as x, e.g. (batch, seq_len, 1)

    def forward(self, x, hidden=None):
        """Return ``(logits, hidden)`` for a batch of index sequences."""
        x = self.indices_to_floats(x)
        lstm_out, (hidden, cell) = self.lstm(x, hidden)
        # Select the final hidden state of the last LSTM layer
        # (one vector per batch element)
        hidden = ...
        # Pass through dense layer and get logits
        logits = self.classifier(...)
        return logits, hidden

    def predict(self, x, device="cpu"):
        """Predict the next character for the alphabet substring ``x``.

        Puts the model in evaluation mode and returns the character with
        the highest score ('<UNK>' if the index has no character).
        """
        self.eval()
        with torch.no_grad():
            # Encode the string as a batch of one: (1, seq_len, 1)
            x = torch.tensor([[char_to_int[c] for c in x]]).unsqueeze(-1).to(device)
            logits, _ = self.forward(x)
            # log_softmax is monotonic, so the argmax is that of the logits
            probs = F.log_softmax(logits, dim=-1)
            pred_idx = probs.argmax(dim=-1).item()
            return int_to_char.get(pred_idx, '<UNK>')
Code
# solution_hidden: Suggested AlphabetRNN class
class AlphabetRNN(nn.Module):
    """LSTM classifier that reads scaled character indices.

    The integer-encoded characters are scaled to floats, run through an
    LSTM, and the final hidden state of the last layer is mapped to one
    logit per class.
    """

    def __init__(self, input_size, hidden_size=8, vocab_size=VOCAB_SIZE, num_layers=1, dropout=0.0):
        super().__init__()
        self.num_classes = vocab_size
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
        )
        self.classifier = nn.Linear(hidden_size, vocab_size)

    def indices_to_floats(self, x):
        """Scale class indices to floats by dividing by ``num_classes - 1``."""
        scale = self.num_classes - 1
        return x.float() / scale  # same shape as x, e.g. (batch, seq_len, 1)

    def forward(self, x, hidden=None):
        """Return ``(logits, final_state)`` for a batch of index sequences.

        ``final_state`` is the last layer's hidden state after the final
        time step, not the full ``(h, c)`` pair returned by the LSTM.
        """
        scaled = self.indices_to_floats(x)
        _, (h_n, _) = self.lstm(scaled, hidden)
        # h_n holds one final hidden state per layer; keep the last layer's.
        final_state = h_n[-1]
        # Pass through dense layer and get logits
        return self.classifier(final_state), final_state

    def predict(self, x, device="cpu"):
        """Predict the next character for the alphabet substring ``x``.

        Puts the model in evaluation mode and returns the character with
        the highest score ('<UNK>' if the index has no character).
        """
        self.eval()
        codes = [char_to_int[char] for char in x]
        with torch.no_grad():
            # A batch holding one sequence: (1, seq_len, 1)
            batch = torch.tensor([codes]).unsqueeze(-1).to(device)
            logits, _ = self.forward(batch)
            # log_softmax is monotonic, so the argmax is that of the logits
            log_probs = F.log_softmax(logits, dim=-1)
            best = log_probs.argmax(dim=-1).item()
        return int_to_char.get(best, '<UNK>')

Finally, we initialize the model.

Code
# One input feature per time step (the scaled character index), 8 hidden units.
model = AlphabetRNN(input_size=1, hidden_size=8)

Training

Next, we choose what loss function (criterion) to use. Modify the code block below to choose a loss function that is suitable for the model. See https://docs.pytorch.org/docs/stable/nn.html#loss-functions for a complete list of PyTorch loss functions.

Code
# Modify LossFunction below to an appropriate class
# (nn.LossFunction is a placeholder, not an actual PyTorch class, so this
# line has to be edited before it can run).
criterion = nn.LossFunction()
Code
# solution_hidden: Suggested loss function (criterion)
#
# Since we have a classification problem over VOCAB_SIZE classes (the 26
# letters plus the padding index) and the model outputs logits, the natural
# choice is the
# [CrossEntropyLoss](https://docs.pytorch.org/docs/stable/generated/torch.nn.modules.loss.CrossEntropyLoss.html)
# Targets equal to PAD_IDX are ignored by the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

The last component we need is the optimizer. As usual, there is a large number of choices. What would be the appropriate choice in this case?

Code
# Modify Optimizer below to an appropriate choice
# (torch.optim.Optimizer is the base class of the PyTorch optimizers;
# replace it with a concrete optimizer).
learning_rate = 0.001
optimizer = torch.optim.Optimizer(model.parameters(), lr=learning_rate)
Code
# solution_hidden: Suggested optimizer
#
# Since RNNs are prone to gradient problems, the Adam optimizer is an
# appropriate choice: it adapts the step size of each parameter using
# running estimates of the gradient and of its square.
learning_rate = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

With loss and optimizer in place, we can train the network.

Code
# Use the GPU when one is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using {device} device")
epochs = 400
# Top-1 accuracy over all VOCAB_SIZE classes.
accuracy = torchmetrics.Accuracy(task='multiclass', num_classes=VOCAB_SIZE, top_k=1)
liveplot = LivePlot()
liveplot.fig.update_layout(width=1200, height=800, font_size=18)
# Extend the x-axis of the live plot by the number of epochs to be run.
liveplot.increment(epochs)

# The held-out test split serves as the development set during training.
train(model=model,
      train_loader=train_dataloader,
      dev_loader=test_dataloader,
      optimizer=optimizer,
      criterion=criterion,
      metric=accuracy,
      max_epochs=epochs,
      liveplot=liveplot,
      device=device)

Printing predictions

Finally, to test some predictions, you can select an entry from the input data and run model.predict. Briefly, the code will select input sequences from the training data and the model will output predictions based on an input. If you increase the number of examples you will probably see cases where the predictions are wrong.

Now we let the model do predictions on randomly selected inputs

Code
num_examples = 2
for i in range(num_examples):
    # Pick a random training input and decode it back to letters.
    pattern_index = np.random.randint(len(dataX))
    # Use a separate name for the decoded string so the feature tensor `X`
    # is not overwritten and the dataset cells above stay re-runnable.
    sequence = "".join(int_to_char[int(index)] for index in dataX[pattern_index])
    pred = model.predict(sequence, device)
    print(f"{sequence:>5s} -> {pred}")

If you only ran a few hundred epochs (the code above uses 400), the training and validation accuracies may still be pretty low, and it is likely that the predictions are substantially off. Try increasing the number of epochs to see if things improve.

An improved model

Our model treats each character (class) as an index, where every class is independent. We can improve the model by embedding the characters in a lower-dimensional space, where similar character sequences get similar embeddings, thereby capturing semantic similarities.

The embedded model

The main difference between the embedded model and the previous model is the addition of an embedding layer which is placed before the LSTM layer. The embedding layer needs to know the size of the vocabulary, which here is VOCAB_SIZE, the number of alphabet characters plus the padding index. The input type to the embedding layer must be Long or Int, and since the output consists of floats, it can be directly fed to the LSTM layer.

Code
class EmbeddedAlphabetRNN(nn.Module):
    """Embedding layer followed by an LSTM and a linear classifier.

    Args:
        vocab_size (int): number of classes including the padding index
        embedding_dim (int): size of each character embedding
        hidden_dim (int): number of LSTM hidden units
        num_layers (int): number of stacked LSTM layers
        dropout (float): dropout between stacked LSTM layers; it has no
            effect with a single layer and is then set to 0
    """

    def __init__(self, vocab_size, embedding_dim=16, hidden_dim=32, num_layers=2, dropout=0.3):
        super().__init__()

        self.embedding = nn.Embedding(
            vocab_size,
            embedding_dim,
            padding_idx=PAD_IDX
        )
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            # nn.LSTM applies dropout only between layers; passing a
            # non-zero value to a single-layer LSTM merely raises a warning.
            dropout=dropout if num_layers > 1 else 0.0,
            bidirectional=False
        )
        self.classifier = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden=None):
        """Return ``(logits, hidden)`` for a batch of integer sequences.

        ``hidden`` is the state returned by the LSTM and can be passed
        back in on a later call.
        """
        # x shape: (batch, seq_len)
        embedded = self.embedding(x)  # (batch, seq_len, embedding_dim)
        lstm_out, hidden = self.lstm(embedded, hidden)

        last_output = lstm_out[:, -1, :]      # (batch, hidden_dim)
        logits = self.classifier(last_output) # (batch, vocab_size)

        return logits, hidden

    def predict(self, x, device="cpu"):
        """Predict the next character from an input sequence string

        Puts the model in evaluation mode.

        Args:
          x (str): a non-empty alphabet substring

        Returns:
          str: the predicted character ('<UNK>' if the index has no character)

        Raises:
          ValueError: if ``x`` is empty
          KeyError: if ``x`` contains a character outside the alphabet
        """
        if not x:
            raise ValueError("x must contain at least one character")
        self.eval()
        with torch.no_grad():
            data = torch.tensor([[char_to_int[c] for c in x]]).to(device)
            logits, _ = self.forward(data)
            pred_idx = logits.argmax(dim=-1).item()
            return int_to_char.get(pred_idx, '<UNK>')

Training data

We regenerate a slightly larger training data set and convert the padded sequences to integers for input to the embedding.

Code
max_length = 5
dataX, dataY = make_training_data(num_inputs=1000, max_length=max_length)
# Drop only the trailing feature axis (a bare squeeze() would also drop a
# batch axis of size one) and convert to integers, which is required by the
# embedding
X = pad_input_sequences(dataX, max_length).squeeze(-1).int()
y = nn.functional.one_hot(torch.tensor(dataY))
dataset = AlphabetDataset(X, y)
n_samples = len(dataset)
n_test = int(n_samples * 0.1)  # hold out 10% of the samples for testing
indices = list(range(n_samples))
# Set seed for reproducibility
random.seed(84)
random.shuffle(indices)
test_dataset = Subset(dataset, indices[:n_test])
train_dataset = Subset(dataset, indices[n_test:])
test_dataloader = DataLoader(test_dataset, batch_size=10, shuffle=False)
train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True)

The model, loss function and optimizer

Code
# Default embedding size, hidden size and number of layers.
model = EmbeddedAlphabetRNN(vocab_size=VOCAB_SIZE)
# `device` was set in the training cell of the previous section.
model.to(device)
Code
# Ignore padding targets, using the shared PAD_IDX constant instead of a
# literal 0 so the loss stays in sync with the encoding.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

Training

Code
# Use the GPU when one is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using {device} device")
epochs = 20
# Top-1 accuracy over all VOCAB_SIZE classes.
accuracy = torchmetrics.Accuracy(task='multiclass', num_classes=VOCAB_SIZE, top_k=1)
# A fresh plot, so the curves of the two models are not mixed.
liveplot = LivePlot()
liveplot.fig.update_layout(width=1200, height=800, font_size=18)
# Extend the x-axis of the live plot by the number of epochs to be run.
liveplot.increment(epochs)

# The held-out test split serves as the development set during training.
train(model=model,
      train_loader=train_dataloader,
      dev_loader=test_dataloader,
      optimizer=optimizer,
      criterion=criterion,
      metric=accuracy,
      max_epochs=epochs,
      liveplot=liveplot,
      device=device)

Prediction

Code
num_examples = 2
for i in range(num_examples):
    pattern_index = np.random.randint(len(dataX))
    X = "".join([int_to_char[int(index)] for index in dataX[pattern_index]])
    pred = model.predict(X, device)
    print(f"{X:>5s} -> {pred}")

Summary

This lab has shown you how to model character sequence data, such as you would find in protein or DNA sequences. The model performance was greatly improved by embedding characters in a lower-dimensional space.