ScaDaMaLe Course site and book

In this notebook, we do prediction with the time series method - Autoregressive integrated moving average (ARIMA). We preprocess the data and prepare for prediction. Then we predicted the new cases (smoothed) and new deaths (smoothed) for world and Sweden. We predict the future value from the history value. After the prediction part we evaluated our results.

  1. Import data and preprocess

// You need to uncomment this line if you haven't preprocess data yet.

%run "./02_DataPreprocess"
  1. Prepare for data

display(df_cleaned_time_series)
df_cleaned_time_series.printSchema
root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = false)
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: double (nullable = false)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = false)
 |-- total_deaths: double (nullable = false)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = false)
 |-- reproduction_rate: double (nullable = false)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: double (nullable = true)
 |-- weekly_icu_admissions: double (nullable = true)
 |-- weekly_icu_admissions_per_million: double (nullable = true)
 |-- weekly_hosp_admissions: double (nullable = true)
 |-- weekly_hosp_admissions_per_million: double (nullable = true)
 |-- total_tests: double (nullable = false)
 |-- new_tests: double (nullable = true)
 |-- total_tests_per_thousand: double (nullable = true)
 |-- new_tests_per_thousand: double (nullable = true)
 |-- new_tests_smoothed: double (nullable = true)
 |-- new_tests_smoothed_per_thousand: double (nullable = true)
 |-- tests_per_case: double (nullable = true)
 |-- positive_rate: double (nullable = true)
 |-- tests_units: double (nullable = true)
 |-- stringency_index: double (nullable = false)
 |-- population: double (nullable = true)
 |-- population_density: double (nullable = true)
 |-- median_age: double (nullable = true)
 |-- aged_65_older: double (nullable = true)
 |-- aged_70_older: double (nullable = true)
 |-- gdp_per_capita: double (nullable = true)
 |-- extreme_poverty: double (nullable = true)
 |-- cardiovasc_death_rate: double (nullable = true)
 |-- diabetes_prevalence: double (nullable = true)
 |-- female_smokers: double (nullable = true)
 |-- male_smokers: double (nullable = true)
 |-- handwashing_facilities: double (nullable = true)
 |-- hospital_beds_per_thousand: double (nullable = true)
 |-- life_expectancy: double (nullable = true)
 |-- human_development_index: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
// There is no "World" in the 126 countries. we need to calculate it.
val countries = df_cleaned_time_series.groupBy("location").count().sort($"location")
display(countries)

2.1.1 The smoothed new cases of the world.

We use the smoothed new cases because the raw data fluctuates greatly by day - on workdays, there are more new cases than on weekends.

// prediction for all over the world
import org.apache.spark.sql.functions._
// val df_world = df_cleaned_time_series.withColumn("date", (col("date").cast("Timestamp"))).where("location == 'World'").select($"date",$"new_cases_smoothed")

val df_world = df_cleaned_time_series.groupBy("date").sum("new_cases_smoothed").sort(col("date")).withColumnRenamed("sum(new_cases_smoothed)","new_cases_smoothed")

display(df_world)

df_world.printSchema
root
 |-- date: string (nullable = true)
 |-- new_cases_smoothed: double (nullable = true)
df_world.createOrReplaceTempView("df_world")
// val df_world_deaths = df_cleaned_time_series.withColumn("date", (col("date").cast("Timestamp"))).where("location == 'World'").select($"date",$"new_deaths_smoothed")
val df_world_deaths = df_cleaned_time_series.groupBy("date").sum("new_deaths_smoothed").sort(col("date")).withColumnRenamed("sum(new_deaths_smoothed)","new_deaths_smoothed")

df_world_deaths.createOrReplaceTempView("df_world_deaths")
df_world_deaths: org.apache.spark.sql.DataFrame = [date: string, new_deaths_smoothed: double]

2.2.1 The smoothed new cases of Sweden

In addition to the new cases all over the world, we also care about the cases in Sweden. Here we deal with smoothed new cases of Sweden.

// Select one contry for prediction
import org.apache.spark.sql.functions._

val df_sw = df_cleaned_time_series.withColumn("date", (col("date").cast("Timestamp"))).where("location == 'Sweden'").select($"date",$"new_cases_smoothed")
display(df_sw)

df_sw.printSchema
root
 |-- date: timestamp (nullable = true)
 |-- new_cases_smoothed: double (nullable = false)
df_sw.createOrReplaceTempView("df_sw")
val df_sw_deaths = df_cleaned_time_series.withColumn("date", (col("date").cast("Timestamp"))).where("location == 'Sweden'").select($"date",$"new_deaths_smoothed")
df_sw_deaths.createOrReplaceTempView("df_sw_deaths")
df_sw_deaths: org.apache.spark.sql.DataFrame = [date: timestamp, new_deaths_smoothed: double]
  1. Time series regression with ARIMA

