Code
%config InlineBackend.figure_format ='retina'
%load_ext autoreload
%autoreload 2
%matplotlib inlineExecute the following code blocks to configure the session and import relevant modules.
%config InlineBackend.figure_format ='retina'
%load_ext autoreload
%autoreload 2
%matplotlib inlineimport os
import sys
import math
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequenceFor reproducibility we can set seeds:
SEED=22
torch.manual_seed(SEED)
random.seed(SEED)
np.random.seed(SEED)
torch.cuda.manual_seed_all(SEED)We add utility classes and functions for plotting and training.
from typing import Optional
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import torchmetrics
class LivePlot():
    """Plotly figure widget that is extended one data point at a time.

    The figure has a primary (left) and a secondary (right) y-axis. Each
    named series becomes one scatter trace that is created on first use.
    New points are placed at the current x position, which is advanced
    with :meth:`tick`.
    """

    def __init__(self, left_label="Loss", right_label="Accuracy"):
        """Create the widget, label both y-axes and display it.

        Args:
            left_label: title of the primary y-axis.
            right_label: title of the secondary y-axis.
        """
        figure = go.FigureWidget(
            make_subplots(specs=[[{"secondary_y": True}]])
        )
        self.fig = figure
        figure.update_yaxes(title_text=left_label, secondary_y=False)
        figure.update_yaxes(title_text=right_label, secondary_y=True)
        # Series name -> index of its trace in ``fig.data``.
        self.plot_indices = {}
        # Series name -> whether the trace uses the secondary y-axis.
        self.trace_secondary = {}
        display(self.fig)
        # [lower, upper] range of the x-axis.
        self.limits = [0, 0]
        self.current_x = 0

    def report(self, name: str, value: float, secondary_y: bool = False):
        """Append ``value`` to the series ``name`` at the current x position.

        The trace is created the first time a name is reported;
        ``secondary_y`` is only used at that point.
        """
        if name not in self.plot_indices:
            # The new trace is appended last, so its index is the current count.
            new_index = len(self.fig.data)
            self.fig.add_scatter(
                y=[], x=[], name=name,
                secondary_y=secondary_y
            )
            self.plot_indices[name] = new_index
            self.trace_secondary[name] = secondary_y
        trace = self.fig.data[self.plot_indices[name]]
        trace.y += (value,)
        trace.x += (self.current_x,)

    def increment(self, n_ticks: int):
        """Extend the upper x-axis limit by ``n_ticks``."""
        self.limits[1] += n_ticks
        self.fig.update_layout(xaxis_range=self.limits)

    def set_limit(self, n_ticks: int):
        """Set the upper x-axis limit to ``n_ticks``."""
        self.limits[1] = n_ticks
        self.fig.update_layout(xaxis_range=self.limits)

    def tick(self, n_ticks: Optional[int] = None):
        """Advance the current x position by ``n_ticks`` (1 when omitted)."""
        self.current_x += 1 if n_ticks is None else n_ticks
def _batch_loss_sum(loss_value, batch_size, criterion):
    """Return one batch's loss as a sum over its examples."""
    # Criteria with mean reduction (the PyTorch default) return a per-batch
    # average; scale it back up so that dividing the accumulated total by the
    # number of examples yields a per-example mean.
    if getattr(criterion, "reduction", "mean") == "mean":
        return loss_value * batch_size
    return loss_value


