ScaDaMaLe Course site and book

In this model, we use scala to process the data and predict the total cases. In this data set, there are many features which are constant for each country and don’t change with time. So, we tried to predict the total cases on a selected date, from some countries to other countries, without considering the time series.

  1. Import data and preprocess

// You need to uncomment this line if you haven't preprocess data yet.

%run "./02_DataPreprocess"
  1. Data process

display(df_cleaned_time_series)
df_cleaned_time_series.printSchema
root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = false)
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: double (nullable = false)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = false)
 |-- total_deaths: double (nullable = false)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = false)
 |-- reproduction_rate: double (nullable = false)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: double (nullable = true)
 |-- weekly_icu_admissions: double (nullable = true)
 |-- weekly_icu_admissions_per_million: double (nullable = true)
 |-- weekly_hosp_admissions: double (nullable = true)
 |-- weekly_hosp_admissions_per_million: double (nullable = true)
 |-- total_tests: double (nullable = false)
 |-- new_tests: double (nullable = true)
 |-- total_tests_per_thousand: double (nullable = true)
 |-- new_tests_per_thousand: double (nullable = true)
 |-- new_tests_smoothed: double (nullable = true)
 |-- new_tests_smoothed_per_thousand: double (nullable = true)
 |-- tests_per_case: double (nullable = true)
 |-- positive_rate: double (nullable = true)
 |-- tests_units: double (nullable = true)
 |-- stringency_index: double (nullable = false)
 |-- population: double (nullable = true)
 |-- population_density: double (nullable = true)
 |-- median_age: double (nullable = true)
 |-- aged_65_older: double (nullable = true)
 |-- aged_70_older: double (nullable = true)
 |-- gdp_per_capita: double (nullable = true)
 |-- extreme_poverty: double (nullable = true)
 |-- cardiovasc_death_rate: double (nullable = true)
 |-- diabetes_prevalence: double (nullable = true)
 |-- female_smokers: double (nullable = true)
 |-- male_smokers: double (nullable = true)
 |-- handwashing_facilities: double (nullable = true)
 |-- hospital_beds_per_thousand: double (nullable = true)
 |-- life_expectancy: double (nullable = true)
 |-- human_development_index: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
import org.apache.spark.sql.functions._

for (c <- df_cleaned_time_series.columns) {
  println(c + ": " + df_cleaned_time_series.filter(col(c).isNull).count())
}
iso_code: 0
continent: 0
location: 0
date: 0
total_cases: 0
new_cases: 0
new_cases_smoothed: 0
total_deaths: 0
new_deaths: 0
new_deaths_smoothed: 0
reproduction_rate: 0
icu_patients: 36018
icu_patients_per_million: 36018
hosp_patients: 34870
hosp_patients_per_million: 34870
weekly_icu_admissions: 41062
weekly_icu_admissions_per_million: 41062
weekly_hosp_admissions: 40715
weekly_hosp_admissions_per_million: 40715
total_tests: 0
new_tests: 20510
total_tests_per_thousand: 0
new_tests_per_thousand: 20510
new_tests_smoothed: 18176
new_tests_smoothed_per_thousand: 18176
tests_per_case: 19301
positive_rate: 19749
tests_units: 41600
stringency_index: 0
population: 0
population_density: 0
median_age: 0
aged_65_older: 0
aged_70_older: 0
gdp_per_capita: 0
extreme_poverty: 11168
cardiovasc_death_rate: 0
diabetes_prevalence: 0
female_smokers: 0
male_smokers: 0
handwashing_facilities: 24124
hospital_beds_per_thousand: 0
life_expectancy: 0
human_development_index: 0
total_cases_per_million: 0
new_cases_per_million: 0
new_cases_smoothed_per_million: 0
total_deaths_per_million: 0
new_deaths_per_million: 0
new_deaths_smoothed_per_million: 0
import org.apache.spark.sql.functions._

Prepare the data for training. We choose a day we want to predict, and select the constant features, and select the target column for prediction.

val df_by_location = df_cleaned_time_series.filter($"date" === "2020-12-01").sort($"continent").select($"iso_code",$"stringency_index", $"population",$"population_density",$"gdp_per_capita",$"diabetes_prevalence",$"total_cases_per_million",$"total_cases")
display(df_by_location)
df_by_location.count()
res145: Long = 159

