Lab session: predicting time series, discrete state space

Alphabet lab
Author: Per Unneberg
Affiliation: NBIS

Preparations

Execute the following code blocks to configure the session and import relevant modules.

Code
%config InlineBackend.figure_format ='retina'
%load_ext autoreload
%autoreload 2
%matplotlib inline
Code
import os
import sys
import math
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence

Aims

In this lab the aim is to predict a character in the alphabet given a short subsequence. Basically, the network learns to output the probability distribution of a character conditioned on a sequence of input characters. Since the state space is discrete, you need to think about which output activation and loss function to use.

To help you along the way, some of the steps have been prepared in advance, but in most cases, your task is to complete missing code. Don’t hesitate to change parameter settings and experiment with the model architectures.

A simple LSTM model

In this section we will model input sequences as floating point values in the range [0.0, 1.0] and feed them directly to an LSTM layer.
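
For intuition, here is a minimal standalone sketch (not part of the lab code) of that scaling for a single character, using the integer encoding introduced in the next section where A maps to 1, B to 2, and so on:

Code
# 'C' is encoded as the integer 3; dividing by the largest index (26, for 'Z')
# maps every character into the range [0.0, 1.0]
example_index = 3
example_float = example_index / 26
print(example_float)  # ~0.115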

Prepare data

We will work with the English alphabet, which consists of 26 characters (states). The predictions will be based on alphabet substrings, such that given the input “CDE” the model should output “F”, given “STUV” it should output “W”, and so on.

Code
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"

Since a neural network cannot deal directly with characters, we map each individual letter to an integer (integer encoding), where A maps to 1, B to 2, and so on. We reserve 0 to be a padding index, whose use will become clear further down, and set the vocabulary size to the number of letters in the alphabet plus the padding index.

Code
char_to_int = dict((c, i + 1) for i, c in enumerate(alphabet))
int_to_char = dict((i + 1, c) for i, c in enumerate(alphabet))
PAD_IDX = 0
int_to_char[PAD_IDX] = '<PAD>'
VOCAB_SIZE = len(alphabet) + 1
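
As a quick sanity check, you can inspect a few entries of the mappings:

Code
print(char_to_int["A"], char_to_int["Z"])    # 1 26
print(int_to_char[1], int_to_char[PAD_IDX])  # A <PAD>
print(VOCAB_SIZE)                            # 27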

Training data will be generated by selecting n-character (n <= 6) slices from the alphabet, where the output is the last character of a slice and the input is the preceding characters. The following function generates training data.

Code
def make_training_data(num_inputs=200, max_length=5):
    """Make training data by slicing the alphabet into n-tuples, where
    n is between 2 and max_length.

    Args:
        num_inputs (int): number of training samples to generate
        max_length (int): maximum length of input sequences
    """
    dataX = []
    dataY = []
    for i in range(num_inputs):
        start = np.random.randint(len(alphabet)-3)
        end = min(start + np.random.randint(1, max_length), len(alphabet) - 2)
        sequence_in = alphabet[start:end+1]
        sequence_out = alphabet[end + 1]
        dataX.append(torch.tensor([char_to_int[char] for char in sequence_in], dtype=torch.float32))
        dataY.append(char_to_int[sequence_out])
    return dataX, dataY
Code
max_length = 5
dataX, dataY = make_training_data(max_length=max_length)

Take a minute to inspect the dataX inputs. As you will see, the lengths of the entries differ. Prior to training, we need to pad input sequences shorter than five characters. Can you think of why this is necessary?
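
For example, the following small inspection sketch prints a handful of raw entries together with their lengths:

Code
# Print a few raw training inputs and their (varying) lengths
for seq in dataX[:5]:
    chars = "".join(int_to_char[int(i)] for i in seq)
    print(f"{chars:>5s}  length={len(seq)}")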

Code
def pad_input_sequences(data, max_length):
    """Pad input sequences from left with padding character to max_length."""
    X = pad_sequence(
        data,
        batch_first=True,
        padding_value=PAD_IDX,
        padding_side='left'
    )
    # reshape X to be [samples, seq_len, features]; keep X a torch tensor
    X = X.reshape(X.shape[0], max_length, 1)
    return X

X = pad_input_sequences(dataX, max_length)
# one hot encode the output variable (fix num_classes so the width does not
# depend on which letters happen to be sampled)
y = nn.functional.one_hot(torch.tensor(dataY), num_classes=VOCAB_SIZE)
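
A quick shape check (with the default of 200 samples generated above) before moving on:

Code
print(X.shape)  # torch.Size([200, 5, 1]): samples, seq_len, features
print(y.shape)  # torch.Size([200, 27]): samples, vocabulary size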

We now have a tensor of features X and a tensor of labels y for training. We follow the PyTorch idiom of constructing a custom Dataset class to hold features and labels, and initialize a dataset object that holds all data.

Code
class AlphabetDataset(Dataset):
    def __init__(self, X, y):
        self.X = X  # Features/tensor
        self.y = y  # Labels/tensor

    def __getitem__(self, idx):
        X = self.X[idx]
        y = self.y[idx]
        # Pytorch gotcha: one-hot encoded y needs to be converted to
        # class indices
        y = y.argmax()
        return X, y

    def __len__(self):
        return len(self.X)
Code
dataset = AlphabetDataset(X, y)
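
Indexing the dataset returns a (features, label) pair, where the label has been converted back to a class index:

Code
features, label = dataset[0]
print(features.shape)                  # torch.Size([5, 1])
print(label, int_to_char[int(label)])  # class index and corresponding letter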

For evaluation purposes, we also want to split the data into a training and test dataset. This can be done based on the indices of the data, but make sure to shuffle before splitting to ensure independent and representative datasets. Let’s set aside 10% of the data for testing using the following code:

Code
n_samples = len(dataset)
n_test = int(n_samples * 0.1)
indices = list(range(n_samples))
# Set seed for reproducibility
random.seed(42)
random.shuffle(indices)
test_dataset = Subset(dataset, indices[:n_test])
train_dataset = Subset(dataset, indices[n_test:])
test_dataloader = DataLoader(test_dataset, batch_size=10, shuffle=False)
train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True)
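
You can peek at one batch to verify that the dataloader stacks ten padded sequences and their ten labels:

Code
Xb, yb = next(iter(train_dataloader))
print(Xb.shape, yb.shape)  # torch.Size([10, 5, 1]) torch.Size([10])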

The model

We now define the neural network model by subclassing nn.Module, where the layers are initialized in __init__. The forward function implements an operation on the input data and feeds it forward to the next layer. See the PyTorch LSTM documentation for information on more parameter settings.

Code
class AlphabetRNN(nn.Module):
    def __init__(self, input_size, hidden_size=8, vocab_size=VOCAB_SIZE, num_layers=1, dropout=0.0):
        super().__init__()
        self.num_classes = vocab_size
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )
        self.lstm_tanh = nn.Tanh()
        self.classifier = nn.Linear(hidden_size, vocab_size)

    def indices_to_floats(self, x):
        """Convert input indices to floats in range [0, 1]"""
        floats = x.float() / (self.num_classes - 1)
        return floats  # (batch, seq_len, 1)
        
    def forward(self, x, hidden=None):
        x = self.indices_to_floats(x)
        lstm_out, (hidden, cell) = self.lstm(x, hidden)
        # Get the final hidden state of the last LSTM layer (last time step)
        hidden = hidden[-1]
        # Apply tanh activation (optional in PyTorch but matches Keras)
        hidden = self.lstm_tanh(hidden)
        # Pass through dense layer and get logits
        logits = self.classifier(hidden)
        return logits, hidden

    def predict(self, x, device="cpu"):
        """Returns probability distribution for classification.
           Matches Keras categorical_crossentropy + softmax behavior.
        """
        self.eval()
        with torch.no_grad():
            x = torch.tensor([[char_to_int[c] for c in x]]).unsqueeze(-1).to(device)
            logits, _ = self.forward(x)
            probs = F.log_softmax(logits, dim=-1)  # monotone transform, argmax unchanged
            pred_idx = probs.argmax(dim=-1).item()
            return int_to_char.get(pred_idx, '<UNK>')

Finally, we set the device and initialize the model.

Code
device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")
Code
model = AlphabetRNN(input_size=1, hidden_size=8)
model.to(device)
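
An optional smoke test of the forward pass with a dummy batch of padding indices can catch shape errors early; the logits should have one column per vocabulary entry and the hidden state one column per hidden unit:

Code
dummy = torch.zeros(2, max_length, 1, dtype=torch.long).to(device)
logits, hidden = model(dummy)
print(logits.shape, hidden.shape)  # torch.Size([2, 27]) torch.Size([2, 8])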

Optimization of parameters

Now that we have the model components in place, we define a training loop that iterates over the training samples. In each iteration, we let the model generate predictions and calculate the loss with respect to a loss function. The gradient of the loss with respect to the model parameters is then computed by backpropagation and used by the optimizer to update the weights, hopefully improving network performance in the next epoch. To this end, we define an explicit training function:

Code
def train_loop(dataloader, model, loss_fn, optimizer, device, loginterval=None):
    """Train model on data in dataloader subject to loss defined by
    loss_fn and optimize parameters with optimizer."""
    model.train()  # Set model to training mode
    size = len(dataloader.dataset)
    total_loss = 0
    total = 0

    for batch, (X, y) in enumerate(dataloader):
        X = X.to(device)
        y = y.to(device)
        logits, _ = model(X)
        loss = loss_fn(logits, y)

        optimizer.zero_grad()
        # Backpropagation
        loss.backward()
        # Optimization
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(logits, dim=1)
        total += predicted.eq(y).sum().item()

        if loginterval is not None and batch % loginterval == 0:
            loss, current = loss.item(), batch * dataloader.batch_size + len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

    accuracy = total / size
    avg_loss = total_loss / len(dataloader)
    return accuracy, avg_loss

To detect overfitting, we also add a test function that evaluates model performance on the test dataset.

Code
def test_loop(dataloader, model, loss_fn, device, loginterval=None):
    """Evaluate model performance on dataset provided by dataloader."""
    model.eval()  # Set model to evaluation mode
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Make sure no gradients are calculated during test mode
    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device)
            y = y.to(device)
            logits, _ = model(X)
            test_loss += loss_fn(logits, y).item()  # Access directly since we don't apply loss.backward()
            correct += (logits.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    return correct, test_loss

Next, we choose what loss function to use. Modify the code block below to choose a loss function that is suitable for the model. See https://docs.pytorch.org/docs/stable/nn.html#loss-functions for a complete list of PyTorch loss functions.

Code
# Modify LossFunction below to an appropriate class
loss_fn = nn.LossFunction()
Code
# source_hidden
# Suggested loss function
#
# Since we have a classification problem with 26 classes and the model
# outputs logits, the natural choice is
# [CrossEntropyLoss](https://docs.pytorch.org/docs/stable/generated/torch.nn.modules.loss.CrossEntropyLoss.html)
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
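
To make the calling convention concrete, here is a standalone sketch: the loss takes raw logits of shape (batch, num_classes) together with integer class targets and applies log-softmax internally.

Code
example_logits = torch.tensor([[2.0, 0.5, 0.1], [0.1, 0.2, 3.0]])  # (batch=2, classes=3)
example_targets = torch.tensor([0, 2])                             # class indices
print(nn.CrossEntropyLoss()(example_logits, example_targets))      # scalar loss tensor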

The last component we need is the optimizer. As usual, there is a large number of choices. What would be the appropriate choice in this case?

Code
# Modify Optimizer below to an appropriate choice
learning_rate = 0.001
optimizer = torch.optim.Optimizer(model.parameters(), lr=learning_rate)
Code
# source_hidden
# Suggested optimizer
#
# Since RNNs are prone to gradient problems, the Adam optimizer is an
# appropriate choice as it uses per-parameter learning rates based on
# gradient history
learning_rate = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

Now we can iterate over the training and test functions to optimize the model parameters. Each iteration is called an epoch. We record metrics for each epoch so that we can assess the model performance over time.

Code
epochs = 200
# Print log message every epoch_loginterval iteration
epoch_loginterval = 10
# Dictionary to store validation metrics; use same keys as Keras
metrics = {'accuracy': [], 'loss': [], 'val_accuracy': [], 'val_loss': []}
for t in range(epochs):
    if (t+1) % epoch_loginterval == 0:
        print(f"Epoch {t+1}/{epochs}\n-------------------------------")
    accuracy, loss = train_loop(train_dataloader, model, loss_fn, optimizer, device)
    metrics["accuracy"].append(accuracy)
    metrics["loss"].append(loss)
    if (t+1) % epoch_loginterval == 0:
        print(f"  Train: Loss: {loss:.4f}, Accuracy: {(100*accuracy):>0.1f}%")
    val_accuracy, val_loss = test_loop(test_dataloader, model, loss_fn, device)
    metrics["val_accuracy"].append(val_accuracy)
    metrics["val_loss"].append(val_loss)
    if (t+1) % epoch_loginterval == 0:
        print(f"  Test: Val loss: {val_loss:>8f}, Accuracy: {(100*val_accuracy):>0.1f}%\n")
print("Done!")

In order to get a nicer overview of how training has progressed, you can plot the training and validation metrics with a utility function defined in the rnnutils module.

Code
# rnnutils.plot_loss_acc(metrics)
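
If the rnnutils module is not available, a minimal matplotlib sketch like the following plots the same metrics dictionary:

Code
import matplotlib.pyplot as plt

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
ax1.plot(metrics["loss"], label="train")
ax1.plot(metrics["val_loss"], label="validation")
ax1.set_xlabel("epoch")
ax1.set_ylabel("loss")
ax1.legend()
ax2.plot(metrics["accuracy"], label="train")
ax2.plot(metrics["val_accuracy"], label="validation")
ax2.set_xlabel("epoch")
ax2.set_ylabel("accuracy")
ax2.legend()
plt.show()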

Printing predictions

Finally, to test some predictions, you can select entries from the input data and run model.predict. Briefly, the code below selects input sequences from the training data, and the model outputs a prediction for each. If you increase the number of examples you will probably see cases where the predictions are wrong.

Now we let the model make predictions on randomly selected inputs:

Code
num_examples = 2
for i in range(num_examples):
    pattern_index = np.random.randint(len(dataX))
    X = "".join([int_to_char[int(index)] for index in dataX[pattern_index]])
    pred = model.predict(X, device)
    print(f"{X:>5s} -> {pred}")

If you only ran 200 epochs, the training and validation accuracies are still pretty low, and it is likely that the predictions are substantially off. Try increasing the number of epochs to see if things improve.

An improved model

Our model treats each character (class) as an index, where every class is independent. We can improve the model by embedding the characters in a lower-dimensional continuous space, where characters that occur in similar contexts get similar embeddings, thereby capturing semantic similarities.

The embedded model

The main difference between the embedded model and the previous model is the addition of an embedding layer which is placed before the LSTM layer. The embedding layer needs to know the size of the vocabulary, which here is VOCAB_SIZE, the number of alphabet characters plus the padding index. The input type to the embedding layer must be Long or Int, and since the output consists of floats, it can be directly fed to the LSTM layer.
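
As a standalone illustration (not part of the model class below), an embedding layer maps integer indices to dense float vectors:

Code
emb = nn.Embedding(VOCAB_SIZE, 16, padding_idx=PAD_IDX)
example = torch.tensor([[1, 2, 3]])  # integer-encoded "ABC", shape (batch=1, seq_len=3)
print(emb(example).shape)            # torch.Size([1, 3, 16]), float output for the LSTM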

Code
class EmbeddedAlphabetRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim=16, hidden_dim=32, num_layers=2):
        super(EmbeddedAlphabetRNN, self).__init__()

        self.embedding = nn.Embedding(
            vocab_size,
            embedding_dim,
            padding_idx=PAD_IDX
        )
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.3,  # Could add as separate layer
            bidirectional=False
        )
        self.classifier = nn.Linear(hidden_dim, vocab_size)
          
    def forward(self, x, hidden=None):
        # x shape: (batch, seq_len)
        embedded = self.embedding(x)  # (batch, seq_len, embedding_dim)
        lstm_out, hidden = self.lstm(embedded, hidden)
        
        last_output = lstm_out[:, -1, :]      # (batch, hidden_dim)
        logits = self.classifier(last_output) # (batch, vocab_size)
        
        return logits, hidden

    def predict(self, x, device="cpu"):
        """Predict the next character from an input sequence string

        Args:
          x (str): an alphabet substring
        """
        self.eval()
        with torch.no_grad():
            data = torch.tensor([[char_to_int[c] for c in x]]).to(device)
            logits, _ = self.forward(data)
            pred_idx = logits.argmax(dim=-1).item()
            return int_to_char.get(pred_idx, '<UNK>')

Training data

We regenerate a slightly larger training data set and convert the padded sequences to integers for input to the embedding.

Code
max_length = 5
dataX, dataY = make_training_data(num_inputs=1000, max_length=max_length)
# Squeeze the feature dimension and convert to integers as required by the embedding
X = pad_input_sequences(dataX, max_length).squeeze().int()
y = nn.functional.one_hot(torch.tensor(dataY), num_classes=VOCAB_SIZE)
dataset = AlphabetDataset(X, y)
n_samples = len(dataset)
n_test = int(n_samples * 0.1)
indices = list(range(n_samples))
# Set seed for reproducibility
random.seed(84)
random.shuffle(indices)
test_dataset = Subset(dataset, indices[:n_test])
train_dataset = Subset(dataset, indices[n_test:])
test_dataloader = DataLoader(test_dataset, batch_size=10, shuffle=False)
train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True)

The model, loss function and optimizer

Code
model = EmbeddedAlphabetRNN(vocab_size=VOCAB_SIZE)
model.to(device)
Code
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

Training

Code
epochs = 200
# Print log message every epoch_loginterval iteration
epoch_loginterval = 10
# Dictionary to store validation metrics; use same keys as Keras
metrics = {'accuracy': [], 'loss': [], 'val_accuracy': [], 'val_loss': []}
for t in range(epochs):
    if (t+1) % epoch_loginterval == 0:
        print(f"Epoch {t+1}/{epochs}\n-------------------------------")
    accuracy, loss = train_loop(train_dataloader, model, loss_fn, optimizer, device)
    metrics["accuracy"].append(accuracy)
    metrics["loss"].append(loss)
    if (t+1) % epoch_loginterval == 0:
        print(f"  Train: Loss: {loss:.4f}, Accuracy: {(100*accuracy):>0.1f}%")
    val_accuracy, val_loss = test_loop(test_dataloader, model, loss_fn, device)
    metrics["val_accuracy"].append(val_accuracy)
    metrics["val_loss"].append(val_loss)
    if (t+1) % epoch_loginterval == 0:
        print(f"  Test: Val loss: {val_loss:>8f}, Accuracy: {(100*val_accuracy):>0.1f}%\n")
print("Done!")
Code
# rnnutils.plot_loss_acc(metrics)

Prediction

Code
num_examples = 2
for i in range(num_examples):
    pattern_index = np.random.randint(len(dataX))
    X = "".join([int_to_char[int(index)] for index in dataX[pattern_index]])
    pred = model.predict(X, device)
    print(f"{X:>5s} -> {pred}")

Summary