def train(*,
          model: torch.nn.Module,
          train_loader: DataLoader,
          dev_loader: DataLoader,
          optimizer: torch.optim.Optimizer,
          criterion: torch.nn.Module,
          max_epochs: int,
          metric: Optional[torchmetrics.Metric] = None,
          device: Optional[torch.device] = None,
          liveplot: Optional[LivePlot] = None):
    """Train ``model`` and evaluate it on ``dev_loader`` after every epoch.

    Args:
        model: module whose forward pass returns a ``(logits, state)`` pair;
            only the logits are used here.
        train_loader: yields ``(x_batch, y_batch)`` training batches.
        dev_loader: yields ``(x_batch, y_batch)`` evaluation batches.
        optimizer: optimizer that updates the parameters of ``model``.
        criterion: loss called as ``criterion(logits, targets)``.
        max_epochs: number of passes over ``train_loader``.
        metric: optional callable invoked as ``metric(predictions, targets)``
            on CPU tensors, where predictions are arg-max class indices.
        device: device to run on; defaults to CUDA when available, else CPU.
        liveplot: optional plot that receives the per-epoch training loss,
            development loss and (when ``metric`` is given) accuracy.

    Returns:
        None. The model is trained in place.
    """
    if device is None:
        device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
    model.to(device)

    for _epoch in range(max_epochs):
        # ---- training pass ----
        train_loss_total = 0.0
        train_examples = 0
        model.train()
        for x_batch, y_batch in train_loader:
            optimizer.zero_grad()
            x_batch = x_batch.to(device)
            y_hat, _ = model(x_batch)
            loss = criterion(y_hat, y_batch.to(device))
            loss.backward()
            optimizer.step()
            n_examples = x_batch.size(0)
            train_loss_total += _batch_loss_sum(loss.item(), n_examples, criterion)
            train_examples += n_examples

        # ---- evaluation pass ----
        dev_loss_total = 0.0
        dev_examples = 0
        dev_metric_total = 0.0
        dev_batches = 0
        model.eval()
        with torch.no_grad():
            for x_batch, y_batch in dev_loader:
                x_batch = x_batch.to(device)
                y_hat, _ = model(x_batch)
                n_examples = x_batch.size(0)
                dev_loss = criterion(y_hat, y_batch.to(device)).item()
                dev_loss_total += _batch_loss_sum(dev_loss, n_examples, criterion)
                dev_examples += n_examples
                dev_batches += 1
                if metric is not None:
                    predictions = torch.argmax(y_hat, -1).cpu()
                    # Store a plain float so the plot never receives a tensor.
                    dev_metric_total += float(metric(predictions, y_batch.cpu()))

        if liveplot is not None:
            liveplot.tick()  # Update the liveplot time
            # Skip a series when its loader produced no data instead of
            # dividing by zero.
            if train_examples:
                liveplot.report("Training loss", train_loss_total / train_examples)
            if dev_examples:
                liveplot.report("Development loss", dev_loss_total / dev_examples)
                if metric is not None:
                    # Mean of the per-batch metric values.
                    liveplot.report("Development accuracy", dev_metric_total / dev_batches, secondary_y=True)