Rescale the feature values and the target value.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
val min_str_index = df_by_location.select(min($"stringency_index")).first()(0)
val max_str_index = df_by_location.select(max($"stringency_index")).first()(0)
val min_population = df_by_location.select(min($"population")).first()(0)
val max_population = df_by_location.select(max($"population")).first()(0)
val min_population_density = 
df_by_location.select(min($"population_density")).first()(0)
val max_population_density = 
df_by_location.select(max($"population_density")).first()(0)
val min_gdp_per_capita = df_by_location.select(min($"gdp_per_capita")).first()(0)
val max_gdp_per_capita = df_by_location.select(max($"gdp_per_capita")).first()(0)
val min_diabetes_prevalence = 
df_by_location.select(min($"diabetes_prevalence")).first()(0)
val max_diabetes_prevalence = df_by_location.select(max($"diabetes_prevalence")).first()(0)

val df_by_location_normalized = df_by_location
  .withColumn("normal_stringency_index",($"stringency_index" -lit(min_str_index))/(lit(max_str_index)-lit(min_str_index)))
  .withColumn("normal_population", ($"population" - lit(min_population))/(lit(max_population)-lit(min_population)))
  .withColumn("normal_population_density",($"population_density" - lit(min_population_density))/(lit(max_population_density) - lit(min_population_density)))
  .withColumn("normal_gdp_per_capita", ($"gdp_per_capita" - lit(min_gdp_per_capita))/(lit(max_gdp_per_capita)- lit(min_gdp_per_capita)))
  .withColumn("normal_diabetes_prevalence", ($"diabetes_prevalence" - lit(min_diabetes_prevalence))/lit(max_diabetes_prevalence) - lit(min_diabetes_prevalence)).withColumn("log_total_cases_per_million", log($"total_cases_per_million")).toDF
display(df_by_location_normalized)
df_by_location_normalized.printSchema
root
 |-- iso_code: string (nullable = true)
 |-- stringency_index: double (nullable = false)
 |-- population: double (nullable = true)
 |-- population_density: double (nullable = true)
 |-- gdp_per_capita: double (nullable = true)
 |-- diabetes_prevalence: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- total_cases: double (nullable = false)
 |-- normal_stringency_index: double (nullable = true)
 |-- normal_population: double (nullable = true)
 |-- normal_population_density: double (nullable = true)
 |-- normal_gdp_per_capita: double (nullable = true)
 |-- normal_diabetes_prevalence: double (nullable = true)
 |-- log_total_cases_per_million: double (nullable = true)
val df_by_location_normalized_selected = df_by_location_normalized.select($"normal_stringency_index",$"normal_population",$"normal_population_density",$"normal_gdp_per_capita", $"normal_diabetes_prevalence",$"log_total_cases_per_million")
df_by_location_normalized_selected: org.apache.spark.sql.DataFrame = [normal_stringency_index: double, normal_population: double ... 4 more fields]
display(df_by_location_normalized_selected)
  1. Linear Regression from selected value to new cases

These values are irrelevant to time, but relevant to country. So we try to predict the total case in some contries from the data in other contries.

df_by_location_normalized_selected.createOrReplaceTempView("covid_table")
import org.apache.spark.ml.feature.VectorAssembler

val vectorizer =  new VectorAssembler()
.setInputCols(Array("normal_stringency_index", "normal_population", "normal_population_density", "normal_gdp_per_capita", "normal_diabetes_prevalence"))
.setOutputCol("features")

// make a DataFrame called dataset from the table
val dataset = vectorizer.transform(df_by_location_normalized_selected).select("features","log_total_cases_per_million") 
import org.apache.spark.ml.feature.VectorAssembler
vectorizer: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_a8c5337c1334, handleInvalid=error, numInputCols=5
dataset: org.apache.spark.sql.DataFrame = [features: vector, log_total_cases_per_million: double]
display(dataset)
var Array(split20, split80) = dataset.randomSplit(Array(0.20, 0.80), 1800009193L)
split20: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, log_total_cases_per_million: double]
split80: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, log_total_cases_per_million: double]
val testSet = split20.cache()

val trainingSet = split80.cache()
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, log_total_cases_per_million: double]
trainingSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, log_total_cases_per_million: double]
testSet.count() // action to actually cache
res156: Long = 26
trainingSet.count() // action to actually cache
res157: Long = 133
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline

// Let's initialize our linear regression learner
val lr = new LinearRegression()
// We use explain params to dump the parameters we can use
lr.explainParams()
// Now we set the parameters for the method
lr.setPredictionCol("prediction")
  .setLabelCol("log_total_cases_per_million")
  .setMaxIter(100)
  .setRegParam(0.1)
val lrModel = lr.fit(trainingSet)
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline
lr: org.apache.spark.ml.regression.LinearRegression = linReg_04758f25dc55
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = LinearRegressionModel: uid=linReg_04758f25dc55, numFeatures=5
val trainingSummary = lrModel.summary

println(s"Coefficients: ${lrModel.coefficients}, Intercept: ${lrModel.intercept}")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
Coefficients: [2.6214237261445112,-1.3643062210132013,-1.3234981005291635,4.903123743799173,1.0283056897021852], Intercept: 6.691449394053385
RMSE: 1.605896246405295
trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@3c0bba63
import org.apache.spark.ml.evaluation.RegressionEvaluator

// make predictions on the test data
val predictions = lrModel.transform(testSet)
predictions.select("prediction", "log_total_cases_per_million", "features").show()

// select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("log_total_cases_per_million")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
+------------------+---------------------------+--------------------+
|        prediction|log_total_cases_per_million|            features|
+------------------+---------------------------+--------------------+
| 6.328494753118636|          2.142543078223737|[0.15958180147058...|
| 8.115061043502033|          7.528810569839765|[0.36167279411764...|
| 7.462997808492604|          6.223671398897003|[0.64889705882352...|
| 7.831137150153838|          6.524287884057365|[0.79779411764705...|
|10.269420769779092|          5.843997267897207|[0.40429687499999...|
| 7.289562258542376|         2.6304048908829563|[0.49471507352941...|
| 9.963465130096218|          9.237218853465539|[0.57444852941176...|
|13.213369998258539|         10.784078124343976|[0.74460018382352...|
| 9.213228147281598|         10.572977149641726|[0.75528492647058...|
| 9.085379666714474|          9.143147511395949|[0.82444852941176...|
| 6.750739656770235|         10.952187342908323|[0.0,3.6806047717...|
| 8.135800825140628|         10.373288860272808|[0.51056985294117...|
| 7.507644816227425|         10.203096222487993|[0.55319393382352...|
|10.048805671611257|          8.817232647019427|[0.56376378676470...|
| 9.404481665413662|         10.158884909113675|[0.61695772058823...|
| 9.488257390217875|         10.351014339169227|[0.64889705882352...|
|  9.08870238448793|         10.291011744026449|[0.71806066176470...|
| 8.899278092318433|         10.041688001727678|[0.72334558823529...|
| 8.899278092318433|         10.041688001727678|[0.72334558823529...|
| 9.353065533403424|          9.427461709192647|[0.75528492647058...|
+------------------+---------------------------+--------------------+
only showing top 20 rows

Root Mean Squared Error (RMSE) on test data = 2.2259062146564705
import org.apache.spark.ml.evaluation.RegressionEvaluator
predictions: org.apache.spark.sql.DataFrame = [features: vector, log_total_cases_per_million: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_575198e0fd5f, metricName=rmse, throughOrigin=false
rmse: Double = 2.2259062146564705
val predictions = lrModel.transform(testSet)
display(predictions)
val new_predictions = predictions.withColumn("new_prediction", exp($"prediction")).withColumn("total_cases_per_million",exp($"log_total_cases_per_million")).select("new_prediction", "total_cases_per_million", "features")
display(new_predictions)
// select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("total_cases_per_million")
  .setPredictionCol("new_prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(new_predictions)
println("Root Mean Squared Error (RMSE) on test data = $rmse")
Root Mean Squared Error (RMSE) on test data = $rmse
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_dcf763c5af79, metricName=rmse, throughOrigin=false
rmse: Double = 99893.56063834814
  1. Conclusion and Reflections

We've tried several ways to preprocess the consant feature, but still didn't get a good result. We came to the conclusion that only predict the total cases of a country from other countries without considering the history time series values are not resonable. This is because the constant feature columns cannot reflect the total cases well. Therefore, we decided to use some time series methods to predict the value from the history value.