ARIMA - Autoregressive Integrated Moving Average model. It's widely used in time series analysis. see defination here: https://en.wikipedia.org/wiki/Autoregressiveintegratedmoving_average

# import some libraries
# dbutils.library.installPyPI('numpy','1.16.3')
# dbutils.library.installPyPI('pandas','1.1.5')
# dbutils.library.restartPython()
import pandas
from matplotlib import pyplot
print(pandas.__version__)
data = spark.table("df_world")

print(type(data))
1.0.1
<class 'pyspark.sql.dataframe.DataFrame'>
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.functions import to_date, to_timestamp
data_pd = data.toPandas()
import numpy as np
import pandas as pd
from statsmodels.tsa.arima_model import ARIMA
import sklearn
import statsmodels
from datetime import date
print(data_pd.columns)
data_pd['date'] = pd.to_datetime(data_pd['date'])
Index(['date', 'new_cases_smoothed'], dtype='object')
// # %python
// # data_pd.plot(x='date', y = 'new_cases_smoothed', figsize=(8,5))

import math
def Predict_by_ARIMA(data_pd, one_step = True, training_length = 0.9):
  data_pd1 = data_pd.set_index('date')
  X = data_pd1.values
  train_size = int(len(X) * training_length) #the length you need for training.
  train, test = X[0:train_size], X[train_size:len(X)]
  test_date = data_pd1.index[train_size:len(X)]
  history = [x for x in train]
  predictions = list()
  print("training_series_size: ", train_size)
  print("test_series_size: ", len(test))
  for t in range(len(test)):
    model = ARIMA(history, order=(2,1,0))
    model_fit = model.fit(disp=0)
    output = model_fit.forecast()
    yhat = output[0]
    predictions.append(yhat)
    if one_step:
      obs = test[t] # use real value, only predict next step
    else:
      obs = yhat # use predicted value, predict all test data
    history.append(obs)
    current_date = test_date[t]
    print(str(current_date.date()), 'pred=%f, gt=%f' % (yhat, obs))
  return test, predictions
test_world, predictions_world = Predict_by_ARIMA(data_pd, True, 0.9)
print("test size: ", len(test_world))
print("predicted size: ", len(predictions_world))
# plot
fig_world = pyplot.figure()  
pyplot.plot(test_world)
pyplot.plot(predictions_world, color='red')
pyplot.show()

print(data_pd)
_, predictions_world_multi = Predict_by_ARIMA(data_pd, False)
print("test size: ", len(test_world))
print("predicted size: ", len(predictions_world))
# plot
fig_world_multi = pyplot.figure()  
pyplot.plot(test_world)
pyplot.plot(predictions_world_multi, color='red')
pyplot.show()

data_world_death = spark.table("df_world_deaths")
data_world_death_pd = data_world_death.toPandas()
print(data_world_death_pd.columns)
data_world_death_pd['date'] = pd.to_datetime(data_world_death_pd['date'])
Index(['date', 'new_deaths_smoothed'], dtype='object')
test_world_death, predictions_world_death = Predict_by_ARIMA(data_world_death_pd)
print("test size: ", len(test_world_death))
print("predicted size: ", len(predictions_world_death))
# plot
fig_world_death = pyplot.figure()  
pyplot.plot(test_world_death)
pyplot.plot(predictions_world_death, color='red')
pyplot.show()

_, predictions_world_death_multi = Predict_by_ARIMA(data_world_death_pd, False)
print("test size: ", len(test_world_death))
print("predicted size: ", len(predictions_world_death_multi))
# plot
fig_world_death = pyplot.figure()  
pyplot.plot(test_world_death)
pyplot.plot(predictions_world_death_multi, color='red')
pyplot.show()

from datetime import datetime
from datetime import date
from matplotlib import pyplot
import numpy as np
import pandas as pd
from pyspark.sql.functions import to_date, to_timestamp
from pyspark.sql.functions import *
from statsmodels.tsa.arima_model import ARIMA
import sklearn
import statsmodels

data_sw = spark.table("df_sw")
data_sw_pd = data_sw.toPandas()
print(data_sw_pd.columns)
data_sw_pd['date'] = pd.to_datetime(data_sw_pd['date'])
Index(['date', 'new_cases_smoothed'], dtype='object')
data_sw_pd.plot(x='date', y = 'new_cases_smoothed', figsize=(8,5))

test_sw, predictions_sw = Predict_by_ARIMA(data_sw_pd)
print("test size: ", len(test_sw))
print("predicted size: ", len(predictions_sw))
# plot
fig_sw = pyplot.figure()  
pyplot.plot(test_sw)
pyplot.plot(predictions_sw, color='red')
pyplot.show()