# In this lab the aim is to predict a character in the alphabet given a short
# subsequence. Basically, the network learns to output the probability
# distribution of a character conditional on a sequence of input characters.
# Since the state space is discrete, you need to think about what output
# activation and loss function to use.
To help you along the way, some of the steps have been prepared in advance, but in most cases, your task is to complete missing code. Don’t hesitate to change parameter settings and experiment with the model architectures.
In this section we will model input sequences as floating point values in the range [0.0, 1.0] and feed them directly to an LSTM layer.
We will work with the English alphabet, which consists of 26 characters (states). The predictions will be based on alphabet substrings: given the input “CDE” the model should output “F”, given “STUV” it should output “W”, and so on.
alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"Since a neural network cannot deal directly with characters, we map each individual letter to an integer (integer encoding), where A maps to 1, B to 2, and so on. We reserve 0 to be a padding index, whose use will become clear further down, and set the vocabulary size to the number of letters in the alphabet plus the padding index.
char_to_int = dict((c, i + 1) for i, c in enumerate(alphabet))
int_to_char = dict((i + 1, c) for i, c in enumerate(alphabet))
PAD_IDX = 0
int_to_char[PAD_IDX] = '<PAD>'
VOCAB_SIZE = len(alphabet) + 1Training data will be generated by selecting n-tuple (n<=6) slices from the alphabet, where the output will be the last character and the input the preceding characters of a slice. The following function generates training data.
def make_training_data(num_inputs=200, max_length=5):
    """Generate (input sequence, next character) pairs from the alphabet.

    Each sample is a random contiguous slice of ``alphabet``. The input is a
    run of between 2 and ``max_length`` consecutive letters and the target is
    the letter that immediately follows it, so every slice (input plus
    target) spans between 3 and ``max_length + 1`` letters.

    Args:
        num_inputs (int): number of training samples to generate
        max_length (int): maximum length of an input sequence; must be >= 2

    Returns:
        tuple: ``(dataX, dataY)`` where ``dataX`` is a list of 1-D float32
        tensors holding the integer codes of the input letters and ``dataY``
        is a list with the integer code of each target letter.

    Raises:
        ValueError: if ``max_length`` is smaller than 2.
    """
    if max_length < 2:
        # np.random.randint(1, max_length) needs a non-empty range.
        raise ValueError("max_length must be at least 2")

    dataX = []
    dataY = []
    for _ in range(num_inputs):
        # Leave room after the start for at least two input letters and a target.
        start = np.random.randint(len(alphabet) - 3)
        # Cap the end index so that alphabet[end + 1] always exists.
        end = min(start + np.random.randint(1, max_length), len(alphabet) - 2)
        sequence_in = alphabet[start:end + 1]
        sequence_out = alphabet[end + 1]
        dataX.append(torch.tensor([char_to_int[char] for char in sequence_in], dtype=torch.float32))
        dataY.append(char_to_int[sequence_out])
    return dataX, dataY


max_length = 5
dataX, dataY = make_training_data(max_length=max_length)Take a minute to inspect the dataX inputs. As you will see, the length of different entries differ. Prior to training, we need to pad input sequences shorter than five characters. Can you think of why this is necessary?
def pad_input_sequences(data, max_length):
    """Left-pad sequences with PAD_IDX to exactly ``max_length`` steps.

    Args:
        data: list of 1-D tensors of varying length.
        max_length (int): number of time steps every padded sequence gets.

    Returns:
        Tensor of shape ``[samples, max_length, 1]``.

    Raises:
        ValueError: if any sequence is longer than ``max_length``.
    """
    padded = pad_sequence(
        data,
        batch_first=True,
        padding_value=PAD_IDX,
        padding_side='left'
    )
    # pad_sequence only pads up to the longest sequence present, which can be
    # shorter than max_length; top up on the left so the width is always fixed.
    missing = max_length - padded.shape[1]
    if missing < 0:
        raise ValueError(
            f"sequence of length {padded.shape[1]} exceeds max_length={max_length}"
        )
    if missing > 0:
        padded = F.pad(padded, (missing, 0), value=PAD_IDX)
    # reshape to [samples, seq_len, features]
    return padded.reshape(padded.shape[0], max_length, 1)
X = pad_input_sequences(dataX, max_length)
# one hot encode the output variable
y = nn.functional.one_hot(torch.tensor(dataY))We now have a tensor of features X and a tensor of labels y for training. We follow the pytorch idiom of constructing a custom Dataset class to hold features and labels, and initialize a dataset object that holds all data.
class AlphabetDataset(Dataset):
    """Dataset pairing a feature tensor with one-hot encoded labels.

    Indexing returns ``(features, class_index)``; the one-hot label row is
    collapsed to a class index on access.
    """

    def __init__(self, X, y):
        # Features and labels, both indexed along their first dimension.
        self.X, self.y = X, y

    def __getitem__(self, idx):
        # Pytorch gotcha: one-hot encoded y needs to be converted to
        # class indices
        return self.X[idx], self.y[idx].argmax()

    def __len__(self):
        return len(self.X)


dataset = AlphabetDataset(X, y)

