ScaDaMaLe Course site and book

In this model, we use Scala to process the data and predict the total cases. The data set contains many features that are constant for each country and do not change over time. So we try to predict the total cases on a selected date for some countries from the data of other countries, without considering the time series.

  1. Import data and preprocess

// You need to uncomment the following line if you haven't preprocessed the data yet.
// %run "./02_DataPreprocess"
  2. Data processing

display(df_cleaned_time_series)
df_cleaned_time_series.printSchema
root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = false)
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: double (nullable = false)
 |-- new_cases: double (nullable = true)
 |-- new_cases_smoothed: double (nullable = false)
 |-- total_deaths: double (nullable = false)
 |-- new_deaths: double (nullable = true)
 |-- new_deaths_smoothed: double (nullable = false)
 |-- reproduction_rate: double (nullable = false)
 |-- icu_patients: double (nullable = true)
 |-- icu_patients_per_million: double (nullable = true)
 |-- hosp_patients: double (nullable = true)
 |-- hosp_patients_per_million: double (nullable = true)
 |-- weekly_icu_admissions: double (nullable = true)
 |-- weekly_icu_admissions_per_million: double (nullable = true)
 |-- weekly_hosp_admissions: double (nullable = true)
 |-- weekly_hosp_admissions_per_million: double (nullable = true)
 |-- total_tests: double (nullable = false)
 |-- new_tests: double (nullable = true)
 |-- total_tests_per_thousand: double (nullable = true)
 |-- new_tests_per_thousand: double (nullable = true)
 |-- new_tests_smoothed: double (nullable = true)
 |-- new_tests_smoothed_per_thousand: double (nullable = true)
 |-- tests_per_case: double (nullable = true)
 |-- positive_rate: double (nullable = true)
 |-- tests_units: double (nullable = true)
 |-- stringency_index: double (nullable = false)
 |-- population: double (nullable = true)
 |-- population_density: double (nullable = true)
 |-- median_age: double (nullable = true)
 |-- aged_65_older: double (nullable = true)
 |-- aged_70_older: double (nullable = true)
 |-- gdp_per_capita: double (nullable = true)
 |-- extreme_poverty: double (nullable = true)
 |-- cardiovasc_death_rate: double (nullable = true)
 |-- diabetes_prevalence: double (nullable = true)
 |-- female_smokers: double (nullable = true)
 |-- male_smokers: double (nullable = true)
 |-- handwashing_facilities: double (nullable = true)
 |-- hospital_beds_per_thousand: double (nullable = true)
 |-- life_expectancy: double (nullable = true)
 |-- human_development_index: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- new_cases_per_million: double (nullable = true)
 |-- new_cases_smoothed_per_million: double (nullable = true)
 |-- total_deaths_per_million: double (nullable = true)
 |-- new_deaths_per_million: double (nullable = true)
 |-- new_deaths_smoothed_per_million: double (nullable = true)
import org.apache.spark.sql.functions._

// Count the number of null values in each column
for (c <- df_cleaned_time_series.columns) {
  println(c + ": " + df_cleaned_time_series.filter(col(c).isNull).count())
}
iso_code: 0
continent: 0
location: 0
date: 0
total_cases: 0
new_cases: 0
new_cases_smoothed: 0
total_deaths: 0
new_deaths: 0
new_deaths_smoothed: 0
reproduction_rate: 0
icu_patients: 36018
icu_patients_per_million: 36018
hosp_patients: 34870
hosp_patients_per_million: 34870
weekly_icu_admissions: 41062
weekly_icu_admissions_per_million: 41062
weekly_hosp_admissions: 40715
weekly_hosp_admissions_per_million: 40715
total_tests: 0
new_tests: 20510
total_tests_per_thousand: 0
new_tests_per_thousand: 20510
new_tests_smoothed: 18176
new_tests_smoothed_per_thousand: 18176
tests_per_case: 19301
positive_rate: 19749
tests_units: 41600
stringency_index: 0
population: 0
population_density: 0
median_age: 0
aged_65_older: 0
aged_70_older: 0
gdp_per_capita: 0
extreme_poverty: 11168
cardiovasc_death_rate: 0
diabetes_prevalence: 0
female_smokers: 0
male_smokers: 0
handwashing_facilities: 24124
hospital_beds_per_thousand: 0
life_expectancy: 0
human_development_index: 0
total_cases_per_million: 0
new_cases_per_million: 0
new_cases_smoothed_per_million: 0
total_deaths_per_million: 0
new_deaths_per_million: 0
new_deaths_smoothed_per_million: 0
import org.apache.spark.sql.functions._
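Many columns still contain a large number of nulls. As a minimal sketch of how one might keep only the fully populated columns programmatically (the names `nonNullColumns` and `df_complete` are illustrative, not from the original notebook):

import org.apache.spark.sql.functions.col

// Keep only the columns with no nulls, so the constant country-level
// features need no imputation downstream.
val nonNullColumns = df_cleaned_time_series.columns.filter { c =>
  df_cleaned_time_series.filter(col(c).isNull).count() == 0
}
val df_complete = df_cleaned_time_series.select(nonNullColumns.map(col): _*)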

Prepare the data for training. We choose the date we want to predict, select the constant features, and select the target column for prediction.

val df_by_location = df_cleaned_time_series
  .filter($"date" === "2020-12-01")
  .sort($"continent")
  .select($"iso_code", $"stringency_index", $"population", $"population_density",
          $"gdp_per_capita", $"diabetes_prevalence", $"total_cases_per_million", $"total_cases")

display(df_by_location)
df_by_location.count()
res145: Long = 159

Rescale the feature values and the target value.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column

val min_str_index = df_by_location.select(min($"stringency_index")).first()(0)
val max_str_index = df_by_location.select(max($"stringency_index")).first()(0)
val min_population = df_by_location.select(min($"population")).first()(0)
val max_population = df_by_location.select(max($"population")).first()(0)
val min_population_density = df_by_location.select(min($"population_density")).first()(0)
val max_population_density = df_by_location.select(max($"population_density")).first()(0)
val min_gdp_per_capita = df_by_location.select(min($"gdp_per_capita")).first()(0)
val max_gdp_per_capita = df_by_location.select(max($"gdp_per_capita")).first()(0)
val min_diabetes_prevalence = df_by_location.select(min($"diabetes_prevalence")).first()(0)
val max_diabetes_prevalence = df_by_location.select(max($"diabetes_prevalence")).first()(0)

// Min-max rescale each feature to [0, 1] and log-transform the target.
val df_by_location_normalized = df_by_location
  .withColumn("normal_stringency_index", ($"stringency_index" - lit(min_str_index)) / (lit(max_str_index) - lit(min_str_index)))
  .withColumn("normal_population", ($"population" - lit(min_population)) / (lit(max_population) - lit(min_population)))
  .withColumn("normal_population_density", ($"population_density" - lit(min_population_density)) / (lit(max_population_density) - lit(min_population_density)))
  .withColumn("normal_gdp_per_capita", ($"gdp_per_capita" - lit(min_gdp_per_capita)) / (lit(max_gdp_per_capita) - lit(min_gdp_per_capita)))
  .withColumn("normal_diabetes_prevalence", ($"diabetes_prevalence" - lit(min_diabetes_prevalence)) / (lit(max_diabetes_prevalence) - lit(min_diabetes_prevalence)))
  .withColumn("log_total_cases_per_million", log($"total_cases_per_million"))
  .toDF

display(df_by_location_normalized)
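The manual min-max columns above could alternatively be produced with Spark ML's MinMaxScaler, which rescales a vector column to [0, 1]. A minimal sketch of that alternative (the variable and column names here are illustrative, not from the original notebook):

import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}

// MinMaxScaler works on a vector column, so assemble the raw features first.
val rawAssembler = new VectorAssembler()
  .setInputCols(Array("stringency_index", "population", "population_density",
                      "gdp_per_capita", "diabetes_prevalence"))
  .setOutputCol("raw_features")

// Rescale every component of the vector to [0, 1], as the manual columns above do.
val scaler = new MinMaxScaler()
  .setInputCol("raw_features")
  .setOutputCol("scaled_features")

val assembled = rawAssembler.transform(df_by_location)
val df_scaled = scaler.fit(assembled).transform(assembled)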
df_by_location_normalized.printSchema
root
 |-- iso_code: string (nullable = true)
 |-- stringency_index: double (nullable = false)
 |-- population: double (nullable = true)
 |-- population_density: double (nullable = true)
 |-- gdp_per_capita: double (nullable = true)
 |-- diabetes_prevalence: double (nullable = true)
 |-- total_cases_per_million: double (nullable = true)
 |-- total_cases: double (nullable = false)
 |-- normal_stringency_index: double (nullable = true)
 |-- normal_population: double (nullable = true)
 |-- normal_population_density: double (nullable = true)
 |-- normal_gdp_per_capita: double (nullable = true)
 |-- normal_diabetes_prevalence: double (nullable = true)
 |-- log_total_cases_per_million: double (nullable = true)
val df_by_location_normalized_selected = df_by_location_normalized.select(
  $"normal_stringency_index", $"normal_population", $"normal_population_density",
  $"normal_gdp_per_capita", $"normal_diabetes_prevalence", $"log_total_cases_per_million")
df_by_location_normalized_selected: org.apache.spark.sql.DataFrame = [normal_stringency_index: double, normal_population: double ... 4 more fields]
display(df_by_location_normalized_selected)
  3. Linear regression from the selected features to total cases

These features do not change with time, but they do differ between countries. So we try to predict the total cases in some countries from the data of other countries.

df_by_location_normalized_selected.createOrReplaceTempView("covid_table")
import org.apache.spark.ml.feature.VectorAssembler

val vectorizer = new VectorAssembler()
  .setInputCols(Array("normal_stringency_index", "normal_population", "normal_population_density",
                      "normal_gdp_per_capita", "normal_diabetes_prevalence"))
  .setOutputCol("features")

// make a DataFrame called dataset from the table
val dataset = vectorizer.transform(df_by_location_normalized_selected).select("features", "log_total_cases_per_million")
import org.apache.spark.ml.feature.VectorAssembler
vectorizer: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_a8c5337c1334, handleInvalid=error, numInputCols=5
dataset: org.apache.spark.sql.DataFrame = [features: vector, log_total_cases_per_million: double]
display(dataset)
var Array(split20, split80) = dataset.randomSplit(Array(0.20, 0.80), 1800009193L)
split20: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, log_total_cases_per_million: double]
split80: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, log_total_cases_per_million: double]
val testSet = split20.cache()
val trainingSet = split80.cache()
testSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, log_total_cases_per_million: double]
trainingSet: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [features: vector, log_total_cases_per_million: double]
testSet.count() // action to actually cache
res156: Long = 26
trainingSet.count() // action to actually cache
res157: Long = 133
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline

// Let's initialize our linear regression learner
val lr = new LinearRegression()

// We use explainParams to dump the parameters we can use
lr.explainParams()

// Now we set the parameters for the method
lr.setPredictionCol("prediction")
  .setLabelCol("log_total_cases_per_million")
  .setMaxIter(100)
  .setRegParam(0.1)

val lrModel = lr.fit(trainingSet)
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.Pipeline
lr: org.apache.spark.ml.regression.LinearRegression = linReg_04758f25dc55
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = LinearRegressionModel: uid=linReg_04758f25dc55, numFeatures=5
val trainingSummary = lrModel.summary
println(s"Coefficients: ${lrModel.coefficients}, Intercept: ${lrModel.intercept}")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
Coefficients: [2.6214237261445112,-1.3643062210132013,-1.3234981005291635,4.903123743799173,1.0283056897021852], Intercept: 6.691449394053385
RMSE: 1.605896246405295
trainingSummary: org.apache.spark.ml.regression.LinearRegressionTrainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@3c0bba63
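The regularization strength above is fixed at 0.1. A minimal sketch of how regParam could instead be chosen by cross-validation on the training set (the grid values and fold count are only illustrative):

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Try a few regularization strengths and keep the one with the lowest
// cross-validated RMSE on the training set.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.01, 0.1, 1.0))
  .build()

val cv = new CrossValidator()
  .setEstimator(lr)
  .setEvaluator(new RegressionEvaluator()
    .setLabelCol("log_total_cases_per_million")
    .setPredictionCol("prediction")
    .setMetricName("rmse"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(trainingSet)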
import org.apache.spark.ml.evaluation.RegressionEvaluator

// make predictions on the test data
val predictions = lrModel.transform(testSet)
predictions.select("prediction", "log_total_cases_per_million", "features").show()

// select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("log_total_cases_per_million")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
+------------------+---------------------------+--------------------+
|        prediction|log_total_cases_per_million|            features|
+------------------+---------------------------+--------------------+
| 6.328494753118636|          2.142543078223737|[0.15958180147058...|
| 8.115061043502033|          7.528810569839765|[0.36167279411764...|
| 7.462997808492604|          6.223671398897003|[0.64889705882352...|
| 7.831137150153838|          6.524287884057365|[0.79779411764705...|
|10.269420769779092|          5.843997267897207|[0.40429687499999...|
| 7.289562258542376|         2.6304048908829563|[0.49471507352941...|
| 9.963465130096218|          9.237218853465539|[0.57444852941176...|
|13.213369998258539|         10.784078124343976|[0.74460018382352...|
| 9.213228147281598|         10.572977149641726|[0.75528492647058...|
| 9.085379666714474|          9.143147511395949|[0.82444852941176...|
| 6.750739656770235|         10.952187342908323|[0.0,3.6806047717...|
| 8.135800825140628|         10.373288860272808|[0.51056985294117...|
| 7.507644816227425|         10.203096222487993|[0.55319393382352...|
|10.048805671611257|          8.817232647019427|[0.56376378676470...|
| 9.404481665413662|         10.158884909113675|[0.61695772058823...|
| 9.488257390217875|         10.351014339169227|[0.64889705882352...|
|  9.08870238448793|         10.291011744026449|[0.71806066176470...|
| 8.899278092318433|         10.041688001727678|[0.72334558823529...|
| 8.899278092318433|         10.041688001727678|[0.72334558823529...|
| 9.353065533403424|          9.427461709192647|[0.75528492647058...|
+------------------+---------------------------+--------------------+
only showing top 20 rows

Root Mean Squared Error (RMSE) on test data = 2.2259062146564705

import org.apache.spark.ml.evaluation.RegressionEvaluator
predictions: org.apache.spark.sql.DataFrame = [features: vector, log_total_cases_per_million: double ... 1 more field]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_575198e0fd5f, metricName=rmse, throughOrigin=false
rmse: Double = 2.2259062146564705
val predictions = lrModel.transform(testSet)
display(predictions)
val new_predictions = predictions
  .withColumn("new_prediction", exp($"prediction"))
  .withColumn("total_cases_per_million", exp($"log_total_cases_per_million"))
  .select("new_prediction", "total_cases_per_million", "features")

display(new_predictions)
// select (prediction, true label) and compute test error on the original scale.
val evaluator = new RegressionEvaluator()
  .setLabelCol("total_cases_per_million")
  .setPredictionCol("new_prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(new_predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")
Root Mean Squared Error (RMSE) on test data = 99893.56063834814
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = RegressionEvaluator: uid=regEval_dcf763c5af79, metricName=rmse, throughOrigin=false
rmse: Double = 99893.56063834814
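The RMSE on the original scale is large and hard to interpret on its own. Since RegressionEvaluator also supports the r2 metric, a scale-free measure could be reported as well; a minimal sketch reusing the evaluator defined above:

// R^2 complements the RMSE above with a scale-free measure of fit.
val r2 = evaluator.setMetricName("r2").evaluate(new_predictions)
println(s"R^2 on test data = $r2")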
  4. Conclusion and Reflections

We've tried several ways to preprocess the constant features, but still didn't get a good result. We came to the conclusion that predicting the total cases of a country from other countries, without considering the historical time-series values, is not reasonable. This is because the constant feature columns cannot reflect the total cases well. Therefore, we decided to use time-series methods that predict the value from its own history.