ScaDaMaLe Course site and book

Prediction with time series model - Gaussian Processes

This notebook contains time series prediction with Gaussian processes. The data used for prediction is new cases (smoothed) and new deaths (smoothed), both for an aggregated number of countries in the world and for Sweden. To implement the Gaussian process model, the Python package GPyTorch is used.

  1. Install, import, load and preprocess data

Install and import the required packages, and execute the other relevant notebooks to load and preprocess the data.

pip install gpytorch
Python interpreter will be restarted.
Collecting gpytorch
  Downloading gpytorch-1.3.0.tar.gz (283 kB)
Building wheels for collected packages: gpytorch
  Building wheel for gpytorch (setup.py): started
  Building wheel for gpytorch (setup.py): finished with status 'done'
  Created wheel for gpytorch: filename=gpytorch-1.3.0-py2.py3-none-any.whl size=473796 sha256=5882e250a68a9042a1e51e11617837c2e922878bd22e515cf9459b217c96ba2b
  Stored in directory: /root/.cache/pip/wheels/1d/f0/2c/2146864c1f7bd8a844c4143115c05c392da763fd8b249adb9d
Successfully built gpytorch
Installing collected packages: gpytorch
Successfully installed gpytorch-1.3.0
Python interpreter will be restarted.
# python imports
import gpytorch as gpth
import torch as th
import matplotlib.pyplot as plt
import numpy as np
"./02_DataPreprocess"
  2. Additional data preprocessing in Scala

// define a dataframe summing up the new cases smoothed for each date
val df_ncworld = df_cleaned_time_series
  .groupBy("date")
  .sum("new_cases_smoothed")
  .sort(col("date"))
  .withColumnRenamed("sum(new_cases_smoothed)", "new_cases_smoothed")
display(df_ncworld)

// define a dataframe summing up the new deaths smoothed for each date
val df_ndworld = df_cleaned_time_series
  .groupBy("date")
  .sum("new_deaths_smoothed")
  .sort(col("date"))
  .withColumnRenamed("sum(new_deaths_smoothed)", "new_deaths_smoothed")
display(df_ndworld)

// Add a time index for the date
import org.apache.spark.sql.expressions.Window
val window_spec = Window.orderBy($"date")
val df_ncworld_indexed = df_ncworld.withColumn("time_idx", row_number.over(window_spec))
val df_ndworld_indexed = df_ndworld.withColumn("time_idx", row_number.over(window_spec))
display(df_ncworld_indexed)
// Get max and min of time index
import org.apache.spark.sql.functions.{min, max}
import org.apache.spark.sql.Row
val id_maxmin = df_ncworld_indexed.agg(max("time_idx"), min("time_idx")).head()
val id_max: Int = id_maxmin.getInt(0)
val id_min: Int = id_maxmin.getInt(1)
import org.apache.spark.sql.functions.{min, max}
import org.apache.spark.sql.Row
id_maxmin: org.apache.spark.sql.Row = [316,1]
id_max: Int = 316
id_min: Int = 1

Extract a window for prediction

// define training and test data intervals. Test data is set to 10% of the total dataset time length.
val test_wnd: Int = (0.1 * id_max).toInt
val train_wnd: Int = (0.9 * id_max).toInt
val df_ncworld_train = df_ncworld_indexed.where($"time_idx" > id_max - train_wnd - test_wnd && $"time_idx" <= id_max - test_wnd)
val df_ncworld_test = df_ncworld_indexed.where($"time_idx" > id_max - test_wnd && $"time_idx" <= id_max)
val df_ndworld_train = df_ndworld_indexed.where($"time_idx" > id_max - train_wnd - test_wnd && $"time_idx" <= id_max - test_wnd)
val df_ndworld_test = df_ndworld_indexed.where($"time_idx" > id_max - test_wnd && $"time_idx" <= id_max)
display(df_ncworld_test)
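With id_max = 316 from the previous cell, this gives test_wnd = 31 and train_wnd = 284: the training set covers time_idx 2 to 285 and the test set time_idx 286 to 316, i.e. roughly the last month of data is held out.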

Convert to Python for further processing

df_ncworld_train.createOrReplaceTempView("df_ncworld_train")
df_ncworld_test.createOrReplaceTempView("df_ncworld_test")
df_ndworld_train.createOrReplaceTempView("df_ndworld_train")
df_ndworld_test.createOrReplaceTempView("df_ndworld_test")
df_ncworld_train = spark.table("df_ncworld_train")
df_ncworld_test = spark.table("df_ncworld_test")
df_ndworld_train = spark.table("df_ndworld_train")
df_ndworld_test = spark.table("df_ndworld_test")
val df_ncdenswe = df_cleaned_time_series.select($"location", $"date", $"new_cases_smoothed").where(expr("location = 'Sweden' or location = 'Denmark'"))
val df_nddenswe = df_cleaned_time_series.select($"location", $"date", $"new_deaths_smoothed").where(expr("location = 'Sweden' or location = 'Denmark'"))
df_ncdenswe: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [location: string, date: string ... 1 more field]
df_nddenswe: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [location: string, date: string ... 1 more field]
// Add a time index for the date, per location
import org.apache.spark.sql.expressions.Window
val window_spec = Window.partitionBy("location").orderBy($"date")
val df_ncdenswe_indexed = df_ncdenswe.withColumn("time_idx", row_number.over(window_spec))
display(df_ncdenswe_indexed)
val df_nddenswe_indexed = df_nddenswe.withColumn("time_idx", row_number.over(window_spec))
display(df_nddenswe_indexed)
val test_wnd: Int = (0.1 * id_max).toInt
val train_wnd: Int = (0.9 * id_max).toInt
val df_ncdenswe_train = df_ncdenswe_indexed.where($"time_idx" > id_max - train_wnd - test_wnd && $"time_idx" <= id_max - test_wnd)
val df_ncdenswe_test = df_ncdenswe_indexed.where($"time_idx" > id_max - test_wnd && $"time_idx" <= id_max)
val df_nddenswe_train = df_nddenswe_indexed.where($"time_idx" > id_max - train_wnd - test_wnd && $"time_idx" <= id_max - test_wnd)
val df_nddenswe_test = df_nddenswe_indexed.where($"time_idx" > id_max - test_wnd && $"time_idx" <= id_max)
display(df_ncdenswe_test)
df_ncdenswe_train.createOrReplaceTempView("df_ncdenswe_train")
df_ncdenswe_test.createOrReplaceTempView("df_ncdenswe_test")
df_nddenswe_train.createOrReplaceTempView("df_nddenswe_train")
df_nddenswe_test.createOrReplaceTempView("df_nddenswe_test")
df_ncdenswe_train = spark.table("df_ncdenswe_train")
df_ncdenswe_test = spark.table("df_ncdenswe_test")
df_nddenswe_train = spark.table("df_nddenswe_train")
df_nddenswe_test = spark.table("df_nddenswe_test")
  3. Time series prediction with Gaussian processes

In this section we perform predictions based on the input data, with some additional preprocessing done in Python. The transition from Scala to Python is motivated by the use of the Python package GPyTorch for implementing the Gaussian process model.

3.1 World multistep prediction

As similar operations are performed when processing the data, a class is first defined to enable code reuse.

from pyspark.sql.functions import col
import matplotlib.pyplot as plt

class GPDataSet():
    def __init__(self, df_train, df_test, datacol, filterloc=None, add_input=None):
        """
        class for processing input data to the GP. As similar code is reused, this class enables some code reuse.
        param: 'df_train', training data dataframe
        param: 'df_test', test data dataframe
        param: 'datacol', data column in dataframe to perform predictions on, e.g. 'new_cases_smoothed'
        param: 'filterloc', location column in dataframe to perform predictions on, e.g. 'Sweden'
        param: 'add_input', additional location column in dataframe to use as input for predictions, e.g. 'Denmark'
        """
        self.df_train = df_train
        self.df_test = df_test
        self.datacol = datacol
        self.filterloc = filterloc
        self.add_input = add_input
        self.num_xdim = None

    def convert_to_numpy(self):
        """ convert dataframe to numpy arrays. This process may take a while. """
        # if no filter for location is specified
        if self.filterloc is None:
            x_train_np = np.array(self.df_train.orderBy("time_idx").select("time_idx").rdd.map(lambda x: x[0]).collect())
            x_test_np = np.array(self.df_test.orderBy("time_idx").select("time_idx").rdd.map(lambda x: x[0]).collect())
            y_train_np = np.array(self.df_train.orderBy("time_idx").select(self.datacol).rdd.map(lambda x: x[0]).collect())
            y_test_np = np.array(self.df_test.orderBy("time_idx").select(self.datacol).rdd.map(lambda x: x[0]).collect())
            num_xdim = 1
        # if a filter for location is specified
        else:
            if self.add_input is None:
                x_train_np = np.array(self.df_train.filter(col("location") == self.filterloc).orderBy("time_idx").select("time_idx").rdd.map(lambda x: x[0]).collect())
                x_test_np = np.array(self.df_test.filter(col("location") == self.filterloc).orderBy("time_idx").select("time_idx").rdd.map(lambda x: x[0]).collect())
                num_xdim = 1
            # if prediction should add additional input from e.g. a neighbouring country
            else:
                x_train_time = np.array(self.df_train.filter(col("location") == self.filterloc).orderBy("time_idx").select("time_idx").rdd.map(lambda x: x[0]).collect())
                x_test_time = np.array(self.df_test.filter(col("location") == self.filterloc).orderBy("time_idx").select("time_idx").rdd.map(lambda x: x[0]).collect())
                x_train_add = np.array(self.df_train.filter(col("location") == self.add_input).orderBy("time_idx").select(self.datacol).rdd.map(lambda x: x[0]).collect())
                x_test_add = np.array(self.df_test.filter(col("location") == self.add_input).orderBy("time_idx").select(self.datacol).rdd.map(lambda x: x[0]).collect())
                x_train = np.stack((x_train_time, x_train_add), axis=0)
                x_test = np.stack((x_test_time, x_test_add), axis=0)
                x_train_np = np.moveaxis(x_train, 1, 0)
                x_test_np = np.moveaxis(x_test, 1, 0)
                num_xdim = 2
            # output data
            y_train_np = np.array(self.df_train.filter(col("location") == self.filterloc).orderBy("time_idx").select(self.datacol).rdd.map(lambda x: x[0]).collect())
            y_test_np = np.array(self.df_test.filter(col("location") == self.filterloc).orderBy("time_idx").select(self.datacol).rdd.map(lambda x: x[0]).collect())
        self.x_train_np = x_train_np
        self.x_test_np = x_test_np
        self.y_train_np = y_train_np
        self.y_test_np = y_test_np
        self.num_xdim = num_xdim

    def plot_numpy_data(self):
        """ plot numpy arrays """
        if self.num_xdim == 2:
            fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))
            ax1.plot(self.x_train_np[:, 0], self.y_train_np, 'k*')
            ax1.legend(['train data'])
            ax1.set_xlabel('time [days]')
            ax1.set_ylabel('output')
            ax1.set_title('training data')
            ax1.grid()
            ax2.plot(self.x_train_np[:, 0], self.x_train_np[:, 1], 'k*')
            ax2.legend(['train data'])
            ax2.set_xlabel('time [days]')
            ax2.set_ylabel('additional input')
            ax2.set_title('training data')
            ax2.grid()
        else:
            fig, ax = plt.subplots(1, 1, figsize=(12, 6))
            ax.plot(self.x_train_np, self.y_train_np, 'k*')
            ax.legend(['train data'])
            ax.set_xlabel('time [days]')
            ax.set_ylabel('output')
            ax.set_title('training data')
            ax.grid()

    def get_train_length(self):
        if self.num_xdim == 2:
            return len(self.x_train_np[:, 0])
        else:
            return len(self.x_train_np)

    def process_numpy_data(self, nth_subsample=4, window_red=0.8):
        """ reduction of data by subsampling and reducing the length of the data window. """
        assert window_red > 0 and window_red <= 1, "please adjust 'window_red' parameter to be between 0 and 1"
        start_idx = int(self.get_train_length() * window_red)
        self.x_train = th.tensor(self.x_train_np[start_idx::nth_subsample], dtype=th.float)
        self.x_test = th.tensor(self.x_test_np, dtype=th.float)
        self.y_train = th.tensor(self.y_train_np[start_idx::nth_subsample], dtype=th.float)
        self.y_test = th.tensor(self.y_test_np, dtype=th.float)
        self.normalize()

    def set_time_to_zero(self):
        """ sets the time vector to start at time zero """
        if self.num_xdim == 2:
            self.x_train_min = self.x_train[:, 0].min()
            self.x_train[:, 0] = self.x_train[:, 0] - self.x_train_min
            self.x_test[:, 0] = self.x_test[:, 0] - self.x_train_min
        else:
            self.x_train_min = self.x_train.min()
            self.x_train = self.x_train - self.x_train_min
            self.x_test = self.x_test - self.x_train_min

    def normalize(self):
        """ normalize the data to improve predictions """
        self.set_time_to_zero()
        self.x_train_mean = self.x_train.mean()
        self.x_train_std = self.x_train.std()
        self.x_train = (self.x_train - self.x_train_mean) / self.x_train_std
        self.x_test = (self.x_test - self.x_train_mean) / self.x_train_std
        self.y_train_mean = self.y_train.mean()
        self.y_train_std = self.y_train.std()
        self.y_train = (self.y_train - self.y_train_mean) / self.y_train_std
        self.y_test = (self.y_test - self.y_train_mean) / self.y_train_std
        self.data_normalized = True

    def plot_reduced_data(self):
        """ plots the reduced training data """
        with th.no_grad():
            if self.num_xdim == 2:
                fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))
                ax1.plot(self.x_train[:, 0], self.y_train, 'k*')
                ax1.legend(['train data'])
                ax1.set_xlabel('time [days]')
                ax1.set_ylabel('output')
                ax1.set_title('training data')
                ax1.grid()
                ax2.plot(self.x_train[:, 0], self.x_train[:, 1], 'k*')
                ax2.legend(['train data'])
                ax2.set_xlabel('time [days]')
                ax2.set_ylabel('additional input')
                ax2.set_title('training data')
                ax2.grid()
            else:
                fig, ax = plt.subplots(1, 1, figsize=(12, 6))
                ax.plot(self.x_train, self.y_train, 'k*')
                ax.legend(['train data'])
                ax.set_xlabel('time [days]')
                ax.set_ylabel('output')
                ax.set_title('training data')
                ax.grid()

Use the class to convert the dataframes to numpy arrays for further processing. Note that the conversion may take a while.

ds_ncworld = GPDataSet(df_ncworld_train, df_ncworld_test, datacol='new_cases_smoothed', filterloc=None, add_input=None)
ds_ndworld = GPDataSet(df_ndworld_train, df_ndworld_test, datacol='new_deaths_smoothed', filterloc=None, add_input=None)
ds_ncworld.convert_to_numpy()
ds_ndworld.convert_to_numpy()

Plot

ds_ncworld.plot_numpy_data()

ds_ndworld.plot_numpy_data()

Process the data by subsampling, reducing the data window, and normalizing. The Gaussian process model is a so-called nonparametric model and is mainly based on the data points themselves. To reduce the computation and the complexity of the model, we therefore subsample and reduce the number of data points.

ds_ncworld.process_numpy_data(nth_subsample=4, window_red=0.8)
ds_ndworld.process_numpy_data(nth_subsample=4, window_red=0.8)
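As a rough back-of-the-envelope check (a sketch, assuming the ~284-day world training window derived earlier), these settings keep only a small tail of the series:

# hypothetical check of the reduction, not part of the pipeline
n_train = 284                               # approximate length of the world training window
start_idx = int(n_train * 0.8)              # window_red = 0.8 -> drop the first 80% of the window
n_kept = len(range(start_idx, n_train, 4))  # nth_subsample = 4 -> every 4th remaining day
print(n_kept)                               # about 15 points actually enter the GP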

Plot processed data

ds_ncworld.plot_reduced_data()

ds_ndworld.plot_reduced_data()

Define Gaussian process model classes using GPyTorch with different kernels.

import gpytorch as gpth

class GPLinearRBF(gpth.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super(GPLinearRBF, self).__init__(train_x, train_y, likelihood)
        self.mean_module = gpth.means.ConstantMean()
        self.covar_module = gpth.kernels.ScaleKernel(gpth.kernels.LinearKernel() + gpth.kernels.RBFKernel())

    def forward(self, x):
        x_mean = self.mean_module(x)
        x_covar = self.covar_module(x)
        return gpth.distributions.MultivariateNormal(x_mean, x_covar)

class GPLinearMatern(gpth.models.ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super(GPLinearMatern, self).__init__(train_x, train_y, likelihood)
        self.mean_module = gpth.means.ConstantMean()
        self.covar_module = gpth.kernels.ScaleKernel(gpth.kernels.LinearKernel() + gpth.kernels.MaternKernel())

    def forward(self, x):
        x_mean = self.mean_module(x)
        x_covar = self.covar_module(x)
        return gpth.distributions.MultivariateNormal(x_mean, x_covar)
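For reference, the combined covariance used above is a scaled sum of a linear and an RBF kernel. Assuming GPyTorch's standard parameterization, it reads

$$k(x, x') = \sigma^2 \left( v \, x^\top x' + \exp\left(-\frac{\lVert x - x' \rVert^2}{2\ell^2}\right) \right),$$

where the outputscale $\sigma^2$, the linear variance $v$ and the lengthscale $\ell$ are fitted to the training data. The linear term captures the overall trend, while the RBF term makes nearby days more similar than distant ones.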

Define a training class for the Gaussian process models.

import matplotlib.pyplot as plt
import math
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error

class GPTrainer():
    def __init__(self, gp_model, x_train, x_train_min, x_train_mean, x_train_std,
                 x_test, y_train, y_test, y_train_mean, y_train_std,
                 device='cpu', train_iter=300, lr=0.1, verbose=True):
        """
        class to manage training and prediction of data
        param: 'gp_model', name of gaussian process model including kernel to use
        param: 'x_train', pytorch tensor (sequence, dim), normalized input training data, starting at time zero
        param: 'x_train_min', pytorch tensor, start time of input training data
        param: 'x_train_mean', pytorch tensor, mean used when normalizing input training data
        param: 'x_train_std', pytorch tensor, std deviation used when normalizing input training data
        param: 'x_test', pytorch tensor, normalized input test data, starting at time zero
        param: 'y_train', pytorch tensor, normalized output training data
        param: 'y_train_mean', pytorch tensor, mean used when normalizing output training data
        param: 'y_train_std', pytorch tensor, std deviation used when normalizing output training data
        param: 'y_test', pytorch tensor, normalized output test data
        param: 'device', cpu or cuda. currently only tested for cpu.
        param: 'train_iter', number of training iterations to fit kernel parameters to data
        param: 'lr', learning rate
        param: 'verbose', print information such as loss during training
        """
        # data
        self.x_train = x_train.to(device)
        self.x_train_min = x_train_min
        self.x_train_mean = x_train_mean
        self.x_train_std = x_train_std
        self.x_test = x_test.to(device)
        self.x_cat = th.cat((x_train, x_test), dim=0).to(device)
        self.y_train = y_train.to(device)
        self.y_train_mean = y_train_mean
        self.y_train_std = y_train_std
        self.y_test = y_test.to(device)
        self.preds = None
        # define GP likelihood
        self.likelihood = gpth.likelihoods.GaussianLikelihood()
        # GP model selection and init
        assert gp_model in ('GPLinearRBF', 'GPLinearMatern'), "Error: GP model selected is not defined"
        if gp_model == 'GPLinearRBF':
            self.model = GPLinearRBF(self.x_train, self.y_train, self.likelihood).to(device)
        if gp_model == 'GPLinearMatern':
            self.model = GPLinearMatern(self.x_train, self.y_train, self.likelihood).to(device)
        # training param
        self.train_iter = train_iter
        self.lr = lr
        self.device = device
        self.optimizer = th.optim.Adam(self.model.parameters(), lr=self.lr)
        self.loss_fn = gpth.mlls.ExactMarginalLogLikelihood(self.likelihood, self.model)
        self.verbose = verbose
        # plots
        self.fig = None
        self.ax = None

    def train(self):
        """ training of gaussian process model to fit kernel parameters to data """
        self.model.train()
        self.likelihood.train()
        for iter_idx in range(1, self.train_iter + 1):
            self.optimizer.zero_grad()
            out = self.model(self.x_train)
            loss = -self.loss_fn(out, self.y_train).mean()
            loss.backward()
            self.optimizer.step()
            if iter_idx % 10 == 0 and self.verbose is True:
                print(f"Iter: {iter_idx}, train_loss: {loss.item()}")

    def prediction(self):
        """ predict data """
        self.model.eval()
        self.likelihood.eval()
        with th.no_grad():  # , gpth.settings.fast_pred_var():
            self.preds = self.likelihood(self.model(self.x_cat))

    def denormalize_y(self, data):
        """ denormalize the output data """
        return data * self.y_train_std + self.y_train_mean

    def denormalize_x(self, data):
        """ denormalize the input data """
        return data * self.x_train_std + self.x_train_mean

    def plot(self):
        """ plot the data """
        with th.no_grad():
            # extract time index dimension
            xdim = None
            try:
                _, xdim = self.x_train.shape
            except:
                pass
            if xdim is None or xdim == 1:
                x_train = self.denormalize_x(self.x_train)
                x_test = self.denormalize_x(self.x_test)
                x_cat = self.denormalize_x(self.x_cat)
            elif xdim > 1:
                x_train = self.denormalize_x(self.x_train)[:, 0]
                x_test = self.denormalize_x(self.x_test)[:, 0]
                x_cat = self.denormalize_x(self.x_cat)[:, 0]
            # plot
            self.fig, self.ax = plt.subplots(1, 1, figsize=(12, 6))
            lower = self.denormalize_y(self.preds.mean - self.preds.variance.sqrt() * 1.96)
            upper = self.denormalize_y(self.preds.mean + self.preds.variance.sqrt() * 1.96)
            self.ax.plot(x_train.numpy() + self.x_train_min.numpy(), self.denormalize_y(self.y_train).numpy(), 'k*')
            self.ax.plot(x_test.numpy() + self.x_train_min.numpy(), self.denormalize_y(self.y_test).numpy(), 'r*')
            self.ax.plot(x_cat.numpy() + self.x_train_min.numpy(), self.denormalize_y(self.preds.mean).numpy(), 'b')
            self.ax.fill_between(x_cat.numpy() + self.x_train_min.numpy(), lower.numpy(), upper.numpy(), alpha=0.3)
            self.ax.legend(['train data', 'test data', 'predicted mean', 'predicted confidence 95%'])
            self.ax.set_xlabel('time [days]')
            self.ax.set_ylabel('prediction')
            self.ax.set_title('prediction')
            self.ax.grid()

    def print_data_dim(self):
        """ print shapes for debug purposes """
        print("data shapes:")
        print(f'x_train: {self.x_train.shape}')
        print(f'x_test: {self.x_test.shape}')
        print(f'x_cat: {self.x_cat.shape}')
        print(f'y_train: {self.y_train.shape}')
        print(f'y_test: {self.y_test.shape}')
        try:
            print(f'preds mean: {self.preds.mean.shape}')
        except:
            pass

    def evaluate(self):
        """ evaluation of predictions """
        with th.no_grad():
            # data to evaluate
            test_data = self.denormalize_y(self.y_test)
            predictions = self.denormalize_y(self.preds.mean[-len(self.y_test):])
            # evaluate
            error_mse = mean_squared_error(test_data, predictions)
            error_rmse = math.sqrt(error_mse)
            error_abs = mean_absolute_error(test_data, predictions)
            avg_gt = test_data.sum() / len(test_data)
            mse_percentage = error_rmse / avg_gt * 100
            abs_percentage = error_abs / avg_gt * 100
            # print
            print('Average of groundtruth: %.3f' % avg_gt)
            print('Test MSE: %.3f' % error_mse)
            print('Test RMSE: %.3f' % error_rmse)
            print('RMSE percentage error: %.3f' % mse_percentage, '%')
            print('Test ABS: %.3f' % error_abs)
            print('ABS percentage error: %.3f' % abs_percentage, '%')
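The training loss above is the negative of the exact marginal log-likelihood. For reference, with the mean subtracted, kernel matrix $K$ and noise variance $\sigma_n^2$, the standard expression is

$$\log p(\mathbf{y} \mid X) = -\tfrac{1}{2}\,\mathbf{y}^\top (K + \sigma_n^2 I)^{-1}\mathbf{y} - \tfrac{1}{2}\log\lvert K + \sigma_n^2 I\rvert - \tfrac{n}{2}\log 2\pi,$$

which balances data fit (the quadratic term) against model complexity (the log-determinant term); Adam maximizes it with respect to the kernel and noise hyperparameters.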

Initialize the training class for the Gaussian process models.

pred_ncworld = GPTrainer(gp_model='GPLinearRBF',
                         x_train=ds_ncworld.x_train, x_train_min=ds_ncworld.x_train_min,
                         x_train_mean=ds_ncworld.x_train_mean, x_train_std=ds_ncworld.x_train_std,
                         x_test=ds_ncworld.x_test,
                         y_train=ds_ncworld.y_train, y_test=ds_ncworld.y_test,
                         y_train_mean=ds_ncworld.y_train_mean, y_train_std=ds_ncworld.y_train_std)
pred_ndworld = GPTrainer(gp_model='GPLinearRBF',
                         x_train=ds_ndworld.x_train, x_train_min=ds_ndworld.x_train_min,
                         x_train_mean=ds_ndworld.x_train_mean, x_train_std=ds_ndworld.x_train_std,
                         x_test=ds_ndworld.x_test,
                         y_train=ds_ndworld.y_train, y_test=ds_ndworld.y_test,
                         y_train_mean=ds_ndworld.y_train_mean, y_train_std=ds_ndworld.y_train_std)

Training

print('\ntraining new cases prediction model')
pred_ncworld.train()
print('\ntraining new deaths prediction model')
pred_ndworld.train()

Prediction and plot

pred_ncworld.prediction()
pred_ncworld.plot()
pred_ncworld.ax.set_ylabel('new cases smoothed')
pred_ncworld.ax.set_title('new cases smoothed')

pred_ndworld.prediction()
pred_ndworld.plot()
pred_ndworld.ax.set_ylabel('new deaths smoothed')
pred_ndworld.ax.set_title('new deaths smoothed')

3.2 World onestep prediction

To perform onestep ahead mean prediction, we define an additional function.

def onestep_prediction(dataset):
    onestep = th.cat((dataset.y_train, dataset.y_test), dim=0)  # output vector
    for idx in range(len(dataset.y_test)):
        # define training and test data. Training data is iteratively, step by step, expanded with test data
        x_train = th.cat((dataset.x_train, dataset.x_test[:idx]), dim=0)
        x_test = dataset.x_test[idx:]
        y_train = th.cat((dataset.y_train, dataset.y_test[:idx]), dim=0)
        y_test = dataset.y_test[idx:]
        # create a gaussian process model, train and make predictions
        pred_model = GPTrainer(gp_model='GPLinearRBF',
                               x_train=x_train, x_train_min=dataset.x_train_min,
                               x_train_mean=dataset.x_train_mean, x_train_std=dataset.x_train_std,
                               x_test=x_test,
                               y_train=y_train, y_test=y_test,
                               y_train_mean=dataset.y_train_mean, y_train_std=dataset.y_train_std,
                               verbose=False)
        pred_model.train()
        pred_model.prediction()
        # store one step predictions
        onestep[len(dataset.y_train) + idx] = pred_model.preds.mean[len(dataset.x_train) + idx]
    # plot results
    fig, ax = plt.subplots(1, 1, figsize=(12, 6))
    ax.plot(pred_model.x_train_min + pred_model.denormalize_x(dataset.x_test), pred_model.denormalize_y(dataset.y_test), '*r',
            pred_model.x_train_min + pred_model.denormalize_x(dataset.x_test), pred_model.denormalize_y(onestep[len(dataset.y_train):]), 'k*')
    ax.legend(['test data', 'prediction mean'])
    ax.set_xlabel('time [days]')
    ax.set_ylabel('prediction mean')
    ax.set_title('one step ahead prediction')
    ax.grid()
    # return onestep prediction
    return onestep
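Note that this scheme retrains a fresh GP from scratch for every test day (about 31 retrainings with the windows used here), so the onestep evaluation is considerably more expensive than a single multistep fit, but each prediction only looks one day ahead.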

We iteratively predict one step ahead at a time.

onestep_pred_ncworld = onestep_prediction(ds_ncworld)

onestep_pred_ndworld = onestep_prediction(ds_ndworld)

3.3 Sweden multistep prediction

Use the class to convert the dataframes to numpy arrays for further processing. Note that the conversion may take a while.

ds_ncswe = GPDataSet(df_ncdenswe_train, df_ncdenswe_test, datacol='new_cases_smoothed', filterloc='Sweden', add_input=None)
ds_ndswe = GPDataSet(df_nddenswe_train, df_nddenswe_test, datacol='new_deaths_smoothed', filterloc='Sweden', add_input=None)
ds_ncswe.convert_to_numpy()
ds_ndswe.convert_to_numpy()

Plot data.

ds_ncswe.plot_numpy_data()

ds_ndswe.plot_numpy_data()

Process the data by subsampling, reducing the data window, and normalizing. As before, we subsample and reduce the number of data points to reduce the computation and the complexity of the model.

ds_ncswe.process_numpy_data(nth_subsample=4, window_red=0.8)
ds_ndswe.process_numpy_data(nth_subsample=4, window_red=0.8)

Plot processed data

ds_ncswe.plot_reduced_data()

ds_ndswe.plot_reduced_data()

Initialize the training class for the Gaussian process models.

pred_ncswe = GPTrainer(gp_model='GPLinearRBF',
                       x_train=ds_ncswe.x_train, x_train_min=ds_ncswe.x_train_min,
                       x_train_mean=ds_ncswe.x_train_mean, x_train_std=ds_ncswe.x_train_std,
                       x_test=ds_ncswe.x_test,
                       y_train=ds_ncswe.y_train, y_test=ds_ncswe.y_test,
                       y_train_mean=ds_ncswe.y_train_mean, y_train_std=ds_ncswe.y_train_std)
pred_ndswe = GPTrainer(gp_model='GPLinearRBF',
                       x_train=ds_ndswe.x_train, x_train_min=ds_ndswe.x_train_min,
                       x_train_mean=ds_ndswe.x_train_mean, x_train_std=ds_ndswe.x_train_std,  # use the nd dataset's own x statistics
                       x_test=ds_ndswe.x_test,
                       y_train=ds_ndswe.y_train, y_test=ds_ndswe.y_test,
                       y_train_mean=ds_ndswe.y_train_mean, y_train_std=ds_ndswe.y_train_std)

Training

print('\ntraining new cases prediction model')
pred_ncswe.train()
print('\ntraining new deaths prediction model')
pred_ndswe.train()

Prediction and plot

pred_ncswe.prediction()
pred_ncswe.plot()
pred_ncswe.ax.set_ylabel('new cases smoothed')
pred_ncswe.ax.set_title('new cases smoothed')

pred_ndswe.prediction()
pred_ndswe.plot()
pred_ndswe.ax.set_ylabel('new deaths smoothed')
pred_ndswe.ax.set_title('new deaths smoothed')

3.4 Sweden onestep prediction

We again iteratively predict one step ahead at a time.

onestep_pred_ncswe = onestep_prediction(ds_ncswe)

onestep_pred_ndswe = onestep_prediction(ds_ndswe)

3.5 Sweden multistep prediction with additional data input from a neighbouring country

Assuming we knew the corresponding numbers from a neighbouring country, and provided the data is correlated, we could presumably improve the prediction.
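Concretely, when add_input is set, the GP input becomes two-dimensional: each input point is a pair (time index, Denmark's value of the same series), prepared by the num_xdim = 2 branch of GPDataSet, and the same kernels simply operate on this 2-D input. Note that Denmark's values are also used over the test window, i.e. the neighbour's data is assumed known.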

Plot the resulting data used for prediction. Both series appear to follow a similar form of trend.

ds_ncswex = GPDataSet(df_ncdenswe_train, df_ncdenswe_test, datacol='new_cases_smoothed', filterloc='Sweden', add_input='Denmark')
ds_ndswex = GPDataSet(df_nddenswe_train, df_nddenswe_test, datacol='new_deaths_smoothed', filterloc='Sweden', add_input='Denmark')
ds_ncswex.convert_to_numpy()
ds_ndswex.convert_to_numpy()

Plot data.

ds_ncswex.plot_numpy_data()

ds_ndswex.plot_numpy_data()

Process the data by subsampling, reducing the data window, and normalizing, as in the previous sections.

ds_ncswex.process_numpy_data(nth_subsample=4, window_red=0.8)
ds_ndswex.process_numpy_data(nth_subsample=4, window_red=0.8)

Plot processed data

ds_ncswex.plot_reduced_data()

ds_ndswex.plot_reduced_data()

Initialize the training class for the Gaussian process models.

pred_ncswex = GPTrainer(gp_model='GPLinearRBF',
                        x_train=ds_ncswex.x_train, x_train_min=ds_ncswex.x_train_min,
                        x_train_mean=ds_ncswex.x_train_mean, x_train_std=ds_ncswex.x_train_std,
                        x_test=ds_ncswex.x_test,
                        y_train=ds_ncswex.y_train, y_test=ds_ncswex.y_test,
                        y_train_mean=ds_ncswex.y_train_mean, y_train_std=ds_ncswex.y_train_std)
pred_ndswex = GPTrainer(gp_model='GPLinearRBF',
                        x_train=ds_ndswex.x_train, x_train_min=ds_ndswex.x_train_min,
                        x_train_mean=ds_ndswex.x_train_mean, x_train_std=ds_ndswex.x_train_std,
                        x_test=ds_ndswex.x_test,
                        y_train=ds_ndswex.y_train, y_test=ds_ndswex.y_test,
                        y_train_mean=ds_ndswex.y_train_mean, y_train_std=ds_ndswex.y_train_std)

Training

print('\ntraining new cases prediction model')
pred_ncswex.train()
print('\ntraining new deaths prediction model')
pred_ndswex.train()
Prediction and plot

pred_ncswex.prediction()
pred_ncswex.plot()
pred_ncswex.ax.set_ylabel('new cases smoothed')
pred_ncswex.ax.set_title('new cases smoothed')

pred_ndswex.prediction()
pred_ndswex.plot()
pred_ndswex.ax.set_ylabel('new deaths smoothed')
pred_ndswex.ax.set_title('new deaths smoothed')

  4. Evaluation

Evaluation of new cases smoothed (world, multistep prediction)

pred_ncworld.evaluate()
Average of groundtruth: 554788.000
Test MSE: 1850106112.000
Test RMSE: 43012.860
RMSE percentage error: 7.753 %
Test ABS: 33457.289
ABS percentage error: 6.031 %

Evaluation of new deaths smoothed (world, multistep prediction)

pred_ndworld.evaluate()
Average of groundtruth: 8842.582
Test MSE: 8629836.000
Test RMSE: 2937.658
RMSE percentage error: 33.222 %
Test ABS: 2727.578
ABS percentage error: 30.846 %

To evaluate the onestep ahead predictions, we define an additional function.

def evaluate(test_data, prediction):
    with th.no_grad():
        # evaluate
        error_mse = mean_squared_error(test_data, prediction)
        error_rmse = math.sqrt(error_mse)
        error_abs = mean_absolute_error(test_data, prediction)
        avg_gt = test_data.sum() / len(test_data)
        mse_percentage = error_rmse / avg_gt * 100
        abs_percentage = error_abs / avg_gt * 100
        # print
        print('Average of groundtruth: %.3f' % avg_gt)
        print('Test MSE: %.3f' % error_mse)
        print('Test RMSE: %.3f' % error_rmse)
        print('RMSE percentage error: %.3f' % mse_percentage, '%')
        print('Test ABS: %.3f' % error_abs)
        print('ABS percentage error: %.3f' % abs_percentage, '%')
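As a concrete reading of the percentage metrics: in the world onestep new-cases evaluation just below, an RMSE of 6170.6 against an average ground truth of 554788 gives 6170.6 / 554788 × 100 ≈ 1.1 %.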

Evaluation of new cases smoothed (world, onestep prediction)

# data to evaluate
test = pred_ncworld.denormalize_y(ds_ncworld.y_test)
preds = pred_ncworld.denormalize_y(onestep_pred_ncworld[-len(ds_ncworld.y_test):])
evaluate(test, preds)
Average of groundtruth: 554788.000
Test MSE: 38076252.000
Test RMSE: 6170.596
RMSE percentage error: 1.112 %
Test ABS: 4394.894
ABS percentage error: 0.792 %

Evaluation of new deaths smoothed (world, onestep prediction)

# data to evaluate
test = pred_ndworld.denormalize_y(ds_ndworld.y_test)
preds = pred_ndworld.denormalize_y(onestep_pred_ndworld[-len(ds_ndworld.y_test):])
evaluate(test, preds)
Average of groundtruth: 8842.582
Test MSE: 29195.436
Test RMSE: 170.867
RMSE percentage error: 1.932 %
Test ABS: 137.978
ABS percentage error: 1.560 %

Evaluation of new cases smoothed (Sweden, multistep prediction)

pred_ncswe.evaluate()
Average of groundtruth: 4238.664
Test MSE: 4737463.500
Test RMSE: 2176.572
RMSE percentage error: 51.350 %
Test ABS: 2068.833
ABS percentage error: 48.809 %

Evaluation of new deaths smoothed (Sweden, multistep prediction)

pred_ndswe.evaluate()
Average of groundtruth: 26.254
Test MSE: 755.362
Test RMSE: 27.484
RMSE percentage error: 104.686 %
Test ABS: 22.457
ABS percentage error: 85.540 %

Evaluation of new cases smoothed (Sweden, onestep prediction)

# data to evaluate
test = pred_ncswe.denormalize_y(ds_ncswe.y_test)
preds = pred_ncswe.denormalize_y(onestep_pred_ncswe[-len(ds_ncswe.y_test):])
evaluate(test, preds)
Average of groundtruth: 4238.664
Test MSE: 77977.289
Test RMSE: 279.244
RMSE percentage error: 6.588 %
Test ABS: 235.829
ABS percentage error: 5.564 %

Evaluation of new deaths smoothed (Sweden, onestep prediction)

# data to evaluate
test = pred_ndswe.denormalize_y(ds_ndswe.y_test)
preds = pred_ndswe.denormalize_y(onestep_pred_ndswe[-len(ds_ndswe.y_test):])
evaluate(test, preds)
Average of groundtruth: 26.254
Test MSE: 33.230
Test RMSE: 5.765
RMSE percentage error: 21.957 %
Test ABS: 3.928
ABS percentage error: 14.963 %

Evaluation of new cases smoothed (Sweden, multistep prediction with additional input)

pred_ncswex.evaluate()
Average of groundtruth: 4238.664
Test MSE: 2658296.750
Test RMSE: 1630.428
RMSE percentage error: 38.466 %
Test ABS: 1537.534
ABS percentage error: 36.274 %

Evaluation of new deaths smoothed (Sweden, multistep prediction with additional input)

pred_ndswex.evaluate()
Average of groundtruth: 26.254
Test MSE: 756.587
Test RMSE: 27.506
RMSE percentage error: 104.771 %
Test ABS: 22.476
ABS percentage error: 85.612 %
  5. Conclusions and reflections

Predictions using Gaussian processes were made for both new cases smoothed and new deaths smoothed, both for an aggregation of many countries in the world and for Sweden. Single-step-ahead predictions naturally resulted in smaller errors than the multistep predictions. The multistep prediction of new cases smoothed for Sweden could be improved using correlated data from a neighbouring country.

We believe the Gaussian process model is a valuable tool for making predictions. With this work, we would like to highlight that the data points and the kernel chosen for the Gaussian process heavily bias the model and strongly influence the predictions. In this project, we selected a combination of a linear kernel and a radial basis function kernel, the reason being that there is a trend in the data and that nearby data points should be more similar than data points further away. By inspecting the data carefully, a more suitable kernel could likely be selected. Also, the confidence interval provided by the Gaussian process model relies on the kernel correctly representing the underlying distribution of the data.

In terms of scalability, the predictions are somewhat scalable, as a user can define a window of data for making the predictions. Furthermore, GPU support could be added, and approximations to the Gaussian process model could be made.
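As a minimal sketch of the GPU path (untested here; the trainer's docstring notes it has only been exercised on CPU, and plot() would additionally need .cpu() calls before .numpy()):

# hypothetical GPU usage of the existing GPTrainer, assuming a CUDA runtime is available
device = 'cuda' if th.cuda.is_available() else 'cpu'
pred_ncworld_gpu = GPTrainer(gp_model='GPLinearRBF',
                             x_train=ds_ncworld.x_train, x_train_min=ds_ncworld.x_train_min,
                             x_train_mean=ds_ncworld.x_train_mean, x_train_std=ds_ncworld.x_train_std,
                             x_test=ds_ncworld.x_test,
                             y_train=ds_ncworld.y_train, y_test=ds_ncworld.y_test,
                             y_train_mean=ds_ncworld.y_train_mean, y_train_std=ds_ncworld.y_train_std,
                             device=device)
pred_ncworld_gpu.train()  # training and prediction then run on the selected device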

Compared to the ARIMA model, the Gaussian process model performed slightly worse in most cases. However, this may be due to the selection of data points and kernel, considering that the Gaussian process model is heavily biased by these choices. One reflection is that if one approximately knows the distribution of the underlying data, a Gaussian process model with a properly selected kernel may be a good choice.