# For evaluation purposes, we also want to split the data into a training and
# test dataset. This can be done based on the indices of the data, but make
# sure to shuffle before splitting to ensure independent and representative
# datasets. Let’s set aside 10% of the data for testing using the following
# code:
n_samples = len(dataset)
n_test = int(n_samples * 0.1)
indices = list(range(n_samples))
random.shuffle(indices)
test_dataset = Subset(dataset, indices[:n_test])
train_dataset = Subset(dataset, indices[n_test:])
test_dataloader = DataLoader(test_dataset, batch_size=10, shuffle=False)
train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True)We now define the neural network model by subclassing nn.Module, where the layers are initialized in __init__. The forward function implements an operation on the input data and feeds it forward to the next layer. See the pytorch LSTM documentation for information on more parameter settings.
First see if you can complete the model below to include an lstm layer, a tanh optimizatio
class AlphabetRNN(nn.Module):
    """Exercise skeleton of an LSTM next-character classifier.

    The ``...`` placeholders are intentionally left for the reader to fill
    in; the class is not runnable until they are replaced.
    """

    def __init__(self, input_size, hidden_size=..., vocab_size=..., num_layers=1, dropout=0.0):
        super().__init__()
        # Number of classes; also used to scale indices in indices_to_floats.
        self.num_classes = ...
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )
        # Output layer producing the logits (to be completed).
        self.classifier = nn...

    def indices_to_floats(self, x):
        """Scale integer indices by ``1 / (num_classes - 1)``."""
        floats = x.float() / (self.num_classes - 1)
        return floats # (batch, seq_len, 1)

    def forward(self, x, hidden=None):
        """Return ``(logits, hidden)`` for a batch of index sequences."""
        x = self.indices_to_floats(x)
        lstm_out, (hidden, cell) = self.lstm(x, hidden)
        # Pick the hidden state that is fed to the classifier (to be completed)
        hidden = ...
        # Pass through dense layer and get logits
        logits = self.classifier(...)
        return logits, hidden

    def predict(self, x, device="cpu"):
        """Predict the next character for the alphabet substring ``x``.

        Returns the character mapped to the arg-max class index, or
        ``'<UNK>'`` when that index is not in ``int_to_char``.
        """
        self.eval()
        with torch.no_grad():
            x = torch.tensor([[char_to_int[c] for c in x]]).unsqueeze(-1).to(device)
            logits, _ = self.forward(x)
            probs = F.log_softmax(logits, dim=-1) # log-probabilities; only their arg-max is used
            pred_idx = probs.argmax(dim=-1).item()
            return int_to_char.get(pred_idx, '<UNK>')
# solution_hidden: Suggested AlphabetRNN class
class AlphabetRNN(nn.Module):
    """LSTM classifier that predicts the next character of a sequence.

    Character indices are scaled to floats and fed to an LSTM one scalar
    feature per time step; the final hidden state of the last LSTM layer is
    mapped to ``vocab_size`` logits by a linear layer.
    """

    def __init__(self, input_size, hidden_size=8, vocab_size=VOCAB_SIZE, num_layers=1, dropout=0.0):
        """Build the LSTM and the linear classifier.

        Args:
            input_size: number of features per time step.
            hidden_size: size of the LSTM hidden state.
            vocab_size: number of output classes (including padding).
            num_layers: number of stacked LSTM layers.
            dropout: dropout passed to ``nn.LSTM``.
        """
        super().__init__()
        self.num_classes = vocab_size
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout
        )
        self.classifier = nn.Linear(hidden_size, vocab_size)

    def indices_to_floats(self, x):
        """Convert input indices to floats in range [0, 1]"""
        return x.float() / (self.num_classes - 1)  # (batch, seq_len, 1)

    def forward(self, x, hidden=None):
        """Return ``(logits, last_hidden)`` for a batch of index sequences.

        ``last_hidden`` is the final hidden state of the last LSTM layer,
        not the ``(h, c)`` state tuple, so it cannot be passed back in as
        ``hidden``.
        """
        x = self.indices_to_floats(x)
        _, (h_n, _) = self.lstm(x, hidden)
        # h_n stacks the final hidden state of every layer; keep the last layer.
        last_hidden = h_n[-1]
        # Pass through dense layer and get logits
        logits = self.classifier(last_hidden)
        return logits, last_hidden

    def predict(self, x, device="cpu"):
        """Predict the next character for the alphabet substring ``x``.

        Args:
            x (str): an alphabet substring
            device: device the input tensor is moved to.

        Returns:
            The character mapped to the arg-max class index, or ``'<UNK>'``
            when that index is not in ``int_to_char``.
        """
        self.eval()
        with torch.no_grad():
            indices = torch.tensor([[char_to_int[c] for c in x]]).unsqueeze(-1).to(device)
            logits, _ = self(indices)
            # Softmax is monotonic, so the arg-max of the logits is already
            # the most probable class.
            pred_idx = logits.argmax(dim=-1).item()
        return int_to_char.get(pred_idx, '<UNK>')