_, predictions_sw_multi = Predict_by_ARIMA(data_sw_pd, False)
print("test size: ", len(test_sw))
print("predicted size: ", len(predictions_sw))
# plot
fig_sw = pyplot.figure()  
pyplot.plot(test_sw)
pyplot.plot(predictions_sw_multi, color='red')
pyplot.show()

data_sw_death = spark.table("df_sw_deaths")
data_sw_death_pd = data_sw_death.toPandas()
print(data_sw_death_pd.columns)
data_sw_death_pd['date'] = pd.to_datetime(data_sw_death_pd['date'])
Index(['date', 'new_deaths_smoothed'], dtype='object')
test_sw_death, predictions_sw_death = Predict_by_ARIMA(data_sw_death_pd)
print("test size: ", len(test_sw_death))
print("predicted size: ", len(predictions_sw_death))
# plot
fig_sw_death = pyplot.figure()  
pyplot.plot(test_sw_death)
pyplot.plot(predictions_sw_death, color='red')
pyplot.show()

_, predictions_sw_death_multi = Predict_by_ARIMA(data_sw_death_pd, False)
print("test size: ", len(test_sw_death))
print("predicted size: ", len(predictions_sw_death_multi))
# plot
fig_sw_death = pyplot.figure()  
pyplot.plot(test_sw_death)
pyplot.plot(predictions_sw_death_multi, color='red')
pyplot.show()

  1. Evaluation

import math
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error

def Evaluation(test, predictions):
  error_mse = mean_squared_error(test, predictions)
  error_rmse = math.sqrt(error_mse)
  error_abs = mean_absolute_error(test, predictions)
  avg_gt = test[:,0].sum() / len(test)

  mse_percentage = error_rmse / avg_gt * 100
  abs_percentage = error_abs / avg_gt * 100
  print('Average of groundtruth: %.3f' % avg_gt)
  print('Test MSE: %.3f' % error_mse)
  print('Test RMSE: %.3f' % error_rmse)
  print('RMSE percentage error: %.3f' % mse_percentage, '%')
  print('Test ABS: %.3f' % error_abs)
  print('ABS percentage error: %.3f' % abs_percentage, '%')

Evaluation(test_world, predictions_world)
Average of groundtruth: 763744.916
Test MSE: 1382104096.548
Test RMSE: 37176.661
RMSE percentage error: 4.868 %
Test ABS: 19644.392
ABS percentage error: 2.572 %
Evaluation(test_world, predictions_world_multi)
Average of groundtruth: 763744.916
Test MSE: 16372759781.212
Test RMSE: 127956.085
RMSE percentage error: 16.754 %
Test ABS: 114514.607
ABS percentage error: 14.994 %
Evaluation(test_world_death, predictions_world_death)
Average of groundtruth: 11412.091
Test MSE: 358938.532
Test RMSE: 599.115
RMSE percentage error: 5.250 %
Test ABS: 290.352
ABS percentage error: 2.544 %
Evaluation(test_world_death, predictions_world_death_multi)
Average of groundtruth: 11412.091
Test MSE: 5567620.973
Test RMSE: 2359.581
RMSE percentage error: 20.676 %
Test ABS: 2110.218
ABS percentage error: 18.491 %
Evaluation(test_sw, predictions_sw)
Average of groundtruth: 4564.277
Test MSE: 26457.989
Test RMSE: 162.659
RMSE percentage error: 3.564 %
Test ABS: 82.412
ABS percentage error: 1.806 %
Evaluation(test_sw, predictions_sw_multi)
Average of groundtruth: 4564.277
Test MSE: 1514028.102
Test RMSE: 1230.458
RMSE percentage error: 26.958 %
Test ABS: 1172.056
ABS percentage error: 25.679 %
Evaluation(test_sw_death, predictions_sw_death)
Average of groundtruth: 31.862
Test MSE: 24.933
Test RMSE: 4.993
RMSE percentage error: 15.672 %
Test ABS: 3.072
ABS percentage error: 9.643 %
Evaluation(test_sw_death, predictions_sw_death_multi)
Average of groundtruth: 31.862
Test MSE: 775.703
Test RMSE: 27.851
RMSE percentage error: 87.414 %
Test ABS: 25.094
ABS percentage error: 78.759 %
  1. Conclusion and Reflections

With this time series method - ARIMA, we can get quite resonable results. We predicted the new cases (smoothed) and new deaths (smoothed) for world and Sweden. The evaluation of one step shows that we can get good results with a small error. But the multi step results is not good when we want to predict long term results. The prediction model and evaluation function can also been used for other countries.