ScaDaMaLe Course site and book

display(dbutils.fs.ls("/datasets/group12/"))
path name size
dbfs:/datasets/group12/20_12_04_08_31_44.csv 20_12_04_08_31_44.csv 1.4181338e7
dbfs:/datasets/group12/20_12_04_08_32_40.csv 20_12_04_08_32_40.csv 1.4181338e7
dbfs:/datasets/group12/20_12_04_10_47_08.csv 20_12_04_10_47_08.csv 1.4190774e7
dbfs:/datasets/group12/21_01_07_08_50_05.csv 21_01_07_08_50_05.csv 1.4577033e7
dbfs:/datasets/group12/21_01_07_09_05_33.csv 21_01_07_09_05_33.csv 1.4577033e7
dbfs:/datasets/group12/analysis/ analysis/ 0.0
dbfs:/datasets/group12/chkpoint/ chkpoint/ 0.0

Load parquet file

val df = spark.read.parquet("dbfs:/datasets/group12/analysis/*.parquet")

display(df)

Load csv file

%scala // if you want to load the csv instead

val file_location = "/datasets/group12/20_12_04_10_47_08.csv"
val file_type = "csv"

// CSV options
val infer_schema = "true"
val first_row_is_header = "true"
val delimiter = ","

// The applied options are for CSV files. For other file types, these will be ignored.
val df = spark.read.format(file_type)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(file_location)

display(df)

Number of records

df.count()
res19: Long = 60544

Missing features in the data, due to the multiple web sources it is compiled from

import org.apache.spark.sql.functions._

for (c <- df.columns) {
  println(c + ": " + df.filter(col(c).isNull).count())  // number of nulls per column
}

This shows that Hong Kong has no meaningful values and that there is one unknown international location in the data.

display(df.filter($"location"==="Hong Kong" || $"iso_code".isNull)) //HK data iteself is not complete for all dates, and all available data is null! HAVE TO FILTER IT OUT COMPLETELY

190 countries with valid data remain to continue with

val df_filteredLocation = df.filter($"iso_code"=!="HKG").filter($"iso_code".isNotNull)
display(df_filteredLocation.select($"location").distinct()) // 190 valid countries 

Fill the missing continent value for the World aggregate rows. NOTE: it will be filled as "World"

display(df_filteredLocation.where($"continent".isNull))
val df_fillContinentNull = df_filteredLocation.na.fill("World",Array("continent"))
display(df_fillContinentNull)
df_fillContinentNull.count()
res27: Long = 60158
import org.apache.spark.sql.functions._

for (c <- df_fillContinentNull.columns) {
  println(c + ": " + df_fillContinentNull.filter(col(c).isNull).count())
}
display(df_fillContinentNull.select($"date",$"iso_code").groupBy($"iso_code").count())  // some countries start logging data earlier than others
val df_filtered_date = df_fillContinentNull.filter($"date">"2020-01-22")
df_filtered_date: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, continent: string ... 48 more fields]
display(df_filtered_date.select($"date",$"iso_code").groupBy($"iso_code").count())  // all countries have 316 days logging
display(df_filtered_date.select($"date",$"iso_code", $"total_cases", $"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed", $"new_deaths_smoothed").filter($"new_cases_smoothed".isNull || $"new_deaths_smoothed".isNull))

All missing values of new_cases_smoothed and new_deaths_smoothed are from the earliest dates, so just fill them with 0

display(df_filtered_date.select($"date",$"iso_code", $"total_cases", $"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed", $"new_deaths_smoothed")
        .filter($"new_cases_smoothed".isNull || $"new_deaths_smoothed".isNull).select($"date").distinct())
date
2020-01-23
2020-01-27
2020-01-24
2020-01-26
2020-01-25
val df_fillNullForSmooth = df_filtered_date.na.fill(0,Array("new_cases_smoothed"))
                           .na.fill(0,Array("new_deaths_smoothed"))
display(df_fillNullForSmooth)

Fill total_deaths and total_cases null values

Strictly, when new_cases has always been 0 so far, total_cases can be imputed as 0. The same applies to total_deaths.

val df_NULL_total_cases = df_fillNullForSmooth.select($"date",$"iso_code", $"total_cases", $"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed", $"new_deaths_smoothed")
                          .filter($"total_cases".isNull)


display(df_NULL_total_cases.filter($"new_cases"===0).groupBy("iso_code").count())

When total_cases is null, all previous new_cases values are always 0.

df_NULL_total_cases.filter($"total_cases".isNull).groupBy("iso_code").count().except(df_NULL_total_cases.filter($"new_cases"===0).groupBy("iso_code").count()).show() // When total_cases is null, new_cases has always been 0
+--------+-----+
|iso_code|count|
+--------+-----+
+--------+-----+
val df_fillNullForTotalCases = df_fillNullForSmooth.na.fill(0, Array("total_cases"))
                               
display(df_fillNullForTotalCases)
val df_NULL_total_death = df_fillNullForTotalCases.select($"date",$"iso_code", $"total_cases", $"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed", $"new_deaths_smoothed")
                          .filter($"total_deaths".isNull)


display(df_NULL_total_death.filter($"new_deaths"===0).groupBy("iso_code").count().sort())

If total_deaths is null only where new_deaths has always been 0, we can simply assign 0 to those nulls; otherwise we need to investigate further.

Three countries (ISL, PNG, SVK) have abnormal corrections in their new_deaths data.

val abnormal_countries = df_NULL_total_death.filter($"total_deaths".isNull).groupBy("iso_code").count().except(df_NULL_total_death.filter($"new_deaths"===0).groupBy("iso_code").count())
abnormal_countries.show()
df_NULL_total_death.filter($"new_deaths"===0).groupBy("iso_code").count().except(df_NULL_total_death.filter($"total_deaths".isNull).groupBy("iso_code").count()).show()
+--------+-----+
|iso_code|count|
+--------+-----+
|     PNG|  186|
|     SVK|   65|
|     ISL|   54|
+--------+-----+

+--------+-----+
|iso_code|count|
+--------+-----+
|     PNG|  185|
|     SVK|   64|
|     ISL|   52|
+--------+-----+

abnormal_countries: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, count: bigint]

Show the abnormal death corrections

display(df_fillNullForSmooth.filter($"iso_code"==="ISL").sort("date").filter($"date">"2020-03-13" && $"date"<"2020-03-22")) // death data correction between 2020-03-14 and 2020-03-21, total_deaths -> all 0, new_deaths -> all 0, new_deaths_smoothed -> all 0
Iceland (ISL), death-related columns:
date        total_cases  new_cases  total_deaths  new_deaths  new_deaths_smoothed
2020-03-14  156.0        22.0       null          0.0         0.0
2020-03-15  171.0        15.0       5.0           5.0         0.714
2020-03-16  180.0        9.0        null          -5.0        0.0
2020-03-17  220.0        40.0       1.0           1.0         0.143
2020-03-18  250.0        30.0       1.0           0.0         0.143
2020-03-19  330.0        80.0       1.0           0.0         0.143
2020-03-20  409.0        79.0       null          -1.0        0.0
2020-03-21  473.0        64.0       1.0           1.0         0.143
display(df_fillNullForSmooth.filter($"iso_code"==="PNG").sort("date").filter($"date">"2020-07-19" && $"date"<"2020-07-24" )) // death data correction between 2020-07-20 and 2020-07-22, total_deaths -> all 0, new_deaths -> all 0, new_deaths_smoothed -> all 0
Papua New Guinea (PNG), death-related columns:
date        total_cases  new_cases  total_deaths  new_deaths  new_deaths_smoothed
2020-07-20  19.0         3.0        1.0           1.0         0.143
2020-07-21  27.0         8.0        1.0           0.0         0.143
2020-07-22  30.0         3.0        null          -1.0        0.0
2020-07-23  31.0         1.0        null          0.0         0.0
display(df_fillNullForSmooth.filter($"iso_code"==="SVK").sort("date").filter($"date">"2020-03-16" && $"date"<"2020-03-23")) // death data correction between 2020-03-18 and 2020-03-22, total_deaths -> all 0, new_deaths -> all 0, new_deaths_smoothed -> all 0
Slovakia (SVK), death-related columns:
date        total_cases  new_cases  total_deaths  new_deaths  new_deaths_smoothed
2020-03-17  72.0         9.0        null          0.0         0.0
2020-03-18  105.0        33.0       1.0           1.0         0.143
2020-03-19  123.0        18.0       1.0           0.0         0.143
2020-03-20  137.0        14.0       1.0           0.0         0.143
2020-03-21  178.0        41.0       1.0           0.0         0.143
2020-03-22  185.0        7.0        null          -1.0        0.0

Correct the abnormal death values (total_deaths, new_deaths, new_deaths_smoothed) back to 0 over those correction windows

val df_fillNullForTotalDeathsSpecial = df_fillNullForTotalCases.withColumn("total_deaths_correct", 
                                        when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
                                       .when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
                                       .when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("total_deaths")))
                            .withColumn("new_deaths_correct", 
                                        when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
                                       .when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
                                       .when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("new_deaths")))
                            .withColumn("new_deaths_smoothed_correct", 
                                        when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
                                       .when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
                                       .when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("new_deaths_smoothed")))

df_fillNullForTotalDeathsSpecial: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 51 more fields]

Expect to see an empty table, confirming the correction is right

val df_NULL_total_death_ = df_fillNullForTotalDeathsSpecial.select($"date",$"iso_code", $"total_cases", $"total_deaths_correct", $"new_cases", $"new_deaths_correct", $"new_cases_smoothed", $"new_deaths_smoothed_correct")
                          .filter($"total_deaths_correct".isNull)


df_NULL_total_death_.filter($"total_deaths_correct".isNull).groupBy("iso_code").count().except(df_NULL_total_death_.filter($"new_deaths_correct"===0).groupBy("iso_code").count()).show()
+--------+-----+
|iso_code|count|
+--------+-----+
+--------+-----+

df_NULL_total_death_: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [date: string, iso_code: string ... 6 more fields]

Fill the remaining NULL values of total_deaths.

val df_fillNullForTotalDeaths = df_fillNullForTotalDeathsSpecial
                                  .drop("new_deaths", "total_deaths", "new_deaths_smoothed") // drop old column to rename
                                  .withColumnRenamed("new_deaths_correct","new_deaths")
                                  .withColumnRenamed("total_deaths_correct","total_deaths")
                                  .withColumnRenamed("new_deaths_smoothed_correct","new_deaths_smoothed")
                                  .na.fill(0, Array("total_deaths"))
                                  .select(df.columns.head, df.columns.tail: _*)
display(df_fillNullForTotalDeaths)

The first 10 columns are all clean now!

(All code above is for illustration; for processing, just run the cell below.)

// filter out unknown and HK data
val df_filteredLocation = df.filter($"iso_code"=!="HKG").filter($"iso_code".isNotNull)

// fill missing continent value for World data
val df_fillContinentNull = df_filteredLocation.na.fill("World",Array("continent")).cache
df_filteredLocation.unpersist()

// filter date before 2020-01-23
val df_filtered_date = df_fillContinentNull.filter($"date">"2020-01-22").cache
df_fillContinentNull.unpersist()

// fill missing for new_cases_smoothed and new_deaths_smoothed
val df_fillNullForSmooth = df_filtered_date.na.fill(0,Array("new_cases_smoothed"))
                           .na.fill(0,Array("new_deaths_smoothed"))
                           .cache
df_filtered_date.unpersist()

// fill missing for total_cases
val df_fillNullForTotalCases = df_fillNullForSmooth.na.fill(0, Array("total_cases")).cache
df_fillNullForSmooth.unpersist()

// correct total_deaths, new_deaths, new_deaths_smoothed
val df_fillNullForTotalDeathsSpecial = df_fillNullForTotalCases.withColumn("total_deaths_correct", 
                                        when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
                                       .when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
                                       .when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("total_deaths")))
                            .withColumn("new_deaths_correct", 
                                        when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
                                       .when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
                                       .when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("new_deaths")))
                            .withColumn("new_deaths_smoothed_correct", 
                                        when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
                                       .when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
                                       .when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("new_deaths_smoothed")))
                            .cache
df_fillNullForTotalCases.unpersist()

val df_cleaned = df_fillNullForTotalDeathsSpecial
                                  .drop("new_deaths", "total_deaths", "new_deaths_smoothed") // drop old column to rename
                                  .withColumnRenamed("new_deaths_correct","new_deaths")
                                  .withColumnRenamed("total_deaths_correct","total_deaths")
                                  .withColumnRenamed("new_deaths_smoothed_correct","new_deaths_smoothed")
                                  .na.fill(0, Array("total_deaths"))
                                  .select(df.columns.head, df.columns.tail: _*)
                                  .cache
df_fillNullForTotalDeathsSpecial.unpersist()

display(df_cleaned)
import org.apache.spark.sql.functions._

for (c <- df_cleaned.columns) {
  println(c + ": " + df_cleaned.filter(col(c).isNull).count())
}

3. Select invariant (during the pandemic) features for clustering

Double-check whether they are constant for each country; if not, replace the values with the per-country mean and filter out countries that are missing any of these constant features (a sketch of the mean replacement follows the candidate list below).

Candidate list:

  • population
  • population_density
  • median_age
  • aged_65_older
  • aged_70_older
  • gdp_per_capita
  • cardiovasc_death_rate
  • diabetes_prevalence
  • female_smokers
  • male_smokers
  • hospital_beds_per_thousand
  • life_expectancy
  • human_development_index
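If a candidate feature turned out not to be constant within a country, a minimal sketch of the mean replacement mentioned above could look like the following (population_density is just an illustrative choice and df_meanImputed a hypothetical name; in this dataset the candidates turn out to be constant, so the step is never needed):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Hypothetical: overwrite a non-constant candidate feature with its per-country mean
val byCountry = Window.partitionBy("location")
val df_meanImputed = df_cleaned.withColumn("population_density",
                                           avg(col("population_density")).over(byCountry))
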
val df_invariantFeatures = df_cleaned.select($"location", $"population",$"population_density",
                                             $"median_age", $"aged_65_older",
                                             $"aged_70_older",$"gdp_per_capita",
                                             $"cardiovasc_death_rate",$"diabetes_prevalence",
                                             $"female_smokers",$"male_smokers",$"hospital_beds_per_thousand",
                                             $"life_expectancy",$"human_development_index")
display(df_invariantFeatures)
display(df_invariantFeatures.describe())
summary location population population_density median_age aged_65_older aged_70_older gdp_per_capita cardiovasc_death_rate diabetes_prevalence female_smokers male_smokers hospital_beds_per_thousand life_expectancy human_development_index
count 60104 60104 58524 57260 56312 56944 57260 57892 58524 44928 44296 52835 59788 57576
mean null 8.179034692057101E7 305.56040665368073 30.204722319245512 8.595353423781793 5.418455904046091 18397.155660565862 262.00796225730636 7.89011004032537 10.368771011396003 32.645260520137235 3.000101788587119 72.8302766441427 0.7086279005141026
stddev null 5.801702332960505E8 1534.259203474429 9.077112325166668 6.17972101589387 4.214972529727923 19409.896953039588 120.63832428074626 4.2077256136575105 10.396263307877218 13.558764723459655 2.438486660152889 7.538806415792373 0.15398160968293945
min Afghanistan 809.0 1.98 15.1 1.144 0.526 661.24 79.37 0.99 0.1 7.7 0.1 53.28 0.354
max Zimbabwe 7.794798729E9 19347.5 48.2 27.049 18.493 116935.6 724.417 30.53 44.0 78.1 13.8 86.75 0.953
for (c <- df_invariantFeatures.columns) {
  println(c + ": " + df_invariantFeatures.filter(col(c).isNull).count())
}
location: 0
population: 0
population_density: 1580
median_age: 2844
aged_65_older: 3792
aged_70_older: 3160
gdp_per_capita: 2844
cardiovasc_death_rate: 2212
diabetes_prevalence: 1580
female_smokers: 15176
male_smokers: 15808
hospital_beds_per_thousand: 7269
life_expectancy: 316
human_development_index: 2528

Although some countries look like outliers, they do have constant female_smokers and male_smokers values

val constant_feature_checker = df_cleaned.groupBy("location")
          .agg(
              //stddev("stringency_index").as("std_si"),         
              stddev("population").as("std_pop"),           
              stddev("population_density").as("std_pd"),
              stddev("median_age").as("std_ma"),         
              stddev("aged_65_older").as("std_a65"),           
              stddev("aged_70_older").as("std_a70"),  
              stddev("gdp_per_capita").as("std_gdp"),
              stddev("cardiovasc_death_rate").as("std_cdr"),         
              stddev("diabetes_prevalence").as("std_dp"),           
              stddev("female_smokers").as("std_fs"),      
              stddev("male_smokers").as("std_ms"),        
              stddev("hospital_beds_per_thousand").as("std_hbpt"),           
              stddev("life_expectancy").as("std_le"),
              stddev("human_development_index").as("std_hdi")
            )
           .where(
                  (col("std_pop") > 0) || (col("std_pd") > 1e-20) || (col("std_ma") > 0) || (col("std_a65") > 0) || (col("std_a70") > 0) || (col("std_gdp") > 0 ||
                   col("std_cdr") > 0) || (col("std_dp") > 0) || (col("std_fs") > 0) || (col("std_ms") > 0) || (col("std_hbpt") > 0) || (col("std_le") > 0) || (col("std_hdi") > 0))

display(constant_feature_checker)

Each country's candidate features are effectively constant over time

val distinct_features = df_invariantFeatures.distinct()

display(distinct_features)

In total, 126 countries have complete features

val valid_distinct_features = distinct_features.filter($"population".isNotNull && $"population_density".isNotNull && $"median_age".isNotNull && 
                         $"aged_65_older".isNotNull && $"aged_70_older".isNotNull && $"gdp_per_capita".isNotNull &&
                         $"cardiovasc_death_rate".isNotNull && $"diabetes_prevalence".isNotNull && $"female_smokers".isNotNull && 
                         $"male_smokers".isNotNull && $"hospital_beds_per_thousand".isNotNull && $"life_expectancy".isNotNull &&
                         $"human_development_index".isNotNull)
display(valid_distinct_features)

Country list

display(valid_distinct_features.select($"location"))
valid_distinct_features.select($"location").count()
res69: Long = 126
val df_cleaned_feature = df_cleaned.filter($"location".isin(valid_distinct_features.select($"location").rdd.map(r => r(0)).collect().toSeq: _*))

display(df_cleaned_feature)

All remaining data now contains the complete list of time-invariant features

(All code above is for illustration; for processing, just run the cell below.)

// select invariant features
val df_invariantFeatures = df_cleaned.select($"location", $"population",$"population_density",
                                             $"median_age", $"aged_65_older",
                                             $"aged_70_older",$"gdp_per_capita",
                                             $"cardiovasc_death_rate",$"diabetes_prevalence",
                                             $"female_smokers",$"male_smokers",$"hospital_beds_per_thousand",
                                             $"life_expectancy",$"human_development_index").cache

// Extract valid distinct invariant features (one row per country)
val valid_distinct_features = df_invariantFeatures.distinct()
                                 .filter($"population".isNotNull && $"population_density".isNotNull && $"median_age".isNotNull && 
                                 $"aged_65_older".isNotNull && $"aged_70_older".isNotNull && $"gdp_per_capita".isNotNull &&
                                 $"cardiovasc_death_rate".isNotNull && $"diabetes_prevalence".isNotNull && $"female_smokers".isNotNull && 
                                 $"male_smokers".isNotNull && $"hospital_beds_per_thousand".isNotNull && $"life_expectancy".isNotNull &&
                                 $"human_development_index".isNotNull).cache

df_invariantFeatures.unpersist()

// filter out NULL feature countries
val df_cleaned_feature = df_cleaned.filter($"location".isin(valid_distinct_features.select($"location").rdd.map(r => r(0)).collect().toSeq: _*)).cache

df_cleaned.unpersist()

display(df_cleaned_feature)
import org.apache.spark.sql.functions._

for (c <- df_cleaned_feature.columns) {
  println(c + ": " + df_cleaned_feature.filter(col(c).isNull).count())
}

4. Imputing missing time series data of

  • total_cases_per_million
  • new_cases_per_million
  • new_cases_smoothed_per_million
  • total_deaths_per_million
  • new_deaths_per_million
  • new_deaths_smoothed_per_million
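These per-million columns are recomputed directly from the already-cleaned counts and the population, i.e. value / population × 1,000,000, which also fills their missing entries. For example, for Iceland on 2020-03-14 (first row of the ISL table above): 156 / 341250 × 1,000,000 ≈ 457.14, matching the reported total_cases_per_million of 457.143.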
val per_million_data = df_cleaned_feature.select($"location", $"date", $"iso_code", $"total_cases", 
                                                 $"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed", 
                                                 $"new_deaths_smoothed", $"population", $"population_density", 
                                                 $"total_cases_per_million", $"new_cases_per_million", $"new_cases_smoothed_per_million", 
                                                 $"total_deaths_per_million", $"new_deaths_per_million", $"new_deaths_smoothed_per_million")

display(per_million_data)
val per_million_data_corrected = per_million_data.withColumn("total_cases_per_million_correct", per_million_data("total_cases")/per_million_data("population")*1000000)
                                                 .withColumn("new_cases_per_million_correct", per_million_data("new_cases")/per_million_data("population")*1000000)
                                                 .withColumn("new_cases_smoothed_per_million_correct", per_million_data("new_cases_smoothed")/per_million_data("population")*1000000)
                                                 .withColumn("total_deaths_per_million_correct", per_million_data("total_deaths")/per_million_data("population")*1000000)
                                                 .withColumn("new_deaths_per_million_correct", per_million_data("new_deaths")/per_million_data("population")*1000000)
                                                 .withColumn("new_deaths_smoothed_per_million_correct", per_million_data("new_deaths_smoothed")/per_million_data("population")*1000000)
                                                 .drop("total_cases_per_million", "new_cases_per_million", "new_cases_smoothed_per_million", 
                                                       "total_deaths_per_million", "new_deaths_per_million", "new_deaths_smoothed_per_million") // drop old column to rename
                                                 .withColumnRenamed("total_cases_per_million_correct","total_cases_per_million")
                                                 .withColumnRenamed("new_cases_per_million_correct","new_cases_per_million")
                                                 .withColumnRenamed("new_cases_smoothed_per_million_correct","new_cases_smoothed_per_million")
                                                 .withColumnRenamed("total_deaths_per_million_correct","total_deaths_per_million")
                                                 .withColumnRenamed("new_deaths_per_million_correct","new_deaths_per_million")
                                                 .withColumnRenamed("new_deaths_smoothed_per_million_correct","new_deaths_smoothed_per_million")
per_million_data_corrected: org.apache.spark.sql.DataFrame = [location: string, date: string ... 15 more fields]
val df_cleaned_feature_permillion = df_cleaned_feature.withColumn("total_cases_per_million_correct", df_cleaned_feature("total_cases")/df_cleaned_feature("population")*1000000)
                                                   .withColumn("new_cases_per_million_correct", df_cleaned_feature("new_cases")/df_cleaned_feature("population")*1000000)
                                                   .withColumn("new_cases_smoothed_per_million_correct", df_cleaned_feature("new_cases_smoothed")/df_cleaned_feature("population")*1000000)
                                                   .withColumn("total_deaths_per_million_correct", df_cleaned_feature("total_deaths")/df_cleaned_feature("population")*1000000)
                                                   .withColumn("new_deaths_per_million_correct", df_cleaned_feature("new_deaths")/df_cleaned_feature("population")*1000000)
                                                   .withColumn("new_deaths_smoothed_per_million_correct", df_cleaned_feature("new_deaths_smoothed")/df_cleaned_feature("population")*1000000)
                                                   .drop("total_cases_per_million", "new_cases_per_million", "new_cases_smoothed_per_million", 
                                                         "total_deaths_per_million", "new_deaths_per_million", "new_deaths_smoothed_per_million") // drop old column to rename
                                                   .withColumnRenamed("total_cases_per_million_correct","total_cases_per_million")
                                                   .withColumnRenamed("new_cases_per_million_correct","new_cases_per_million")
                                                   .withColumnRenamed("new_cases_smoothed_per_million_correct","new_cases_smoothed_per_million")
                                                   .withColumnRenamed("total_deaths_per_million_correct","total_deaths_per_million")
                                                   .withColumnRenamed("new_deaths_per_million_correct","new_deaths_per_million")
                                                   .withColumnRenamed("new_deaths_smoothed_per_million_correct","new_deaths_smoothed_per_million")
df_cleaned_feature_permillion: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 48 more fields]

5. Impute time series of

  • reproduction_rate
  • total_tests
  • stringency_index
  • total_tests_per_thousand

Fill nulls in reproduction_rate with the last available value, then the next available value

All countries have missing data at the beginning or the end of the series.
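The fill relies on Spark window functions: last(col, ignoreNulls = true) over an unbounded-preceding window carries the most recent non-null value forward, and first(col, ignoreNulls = true) over an unbounded-following window carries the next non-null value backward; anything still null after both passes is set to 0. A minimal sketch on toy data (hypothetical country code and values, not from the dataset):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Toy series for one country with gaps at the start, middle and end
val toy = Seq(
  ("AAA", "2020-01-01", None: Option[Double]),
  ("AAA", "2020-01-02", Some(1.2)),
  ("AAA", "2020-01-03", None),
  ("AAA", "2020-01-04", Some(1.5)),
  ("AAA", "2020-01-05", None)
).toDF("iso_code", "date", "r")

val w = Window.partitionBy("iso_code").orderBy("date")
val filled = toy
  .withColumn("r", last("r", true).over(w.rowsBetween(Window.unboundedPreceding, 0)))  // forward fill
  .withColumn("r", first("r", true).over(w.rowsBetween(0, Window.unboundedFollowing))) // backward fill
  .na.fill(0, Array("r"))  // an all-null series would still be null here, so fall back to 0

filled.show()  // 2020-01-01 -> 1.2 (back-filled), 2020-01-03 -> 1.2 (forward-filled), 2020-01-05 -> 1.5

Window.unboundedPreceding and Window.unboundedFollowing give the same effect as the ±count() row bounds used in the cells below.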

display(df_cleaned_feature_permillion.select($"reproduction_rate", $"location", $"date").filter($"reproduction_rate".isNull).groupBy("location").count().sort("location"))
display(df_cleaned_feature_permillion.select($"reproduction_rate", $"location", $"date").filter($"reproduction_rate".isNull).groupBy("location").agg(max("date").as("max_date"), min("date").as("min_date")).sort("location"))
display(df_cleaned_feature_permillion.select($"reproduction_rate", $"location", $"date").filter($"location"==="Albania"))
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df_cleaned_reproduction_rate= df_cleaned_feature_permillion.select($"reproduction_rate", $"location", $"date")
                              .withColumn("reproduction_rate", last("reproduction_rate", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
                              .withColumn("reproduction_rate", first("reproduction_rate", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(0, df_cleaned_feature_permillion.count())))
                              .na.fill(0, Array("reproduction_rate"))
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df_cleaned_reproduction_rate: org.apache.spark.sql.DataFrame = [reproduction_rate: double, location: string ... 1 more field]

Countries missing stringency_index values

display(df_cleaned_feature_permillion.select($"stringency_index", $"location", $"date").filter($"stringency_index".isNull).groupBy("location").count().sort("count"))

Start and end dates of the null stringency_index values for each country

display(df_cleaned_feature_permillion.select($"stringency_index", $"location", $"date").filter($"stringency_index".isNull).groupBy("location").agg(max("date").as("max_date"), min("date").as("min_date")))
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df_cleaned_stringency = df_cleaned_feature_permillion.select($"stringency_index", $"location", $"date")
                              .withColumn("stringency_index_correct", last("stringency_index", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
display(df_cleaned_stringency.filter($"stringency_index".isNull).filter($"stringency_index_correct".isNull).groupBy("location").count())
location count
Comoros 316.0
Bahamas 316.0
Malta 316.0
Montenegro 316.0
Armenia 316.0

total_tests: impute with the last available or next available value

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df_cleaned_total_tests = df_cleaned_feature_permillion.select($"total_tests", $"location", $"date")
                              .withColumn("total_tests", last("total_tests", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
                              .withColumn("total_tests", first("total_tests", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(0, df_cleaned_feature_permillion.count())))
                              .na.fill(0, Array("total_tests"))
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df_cleaned_total_tests: org.apache.spark.sql.DataFrame = [total_tests: double, location: string ... 1 more field]
display(df_cleaned_feature_permillion.select($"total_tests", $"location", $"date").filter($"total_tests".isNull).groupBy("location").count())
val total_tests_date_maxmin = df_cleaned_feature_permillion.select($"total_tests", $"location", $"date").filter($"total_tests".isNull).groupBy("location").agg(max("date").as("max_date"), min("date").as("min_date"))
display(total_tests_date_maxmin)

Process stringency_index, reproduction_rate, total_tests, total_tests_per_thousand

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val df_cleaned_time_series = df_cleaned_feature_permillion
                              .withColumn("reproduction_rate", last("reproduction_rate", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
                              .withColumn("reproduction_rate", first("reproduction_rate", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(0, df_cleaned_feature_permillion.count())))
                              .na.fill(0, Array("reproduction_rate"))
                              .withColumn("stringency_index", last("stringency_index", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
                              .na.fill(0, Array("stringency_index"))
                              .withColumn("total_tests", last("total_tests", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
                              .withColumn("total_tests", first("total_tests", true)
                                         .over(Window.partitionBy("location").orderBy("date").rowsBetween(0, df_cleaned_feature_permillion.count())))
                              .na.fill(0, Array("total_tests"))
                              .withColumn("total_tests_per_thousand", col("total_tests")/col("population")*1000)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df_cleaned_time_series: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 48 more fields]
display(df_cleaned_time_series)
import org.apache.spark.sql.functions._

for (c <- df_cleaned_time_series.columns) {
  println(c + ": " + df_cleaned_time_series.filter(col(c).isNull).count())
}