# Finally, we initialize the model.
model = AlphabetRNN(input_size=1, hidden_size=8)Next, we choose what loss function (criterion) to use. Modify the code block below to choose a loss function that is suitable for the model. See https://docs.pytorch.org/docs/stable/nn.html#loss-functions for a complete list of PyTorch loss functions.
# Modify LossFunction below to an appropriate class
criterion = nn.LossFunction()# solution_hidden: Suggested loss function (criterion)
#
# Since we have a classification problem with 26 classes and the model
# outputs logits the natural choice is the
# [CrossEntropyLoss](https://docs.pytorch.org/docs/stable/generated/torch.nn.modules.loss.CrossEntropyLoss.html)
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)The last component we need is the optimizer. As usual, there is a large number of choices. What would be the appropriate choice in this case?
# Modify Optimizer below to an appropriate choice
learning_rate = 0.001
optimizer = torch.optim.Optimizer(model.parameters(), lr=learning_rate)# solution_hidden: Suggested optimizer
#
# Since RNNs are prone to gradient problems, the Adam optimizer is an
# appropriate choice as it uses per-parameter learning rates based on
# gradient history
learning_rate = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)With loss and optimizer in place, we can train the network.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using {device} device")
epochs = 400
accuracy = torchmetrics.Accuracy(task='multiclass', num_classes=VOCAB_SIZE, top_k=1)
liveplot = LivePlot()
liveplot.fig.update_layout(width=1200, height=800, font_size=18)
liveplot.increment(epochs)
train(model=model,
train_loader=train_dataloader,
dev_loader=test_dataloader,
optimizer=optimizer,
criterion=criterion,
metric=accuracy,
max_epochs=epochs,
liveplot=liveplot,
device=device)Finally, to test some predictions, you can select an entry from the input data and run model.predict. Briefly, the code will select input sequences from the training data and the model will output predictions based on an input. If you increase the number of examples you will probably see cases where the predictions are wrong.
Now we let the model make predictions on randomly selected inputs.
num_examples = 2
for i in range(num_examples):
pattern_index = np.random.randint(len(dataX))
X = "".join([int_to_char[int(index)] for index in dataX[pattern_index]])
pred = model.predict(X, device)
print(f"{X:>5s} -> {pred}")If you only ran 200 epochs, the training and validation accuracies are still pretty low, and it is likely that the predictions are substantially off. Try increasing the number of epochs to see if things improve.
Our model treats each character (class) as an index, where every class is independent. We can improve the model by embedding the characters in a lower-dimensional space, where similar character sequences get similar embeddings, thereby capturing semantic similarities.
The main difference between the embedded model and the previous model is the addition of an embedding layer which is placed before the LSTM layer. The embedding layer needs to know the size of the vocabulary, which here is VOCAB_SIZE, the number of alphabet characters plus the padding index. The input type to the embedding layer must be Long or Int, and since the output consists of floats, it can be directly fed to the LSTM layer.
class EmbeddedAlphabetRNN(nn.Module):
    """Next-character classifier: embedding -> LSTM -> linear layer.

    Integer character indices are embedded, run through an LSTM, and the
    LSTM output at the last time step is mapped to ``vocab_size`` logits.
    """

    def __init__(self, vocab_size, embedding_dim=16, hidden_dim=32, num_layers=2, dropout=0.3):
        """Build the embedding, LSTM and classifier layers.

        Args:
            vocab_size: number of distinct indices, including padding.
            embedding_dim: size of each embedding vector.
            hidden_dim: size of the LSTM hidden state.
            num_layers: number of stacked LSTM layers.
            dropout: dropout between stacked LSTM layers; ignored when
                ``num_layers`` is 1.
        """
        super().__init__()
        self.embedding = nn.Embedding(
            vocab_size,
            embedding_dim,
            padding_idx=PAD_IDX
        )
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            # nn.LSTM applies dropout only between layers and warns when it
            # is non-zero for a single layer.
            dropout=dropout if num_layers > 1 else 0.0,
            bidirectional=False
        )
        self.classifier = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden=None):
        """Return ``(logits, hidden)`` where ``hidden`` is the LSTM state."""
        # x shape: (batch, seq_len)
        embedded = self.embedding(x)  # (batch, seq_len, embedding_dim)
        lstm_out, hidden = self.lstm(embedded, hidden)
        last_output = lstm_out[:, -1, :]  # (batch, hidden_dim)
        logits = self.classifier(last_output)  # (batch, vocab_size)
        return logits, hidden

    def predict(self, x, device="cpu"):
        """Predict the next character from an input sequence string

        Args:
            x (str): an alphabet substring
            device: device the input tensor is moved to.

        Returns:
            The character mapped to the arg-max class index, or ``'<UNK>'``
            when that index is not in ``int_to_char``.
        """
        self.eval()
        with torch.no_grad():
            data = torch.tensor([[char_to_int[c] for c in x]]).to(device)
            logits, _ = self(data)
            pred_idx = logits.argmax(dim=-1).item()
        return int_to_char.get(pred_idx, '<UNK>')


# We regenerate a slightly larger training data set and convert the padded
# sequences to integers for input to the embedding.
max_length = 5
dataX, dataY = make_training_data(num_inputs=1000, max_length=max_length)
# Reshape vectors and convert to integers which is required by embedding
X = pad_input_sequences(dataX, max_length).squeeze().int()
y = nn.functional.one_hot(torch.tensor(dataY))
dataset = AlphabetDataset(X, y)
n_samples = len(dataset)
n_test = int(n_samples * 0.1)
indices = list(range(n_samples))
# Set seed for reproducibility
random.seed(84)
random.shuffle(indices)
test_dataset = Subset(dataset, indices[:n_test])
train_dataset = Subset(dataset, indices[n_test:])
test_dataloader = DataLoader(test_dataset, batch_size=10, shuffle=False)
train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True)model = EmbeddedAlphabetRNN(vocab_size=VOCAB_SIZE)
model.to(device)criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using {device} device")
epochs = 20
accuracy = torchmetrics.Accuracy(task='multiclass', num_classes=VOCAB_SIZE, top_k=1)
liveplot = LivePlot()
liveplot.fig.update_layout(width=1200, height=800, font_size=18)
liveplot.increment(epochs)
train(model=model,
train_loader=train_dataloader,
dev_loader=test_dataloader,
optimizer=optimizer,
criterion=criterion,
metric=accuracy,
max_epochs=epochs,
liveplot=liveplot,
device=device)num_examples = 2
for i in range(num_examples):
pattern_index = np.random.randint(len(dataX))
X = "".join([int_to_char[int(index)] for index in dataX[pattern_index]])
pred = model.predict(X, device)
print(f"{X:>5s} -> {pred}")This lab has shown you how to model character sequence data, such as you would find in protein or DNA sequences. The model performance was greatly improved by embedding characters in a lower-dimensional space.