display(dbutils.fs.ls("/datasets/group12/"))
path | name | size |
---|---|---|
dbfs:/datasets/group12/20_12_04_08_31_44.csv | 20_12_04_08_31_44.csv | 1.4181338e7 |
dbfs:/datasets/group12/20_12_04_08_32_40.csv | 20_12_04_08_32_40.csv | 1.4181338e7 |
dbfs:/datasets/group12/20_12_04_10_47_08.csv | 20_12_04_10_47_08.csv | 1.4190774e7 |
dbfs:/datasets/group12/21_01_07_08_50_05.csv | 21_01_07_08_50_05.csv | 1.4577033e7 |
dbfs:/datasets/group12/21_01_07_09_05_33.csv | 21_01_07_09_05_33.csv | 1.4577033e7 |
dbfs:/datasets/group12/analysis/ | analysis/ | 0.0 |
dbfs:/datasets/group12/chkpoint/ | chkpoint/ | 0.0 |
Load parquet file
val df = spark.read.parquet("dbfs:/datasets/group12/analysis/*.parquet")
display(df)
Load CSV file
%scala // if you want to load the CSV instead
val file_location = "/datasets/group12/20_12_04_10_47_08.csv"
val file_type = "csv"
// CSV options
val infer_schema = "true"
val first_row_is_header = "true"
val delimiter = ","
// The applied options are for CSV files. For other file types, these will be ignored.
val df = spark.read.format(file_type)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(file_location)
display(df)
Number of rows
df.count()
res19: Long = 60544
Missing features in the data due to multiple web sources
import org.apache.spark.sql.functions._
for (c <- df.columns) {
println(c + ": " + df.filter(col(c).isNull).count())
}
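The loop above launches one Spark job per column. As an alternative sketch, a single aggregation can count the nulls of every column in one pass:
import org.apache.spark.sql.functions._
// count(when(col.isNull, c)) counts only the rows where the column is null
val nullCounts = df.select(df.columns.map(c => count(when(col(c).isNull, c)).alias(c)): _*)
display(nullCounts)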
This shows that Hong Kong has no meaningful values and that there is one unknown international location in the data.
display(df.filter($"location"==="Hong Kong" || $"iso_code".isNull)) //HK data iteself is not complete for all dates, and all available data is null! HAVE TO FILTER IT OUT COMPLETELY
190 valid countries' data remain to continue with
val df_filteredLocation = df.filter($"iso_code"=!="HKG").filter($"iso_code".isNotNull)
display(df_filteredLocation.select($"location").distinct()) // 190 valid countries
Fill the missing continent value for the World aggregate data. NOTE: it will be filled as "World"
display(df_filteredLocation.where($"continent".isNull))
val df_fillContinentNull = df_filteredLocation.na.fill("World",Array("continent"))
display(df_fillContinentNull)
df_fillContinentNull.count()
res27: Long = 60158
import org.apache.spark.sql.functions._
for (c <- df_fillContinentNull.columns) {
println(c + ": " + df_fillContinentNull.filter(col(c).isNull).count())
}
display(df_fillContinentNull.select($"date",$"iso_code").groupBy($"iso_code").count()) // some countries start logging data earlier than others
val df_filtered_date = df_fillContinentNull.filter($"date">"2020-01-22")
df_filtered_date: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, continent: string ... 48 more fields]
display(df_filtered_date.select($"date",$"iso_code").groupBy($"iso_code").count()) // all countries have 316 days of logging
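Note that date is stored as a string here, so the comparison $"date" > "2020-01-22" relies on the ISO yyyy-MM-dd format ordering lexicographically like a date. If typed dates are preferred, a minimal sketch (assuming that format) casts the column first:
import org.apache.spark.sql.functions.to_date
// cast the ISO-formatted string dates to DateType; the same filter still applies
val df_filtered_date_typed = df_fillContinentNull
  .withColumn("date", to_date($"date", "yyyy-MM-dd"))
  .filter($"date" > "2020-01-22")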
display(df_filtered_date.select($"date",$"iso_code", $"total_cases", $"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed", $"new_deaths_smoothed").filter($"new_cases_smoothed".isNull || $"new_deaths_smoothed".isNull))
All missing values of new_cases_smoothed and new_deaths_smoothed occur on the earliest dates, so just fill them with 0
display(df_filtered_date.select($"date",$"iso_code", $"total_cases", $"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed", $"new_deaths_smoothed")
.filter($"new_cases_smoothed".isNull || $"new_deaths_smoothed".isNull).select($"date").distinct())
date |
---|
2020-01-23 |
2020-01-27 |
2020-01-24 |
2020-01-26 |
2020-01-25 |
val df_fillNullForSmooth = df_filtered_date.na.fill(0,Array("new_cases_smoothed"))
.na.fill(0,Array("new_deaths_smoothed"))
display(df_fillNullForSmooth)
Fill total_deaths and total_cases null values
Strictly, when new_cases is always 0, total_cases can be imputed as 0. The same applies to total_deaths.
val df_NULL_total_cases = df_fillNullForSmooth.select($"date",$"iso_code", $"total_cases", $"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed", $"new_deaths_smoothed")
.filter($"total_cases".isNull)
display(df_NULL_total_cases.filter($"new_cases"===0).groupBy("iso_code").count())
Whenever total_cases is null, all new_cases up to that date are 0.
df_NULL_total_cases.filter($"total_cases".isNull).groupBy("iso_code").count().except(df_NULL_total_cases.filter($"new_cases"===0).groupBy("iso_code").count()).show() // When total_case is Null, all new_cases is always 0
+--------+-----+
|iso_code|count|
+--------+-----+
+--------+-----+
val df_fillNullForTotalCases = df_fillNullForSmooth.na.fill(0, Array("total_cases"))
display(df_fillNullForTotalCases)
val df_NULL_total_death = df_fillNullForTotalCases.select($"date",$"iso_code", $"total_cases", $"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed", $"new_deaths_smoothed")
.filter($"total_deaths".isNull)
display(df_NULL_total_death.filter($"new_deaths"===0).groupBy("iso_code").count().sort())
If total_deaths is null only where new_deaths is always 0, then we can simply assign 0 to the nulls; otherwise we need to investigate further.
Three countries (ISL, PNG, SVK) have abnormal corrections in the new_deaths data.
val abnormal_countries = df_NULL_total_death.filter($"total_deaths".isNull).groupBy("iso_code").count().except(df_NULL_total_death.filter($"new_deaths"===0).groupBy("iso_code").count())
abnormal_countries.show()
df_NULL_total_death.filter($"new_deaths"===0).groupBy("iso_code").count().except(df_NULL_total_death.filter($"total_deaths".isNull).groupBy("iso_code").count()).show()
+--------+-----+
|iso_code|count|
+--------+-----+
| PNG| 186|
| SVK| 65|
| ISL| 54|
+--------+-----+
+--------+-----+
|iso_code|count|
+--------+-----+
| PNG| 185|
| SVK| 64|
| ISL| 52|
+--------+-----+
abnormal_countries: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, count: bigint]
Show the abnormal death corrections
display(df_fillNullForSmooth.filter($"iso_code"==="ISL").sort("date").filter($"date">"2020-03-13" && $"date"<"2020-03-22")) // death data correction between 2020-03-14 and 2020-03-21, total_deaths -> all 0, new_deaths -> all 0, new_deaths_smoothed -> all 0
iso_code | continent | location | date | total_cases | new_cases | new_cases_smoothed | total_deaths | new_deaths | new_deaths_smoothed | total_cases_per_million | new_cases_per_million | new_cases_smoothed_per_million | total_deaths_per_million | new_deaths_per_million | new_deaths_smoothed_per_million | reproduction_rate | icu_patients | icu_patients_per_million | hosp_patients | hosp_patients_per_million | weekly_icu_admissions | weekly_icu_admissions_per_million | weekly_hosp_admissions | weekly_hosp_admissions_per_million | total_tests | new_tests | total_tests_per_thousand | new_tests_per_thousand | new_tests_smoothed | new_tests_smoothed_per_thousand | tests_per_case | positive_rate | tests_units | stringency_index | population | population_density | median_age | aged_65_older | aged_70_older | gdp_per_capita | extreme_poverty | cardiovasc_death_rate | diabetes_prevalence | female_smokers | male_smokers | handwashing_facilities | hospital_beds_per_thousand | life_expectancy | human_development_index |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
ISL | Europe | Iceland | 2020-03-14 | 156.0 | 22.0 | 15.143 | null | 0.0 | 0.0 | 457.143 | 64.469 | 44.375 | null | 0.0 | 0.0 | 1.62 | 1.0 | 2.93 | 3.0 | 8.791 | null | null | null | null | 1827.0 | 323.0 | 5.354 | 0.947 | 198.0 | 0.58 | 7.6e-2 | 13.1 | null | 16.67 | 341250.0 | 3.404 | 37.3 | 14.431 | 9.207 | 46482.958 | 0.2 | 117.992 | 5.31 | 14.3 | 15.2 | null | 2.91 | 82.99 | 0.935 |
ISL | Europe | Iceland | 2020-03-15 | 171.0 | 15.0 | 17.286 | 5.0 | 5.0 | 0.714 | 501.099 | 43.956 | 50.654 | 14.652 | 14.652 | 2.093 | 1.61 | 2.0 | 5.861 | 3.0 | 8.791 | null | null | null | null | 2902.0 | 1075.0 | 8.504 | 3.15 | 346.0 | 1.014 | 5.0e-2 | 20.0 | null | 25.0 | 341250.0 | 3.404 | 37.3 | 14.431 | 9.207 | 46482.958 | 0.2 | 117.992 | 5.31 | 14.3 | 15.2 | null | 2.91 | 82.99 | 0.935 |
ISL | Europe | Iceland | 2020-03-16 | 180.0 | 9.0 | 17.429 | null | -5.0 | 0.0 | 527.473 | 26.374 | 51.073 | null | -14.652 | 0.0 | 1.63 | 2.0 | 5.861 | 4.0 | 11.722 | null | null | null | null | 4609.0 | 1707.0 | 13.506 | 5.002 | 576.0 | 1.688 | 3.0e-2 | 33.0 | null | 50.93 | 341250.0 | 3.404 | 37.3 | 14.431 | 9.207 | 46482.958 | 0.2 | 117.992 | 5.31 | 14.3 | 15.2 | null | 2.91 | 82.99 | 0.935 |
ISL | Europe | Iceland | 2020-03-17 | 220.0 | 40.0 | 21.571 | 1.0 | 1.0 | 0.143 | 644.689 | 117.216 | 63.213 | 2.93 | 2.93 | 0.419 | 1.7 | 2.0 | 5.861 | 5.0 | 14.652 | null | null | null | null | 6009.0 | 1400.0 | 17.609 | 4.103 | 752.0 | 2.204 | 2.9e-2 | 34.9 | null | 50.93 | 341250.0 | 3.404 | 37.3 | 14.431 | 9.207 | 46482.958 | 0.2 | 117.992 | 5.31 | 14.3 | 15.2 | null | 2.91 | 82.99 | 0.935 |
ISL | Europe | Iceland | 2020-03-18 | 250.0 | 30.0 | 23.571 | 1.0 | 0.0 | 0.143 | 732.601 | 87.912 | 69.074 | 2.93 | 0.0 | 0.419 | 1.73 | 0.0 | 0.0 | 6.0 | 17.582 | null | null | null | null | 7837.0 | 1828.0 | 22.966 | 5.357 | 992.0 | 2.907 | 2.4e-2 | 42.1 | null | 50.93 | 341250.0 | 3.404 | 37.3 | 14.431 | 9.207 | 46482.958 | 0.2 | 117.992 | 5.31 | 14.3 | 15.2 | null | 2.91 | 82.99 | 0.935 |
ISL | Europe | Iceland | 2020-03-19 | 330.0 | 80.0 | 32.429 | 1.0 | 0.0 | 0.143 | 967.033 | 234.432 | 95.029 | 2.93 | 0.0 | 0.419 | 1.78 | 1.0 | 2.93 | 6.0 | 17.582 | null | null | null | null | 9148.0 | 1311.0 | 26.807 | 3.842 | 1143.0 | 3.349 | 2.8e-2 | 35.2 | null | 50.93 | 341250.0 | 3.404 | 37.3 | 14.431 | 9.207 | 46482.958 | 0.2 | 117.992 | 5.31 | 14.3 | 15.2 | null | 2.91 | 82.99 | 0.935 |
ISL | Europe | Iceland | 2020-03-20 | 409.0 | 79.0 | 39.286 | null | -1.0 | 0.0 | 1198.535 | 231.502 | 115.123 | null | -2.93 | 0.0 | 1.75 | 1.0 | 2.93 | 10.0 | 29.304 | null | null | null | null | 9727.0 | 579.0 | 28.504 | 1.697 | 1175.0 | 3.443 | 3.3e-2 | 29.9 | null | 53.7 | 341250.0 | 3.404 | 37.3 | 14.431 | 9.207 | 46482.958 | 0.2 | 117.992 | 5.31 | 14.3 | 15.2 | null | 2.91 | 82.99 | 0.935 |
ISL | Europe | Iceland | 2020-03-21 | 473.0 | 64.0 | 45.286 | 1.0 | 1.0 | 0.143 | 1386.081 | 187.546 | 132.705 | 2.93 | 2.93 | 0.419 | 1.68 | 1.0 | 2.93 | 12.0 | 35.165 | null | null | null | null | 10077.0 | 350.0 | 29.53 | 1.026 | 1179.0 | 3.455 | 3.8e-2 | 26.0 | null | 53.7 | 341250.0 | 3.404 | 37.3 | 14.431 | 9.207 | 46482.958 | 0.2 | 117.992 | 5.31 | 14.3 | 15.2 | null | 2.91 | 82.99 | 0.935 |
display(df_fillNullForSmooth.filter($"iso_code"==="PNG").sort("date").filter($"date">"2020-07-19" && $"date"<"2020-07-24" )) // death data correction between 2020-07-20 and 2020-07-22, total_deaths -> all 0, new_deaths -> all 0, new_deaths_smoothed -> all 0
iso_code | continent | location | date | total_cases | new_cases | new_cases_smoothed | total_deaths | new_deaths | new_deaths_smoothed | total_cases_per_million | new_cases_per_million | new_cases_smoothed_per_million | total_deaths_per_million | new_deaths_per_million | new_deaths_smoothed_per_million | reproduction_rate | icu_patients | icu_patients_per_million | hosp_patients | hosp_patients_per_million | weekly_icu_admissions | weekly_icu_admissions_per_million | weekly_hosp_admissions | weekly_hosp_admissions_per_million | total_tests | new_tests | total_tests_per_thousand | new_tests_per_thousand | new_tests_smoothed | new_tests_smoothed_per_thousand | tests_per_case | positive_rate | tests_units | stringency_index | population | population_density | median_age | aged_65_older | aged_70_older | gdp_per_capita | extreme_poverty | cardiovasc_death_rate | diabetes_prevalence | female_smokers | male_smokers | handwashing_facilities | hospital_beds_per_thousand | life_expectancy | human_development_index |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
PNG | Oceania | Papua New Guinea | 2020-07-20 | 19.0 | 3.0 | 1.143 | 1.0 | 1.0 | 0.143 | 2.124 | 0.335 | 0.128 | 0.112 | 0.112 | 1.6e-2 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | 45.37 | 8947027.0 | 18.22 | 22.6 | 3.808 | 2.142 | 3823.194 | null | 561.494 | 17.65 | 23.5 | 48.8 | null | null | 64.5 | 0.544 |
PNG | Oceania | Papua New Guinea | 2020-07-21 | 27.0 | 8.0 | 2.286 | 1.0 | 0.0 | 0.143 | 3.018 | 0.894 | 0.255 | 0.112 | 0.0 | 1.6e-2 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | 45.37 | 8947027.0 | 18.22 | 22.6 | 3.808 | 2.142 | 3823.194 | null | 561.494 | 17.65 | 23.5 | 48.8 | null | null | 64.5 | 0.544 |
PNG | Oceania | Papua New Guinea | 2020-07-22 | 30.0 | 3.0 | 2.714 | null | -1.0 | 0.0 | 3.353 | 0.335 | 0.303 | null | -0.112 | 0.0 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | 45.37 | 8947027.0 | 18.22 | 22.6 | 3.808 | 2.142 | 3823.194 | null | 561.494 | 17.65 | 23.5 | 48.8 | null | null | 64.5 | 0.544 |
PNG | Oceania | Papua New Guinea | 2020-07-23 | 31.0 | 1.0 | 2.857 | null | 0.0 | 0.0 | 3.465 | 0.112 | 0.319 | null | 0.0 | 0.0 | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | null | 45.37 | 8947027.0 | 18.22 | 22.6 | 3.808 | 2.142 | 3823.194 | null | 561.494 | 17.65 | 23.5 | 48.8 | null | null | 64.5 | 0.544 |
display(df_fillNullForSmooth.filter($"iso_code"==="SVK").sort("date").filter($"date">"2020-03-16" && $"date"<"2020-03-23")) // death data correction between 2020-03-18 and 2020-03-22, total_deaths -> all 0, new_deaths -> all 0, new_deaths_smoothed -> all 0
iso_code | continent | location | date | total_cases | new_cases | new_cases_smoothed | total_deaths | new_deaths | new_deaths_smoothed | total_cases_per_million | new_cases_per_million | new_cases_smoothed_per_million | total_deaths_per_million | new_deaths_per_million | new_deaths_smoothed_per_million | reproduction_rate | icu_patients | icu_patients_per_million | hosp_patients | hosp_patients_per_million | weekly_icu_admissions | weekly_icu_admissions_per_million | weekly_hosp_admissions | weekly_hosp_admissions_per_million | total_tests | new_tests | total_tests_per_thousand | new_tests_per_thousand | new_tests_smoothed | new_tests_smoothed_per_thousand | tests_per_case | positive_rate | tests_units | stringency_index | population | population_density | median_age | aged_65_older | aged_70_older | gdp_per_capita | extreme_poverty | cardiovasc_death_rate | diabetes_prevalence | female_smokers | male_smokers | handwashing_facilities | hospital_beds_per_thousand | life_expectancy | human_development_index |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
SVK | Europe | Slovakia | 2020-03-17 | 72.0 | 9.0 | 9.286 | null | 0.0 | 0.0 | 13.188 | 1.648 | 1.701 | null | 0.0 | 0.0 | null | null | null | null | null | null | null | null | null | 1913.0 | 318.0 | 0.35 | 5.8e-2 | 173.0 | 3.2e-2 | 5.4e-2 | 18.6 | null | 75.0 | 5459643.0 | 113.128 | 41.2 | 15.07 | 9.167 | 30155.152 | 0.7 | 287.959 | 7.29 | 23.1 | 37.7 | null | 5.82 | 77.54 | 0.855 |
SVK | Europe | Slovakia | 2020-03-18 | 105.0 | 33.0 | 13.571 | 1.0 | 1.0 | 0.143 | 19.232 | 6.044 | 2.486 | 0.183 | 0.183 | 2.6e-2 | null | null | null | null | null | null | null | null | null | 2138.0 | 225.0 | 0.392 | 4.1e-2 | 192.0 | 3.5e-2 | 7.1e-2 | 14.1 | null | 75.0 | 5459643.0 | 113.128 | 41.2 | 15.07 | 9.167 | 30155.152 | 0.7 | 287.959 | 7.29 | 23.1 | 37.7 | null | 5.82 | 77.54 | 0.855 |
SVK | Europe | Slovakia | 2020-03-19 | 123.0 | 18.0 | 15.286 | 1.0 | 0.0 | 0.143 | 22.529 | 3.297 | 2.8 | 0.183 | 0.0 | 2.6e-2 | 1.19 | null | null | null | null | null | null | null | null | 2439.0 | 301.0 | 0.447 | 5.5e-2 | 221.0 | 4.0e-2 | 6.9e-2 | 14.5 | null | 75.0 | 5459643.0 | 113.128 | 41.2 | 15.07 | 9.167 | 30155.152 | 0.7 | 287.959 | 7.29 | 23.1 | 37.7 | null | 5.82 | 77.54 | 0.855 |
SVK | Europe | Slovakia | 2020-03-20 | 137.0 | 14.0 | 15.0 | 1.0 | 0.0 | 0.143 | 25.093 | 2.564 | 2.747 | 0.183 | 0.0 | 2.6e-2 | 1.19 | null | null | null | null | null | null | null | null | 2807.0 | 368.0 | 0.514 | 6.7e-2 | 265.0 | 4.9e-2 | 5.7e-2 | 17.7 | null | 75.0 | 5459643.0 | 113.128 | 41.2 | 15.07 | 9.167 | 30155.152 | 0.7 | 287.959 | 7.29 | 23.1 | 37.7 | null | 5.82 | 77.54 | 0.855 |
SVK | Europe | Slovakia | 2020-03-21 | 178.0 | 41.0 | 19.143 | 1.0 | 0.0 | 0.143 | 32.603 | 7.51 | 3.506 | 0.183 | 0.0 | 2.6e-2 | 1.19 | null | null | null | null | null | null | null | null | 3247.0 | 440.0 | 0.595 | 8.1e-2 | 300.0 | 5.5e-2 | 6.4e-2 | 15.7 | null | 75.0 | 5459643.0 | 113.128 | 41.2 | 15.07 | 9.167 | 30155.152 | 0.7 | 287.959 | 7.29 | 23.1 | 37.7 | null | 5.82 | 77.54 | 0.855 |
SVK | Europe | Slovakia | 2020-03-22 | 185.0 | 7.0 | 18.714 | null | -1.0 | 0.0 | 33.885 | 1.282 | 3.428 | null | -0.183 | 0.0 | 1.18 | null | null | null | null | null | null | null | null | 3489.0 | 242.0 | 0.639 | 4.4e-2 | 293.0 | 5.4e-2 | 6.4e-2 | 15.7 | null | 75.0 | 5459643.0 | 113.128 | 41.2 | 15.07 | 9.167 | 30155.152 | 0.7 | 287.959 | 7.29 | 23.1 | 37.7 | null | 5.82 | 77.54 | 0.855 |
Reset total_deaths, new_deaths, and new_deaths_smoothed to 0 over these correction windows
val df_fillNullForTotalDeathsSpecial = df_fillNullForTotalCases.withColumn("total_deaths_correct",
when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
.when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
.when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("total_deaths")))
.withColumn("new_deaths_correct",
when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
.when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
.when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("new_deaths")))
.withColumn("new_deaths_smoothed_correct",
when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
.when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
.when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("new_deaths_smoothed")))
df_fillNullForTotalDeathsSpecial: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 51 more fields]
We expect to see an empty table, confirming the correction is right
val df_NULL_total_death_ = df_fillNullForTotalDeathsSpecial.select($"date",$"iso_code", $"total_cases", $"total_deaths_correct", $"new_cases", $"new_deaths_correct", $"new_cases_smoothed", $"new_deaths_smoothed_correct")
.filter($"total_deaths_correct".isNull)
df_NULL_total_death_.filter($"total_deaths_correct".isNull).groupBy("iso_code").count().except(df_NULL_total_death_.filter($"new_deaths_correct"===0).groupBy("iso_code").count()).show()
+--------+-----+
|iso_code|count|
+--------+-----+
+--------+-----+
df_NULL_total_death_: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [date: string, iso_code: string ... 6 more fields]
Fill the remaining null values of total_deaths.
val df_fillNullForTotalDeaths = df_fillNullForTotalDeathsSpecial
.drop("new_deaths", "total_deaths", "new_deaths_smoothed") // drop old column to rename
.withColumnRenamed("new_deaths_correct","new_deaths")
.withColumnRenamed("total_deaths_correct","total_deaths")
.withColumnRenamed("new_deaths_smoothed_correct","new_deaths_smoothed")
.na.fill(0, Array("total_deaths"))
.select(df.columns.head, df.columns.tail: _*)
display(df_fillNullForTotalDeaths)
The first 10 columns are all clean now!
(All code above is for illustration; for processing, just run the cell below.)
// filter unknown and HK data
val df_filteredLocation = df.filter($"iso_code"=!="HKG").filter($"iso_code".isNotNull)
// fill missing continent value for World data
val df_fillContinentNull = df_filteredLocation.na.fill("World",Array("continent")).cache
df_filteredLocation.unpersist()
// filter date before 2020-01-23
val df_filtered_date = df_fillContinentNull.filter($"date">"2020-01-22").cache
df_fillContinentNull.unpersist()
// fill missing for new_cases_smoothed and new_deaths_smoothed
val df_fillNullForSmooth = df_filtered_date.na.fill(0,Array("new_cases_smoothed"))
.na.fill(0,Array("new_deaths_smoothed"))
.cache
df_filtered_date.unpersist()
// fill missing for total_cases
val df_fillNullForTotalCases = df_fillNullForSmooth.na.fill(0, Array("total_cases")).cache
df_fillNullForSmooth.unpersist()
// correct total_deaths, new_deaths, new_deaths_smoothed
val df_fillNullForTotalDeathsSpecial = df_fillNullForTotalCases.withColumn("total_deaths_correct",
when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
.when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
.when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("total_deaths")))
.withColumn("new_deaths_correct",
when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
.when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
.when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("new_deaths")))
.withColumn("new_deaths_smoothed_correct",
when(col("iso_code").equalTo("ISL")&&(col("date")>"2020-03-13" && col("date")<"2020-03-22"),0)
.when(col("iso_code").equalTo("PNG")&&(col("date")>"2020-07-19" && col("date")<"2020-07-23"),0)
.when(col("iso_code").equalTo("SVK")&&(col("date")>"2020-03-17" && col("date")<"2020-03-23"),0).otherwise(col("new_deaths_smoothed")))
.cache
df_fillNullForTotalCases.unpersist()
val df_cleaned = df_fillNullForTotalDeathsSpecial
.drop("new_deaths", "total_deaths", "new_deaths_smoothed") // drop old column to rename
.withColumnRenamed("new_deaths_correct","new_deaths")
.withColumnRenamed("total_deaths_correct","total_deaths")
.withColumnRenamed("new_deaths_smoothed_correct","new_deaths_smoothed")
.na.fill(0, Array("total_deaths"))
.select(df.columns.head, df.columns.tail: _*)
.cache
df_fillNullForTotalDeathsSpecial.unpersist()
display(df_cleaned)
import org.apache.spark.sql.functions._
for (c <- df_cleaned.columns) {
println(c + ": " + df_cleaned.filter(col(c).isNull).count())
}
3. Select invariant (during the pandemic) features for clustering
Double-check whether they are constant for each country; if not, replace the values with the mean (see the sketch after the candidate list), and filter out countries that have missing constant features.
Candidate list:
- population
- population_density
- median_age
- aged_65_older
- aged_70_older
- gdp_per_capita
- cardiovasc_death_rate
- diabetes_prevalence
- female_smokers
- male_smokers
- hospital_beds_per_thousand
- life_expectancy
- human_development_index
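If one of these candidates turned out not to be constant within a country, the per-country mean imputation mentioned above could look like the following sketch (illustrative only; population_density is just an example column):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.avg
val byCountry = Window.partitionBy("location")
// replace the feature by its mean over all of the country's rows
val df_meanImputed = df_cleaned.withColumn("population_density", avg($"population_density").over(byCountry))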
val df_invariantFeatures = df_cleaned.select($"location", $"population",$"population_density",
$"median_age", $"aged_65_older",
$"aged_70_older",$"gdp_per_capita",
$"cardiovasc_death_rate",$"diabetes_prevalence",
$"female_smokers",$"male_smokers",$"hospital_beds_per_thousand",
$"life_expectancy",$"human_development_index")
display(df_invariantFeatures)
display(df_invariantFeatures.describe())
summary | location | population | population_density | median_age | aged_65_older | aged_70_older | gdp_per_capita | cardiovasc_death_rate | diabetes_prevalence | female_smokers | male_smokers | hospital_beds_per_thousand | life_expectancy | human_development_index |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
count | 60104 | 60104 | 58524 | 57260 | 56312 | 56944 | 57260 | 57892 | 58524 | 44928 | 44296 | 52835 | 59788 | 57576 |
mean | null | 8.179034692057101E7 | 305.56040665368073 | 30.204722319245512 | 8.595353423781793 | 5.418455904046091 | 18397.155660565862 | 262.00796225730636 | 7.89011004032537 | 10.368771011396003 | 32.645260520137235 | 3.000101788587119 | 72.8302766441427 | 0.7086279005141026 |
stddev | null | 5.801702332960505E8 | 1534.259203474429 | 9.077112325166668 | 6.17972101589387 | 4.214972529727923 | 19409.896953039588 | 120.63832428074626 | 4.2077256136575105 | 10.396263307877218 | 13.558764723459655 | 2.438486660152889 | 7.538806415792373 | 0.15398160968293945 |
min | Afghanistan | 809.0 | 1.98 | 15.1 | 1.144 | 0.526 | 661.24 | 79.37 | 0.99 | 0.1 | 7.7 | 0.1 | 53.28 | 0.354 |
max | Zimbabwe | 7.794798729E9 | 19347.5 | 48.2 | 27.049 | 18.493 | 116935.6 | 724.417 | 30.53 | 44.0 | 78.1 | 13.8 | 86.75 | 0.953 |
for (c <- df_invariantFeatures.columns) {
println(c + ": " + df_invariantFeatures.filter(col(c).isNull).count())
}
location: 0
population: 0
population_density: 1580
median_age: 2844
aged_65_older: 3792
aged_70_older: 3160
gdp_per_capita: 2844
cardiovasc_death_rate: 2212
diabetes_prevalence: 1580
female_smokers: 15176
male_smokers: 15808
hospital_beds_per_thousand: 7269
life_expectancy: 316
human_development_index: 2528
Although some countries seem like outliers, they do have constant female_smokers and male_smokers values
val constant_feature_checker = df_cleaned.groupBy("location")
.agg(
//stddev("stringency_index").as("std_si"),
stddev("population").as("std_pop"),
stddev("population_density").as("std_pd"),
stddev("median_age").as("std_ma"),
stddev("aged_65_older").as("std_a65"),
stddev("aged_70_older").as("std_a70"),
stddev("gdp_per_capita").as("std_gdp"),
stddev("cardiovasc_death_rate").as("std_cdr"),
stddev("diabetes_prevalence").as("std_dp"),
stddev("female_smokers").as("std_fs"),
stddev("male_smokers").as("std_ms"),
stddev("hospital_beds_per_thousand").as("std_hbpt"),
stddev("life_expectancy").as("std_le"),
stddev("human_development_index").as("std_hdi")
)
.where(
(col("std_pop") > 0) || (col("std_pd") > 1e-20) || (col("std_ma") > 0) || (col("std_a65") > 0) || (col("std_a70") > 0) || (col("std_gdp") > 0 ||
col("std_cdr") > 0) || (col("std_dp") > 0) || (col("std_fs") > 0) || (col("std_ms") > 0) || (col("std_hbpt") > 0) || (col("std_le") > 0) || (col("std_hdi") > 0))
display(constant_feature_checker)
Each country's candidate features are indeed constant over time
val distinct_features = df_invariantFeatures.distinct()
display(distinct_features)
In total, 126 countries have complete features
val valid_distinct_features = distinct_features.filter($"population".isNotNull && $"population_density".isNotNull && $"median_age".isNotNull &&
$"aged_65_older".isNotNull && $"aged_70_older".isNotNull && $"gdp_per_capita".isNotNull &&
$"cardiovasc_death_rate".isNotNull && $"diabetes_prevalence".isNotNull && $"female_smokers".isNotNull &&
$"male_smokers".isNotNull && $"hospital_beds_per_thousand".isNotNull && $"life_expectancy".isNotNull &&
$"human_development_index".isNotNull)
display(valid_distinct_features)
country list
display(valid_distinct_features.select($"location"))
valid_distinct_features.select($"location").count()
res69: Long = 126
val df_cleaned_feature = df_cleaned.filter($"location".isin(valid_distinct_features.select($"location").rdd.map(r => r(0)).collect().toSeq: _*))
display(df_cleaned_feature)
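The isin(...) filter above collects the 126 location names to the driver; an equivalent sketch that keeps the filtering on the cluster uses a left-semi join:
// keep only rows whose location appears in valid_distinct_features, without collect()
val df_cleaned_feature_semi = df_cleaned.join(valid_distinct_features.select($"location"), Seq("location"), "left_semi")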
All remaining data contains the complete list of time-invariant features
(All code above is for illustration; for processing, just run the cell below.)
// select invariant features
val df_invariantFeatures = df_cleaned.select($"location", $"population",$"population_density",
$"median_age", $"aged_65_older",
$"aged_70_older",$"gdp_per_capita",
$"cardiovasc_death_rate",$"diabetes_prevalence",
$"female_smokers",$"male_smokers",$"hospital_beds_per_thousand",
$"life_expectancy",$"human_development_index").cache
// Extract valid distinct features
val valid_distinct_features = df_invariantFeatures.distinct()
.filter($"population".isNotNull && $"population_density".isNotNull && $"median_age".isNotNull &&
$"aged_65_older".isNotNull && $"aged_70_older".isNotNull && $"gdp_per_capita".isNotNull &&
$"cardiovasc_death_rate".isNotNull && $"diabetes_prevalence".isNotNull && $"female_smokers".isNotNull &&
$"male_smokers".isNotNull && $"hospital_beds_per_thousand".isNotNull && $"life_expectancy".isNotNull &&
$"human_development_index".isNotNull).cache
df_invariantFeatures.unpersist()
// filter out NULL feature countries
val df_cleaned_feature = df_cleaned.filter($"location".isin(valid_distinct_features.select($"location").rdd.map(r => r(0)).collect().toSeq: _*)).cache
df_cleaned.unpersist()
display(df_cleaned_feature)
import org.apache.spark.sql.functions._
for (c <- df_cleaned_feature.columns) {
println(c + ": " + df_cleaned_feature.filter(col(c).isNull).count())
}
4. Imputing missing time series data of
- total_cases_per_million
- new_cases_per_million
- new_cases_smoothed_per_million
- total_deaths_per_million
- new_deaths_per_million
- new_deaths_smoothed_per_million
val per_million_data = df_cleaned_feature.select($"location", $"date", $"iso_code", $"total_cases",
$"total_deaths", $"new_cases", $"new_deaths", $"new_cases_smoothed",
$"new_deaths_smoothed", $"population", $"population_density",
$"total_cases_per_million", $"new_cases_per_million", $"new_cases_smoothed_per_million",
$"total_deaths_per_million", $"new_deaths_per_million", $"new_deaths_smoothed_per_million")
display(per_million_data)
val per_million_data_corrected = per_million_data.withColumn("total_cases_per_million_correct", per_million_data("total_cases")/per_million_data("population")*1000000)
.withColumn("new_cases_per_million_correct", per_million_data("new_cases")/per_million_data("population")*1000000)
.withColumn("new_cases_smoothed_per_million_correct", per_million_data("new_cases_smoothed")/per_million_data("population")*1000000)
.withColumn("total_deaths_per_million_correct", per_million_data("total_deaths")/per_million_data("population")*1000000)
.withColumn("new_deaths_per_million_correct", per_million_data("new_deaths")/per_million_data("population")*1000000)
.withColumn("new_deaths_smoothed_per_million_correct", per_million_data("new_deaths_smoothed")/per_million_data("population")*1000000)
.drop("total_cases_per_million", "new_cases_per_million", "new_cases_smoothed_per_million",
"total_deaths_per_million", "new_deaths_per_million", "new_deaths_smoothed_per_million") // drop old column to rename
.withColumnRenamed("total_cases_per_million_correct","total_cases_per_million")
.withColumnRenamed("new_cases_per_million_correct","new_cases_per_million")
.withColumnRenamed("new_cases_smoothed_per_million_correct","new_cases_smoothed_per_million")
.withColumnRenamed("total_deaths_per_million_correct","total_deaths_per_million")
.withColumnRenamed("new_deaths_per_million_correct","new_deaths_per_million")
.withColumnRenamed("new_deaths_smoothed_per_million_correct","new_deaths_smoothed_per_million")
per_million_data_corrected: org.apache.spark.sql.DataFrame = [location: string, date: string ... 15 more fields]
val df_cleaned_feature_permillion = df_cleaned_feature.withColumn("total_cases_per_million_correct", df_cleaned_feature("total_cases")/df_cleaned_feature("population")*1000000)
.withColumn("new_cases_per_million_correct", df_cleaned_feature("new_cases")/df_cleaned_feature("population")*1000000)
.withColumn("new_cases_smoothed_per_million_correct", df_cleaned_feature("new_cases_smoothed")/df_cleaned_feature("population")*1000000)
.withColumn("total_deaths_per_million_correct", df_cleaned_feature("total_deaths")/df_cleaned_feature("population")*1000000)
.withColumn("new_deaths_per_million_correct", df_cleaned_feature("new_deaths")/df_cleaned_feature("population")*1000000)
.withColumn("new_deaths_smoothed_per_million_correct", df_cleaned_feature("new_deaths_smoothed")/df_cleaned_feature("population")*1000000)
.drop("total_cases_per_million", "new_cases_per_million", "new_cases_smoothed_per_million",
"total_deaths_per_million", "new_deaths_per_million", "new_deaths_smoothed_per_million") // drop old column to rename
.withColumnRenamed("total_cases_per_million_correct","total_cases_per_million")
.withColumnRenamed("new_cases_per_million_correct","new_cases_per_million")
.withColumnRenamed("new_cases_smoothed_per_million_correct","new_cases_smoothed_per_million")
.withColumnRenamed("total_deaths_per_million_correct","total_deaths_per_million")
.withColumnRenamed("new_deaths_per_million_correct","new_deaths_per_million")
.withColumnRenamed("new_deaths_smoothed_per_million_correct","new_deaths_smoothed_per_million")
df_cleaned_feature_permillion: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 48 more fields]
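The same six withColumn/drop/rename steps are applied twice (to per_million_data and to df_cleaned_feature); a small helper, sketched here under the assumption that population is never null, keeps the recomputation in one place by letting withColumn replace each existing *_per_million column:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// recompute every *_per_million column directly from its count column and the population
def recomputePerMillion(data: DataFrame): DataFrame =
  Seq("total_cases", "new_cases", "new_cases_smoothed", "total_deaths", "new_deaths", "new_deaths_smoothed")
    .foldLeft(data)((acc, c) => acc.withColumn(s"${c}_per_million", col(c) / col("population") * 1000000))
// e.g. val df_cleaned_feature_permillion = recomputePerMillion(df_cleaned_feature)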
5. Impute time series of
- reproduction_rate
- total_tests
- stringency_index
- total_tests_per_thousand
Fill nulls in reproduction_rate with the last available and next available values
All countries have missing data at the beginning or at the end
display(df_cleaned_feature_permillion.select($"reproduction_rate", $"location", $"date").filter($"reproduction_rate".isNull).groupBy("location").count().sort("location"))
display(df_cleaned_feature_permillion.select($"reproduction_rate", $"location", $"date").filter($"reproduction_rate".isNull).groupBy("location").agg(max("date").as("max_date"), min("date").as("min_date")).sort("location"))
display(df_cleaned_feature_permillion.select($"reproduction_rate", $"location", $"date").filter($"location"==="Albania"))
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df_cleaned_reproduction_rate= df_cleaned_feature_permillion.select($"reproduction_rate", $"location", $"date")
.withColumn("reproduction_rate", last("reproduction_rate", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
.withColumn("reproduction_rate", first("reproduction_rate", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(0, df_cleaned_feature_permillion.count())))
.na.fill(0, Array("reproduction_rate"))
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df_cleaned_reproduction_rate: org.apache.spark.sql.DataFrame = [reproduction_rate: double, location: string ... 1 more field]
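The rowsBetween(-count, 0) and rowsBetween(0, count) bounds above emulate unbounded window frames; Spark exposes explicit constants for this, so an equivalent sketch of the same forward-then-backward fill is:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{first, last}
val byLocationDate = Window.partitionBy("location").orderBy("date")
val ffill = byLocationDate.rowsBetween(Window.unboundedPreceding, Window.currentRow)
val bfill = byLocationDate.rowsBetween(Window.currentRow, Window.unboundedFollowing)
val df_rr_filled = df_cleaned_feature_permillion
  .withColumn("reproduction_rate", last("reproduction_rate", true).over(ffill))  // forward fill
  .withColumn("reproduction_rate", first("reproduction_rate", true).over(bfill)) // backward fill
  .na.fill(0, Array("reproduction_rate"))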
Countries missing stringency_index values
display(df_cleaned_feature_permillion.select($"stringency_index", $"location", $"date").filter($"stringency_index".isNull).groupBy("location").count().sort("count"))
Start and end dates of null stringency_index values for each country
display(df_cleaned_feature_permillion.select($"stringency_index", $"location", $"date").filter($"stringency_index".isNull).groupBy("location").agg(max("date").as("max_date"), min("date").as("min_date")))
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df_cleaned_stringency = df_cleaned_feature_permillion.select($"stringency_index", $"location", $"date")
.withColumn("stringency_index_corect", last("stringency_index", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
display(df_cleaned_stringency.filter($"stringency_index".isNull).filter($"stringency_index_correct".isNull).groupBy("location").count())
location | count |
---|---|
Comoros | 316.0 |
Bahamas | 316.0 |
Malta | 316.0 |
Montenegro | 316.0 |
Armenia | 316.0 |
total_tests: impute with the last available or next available value
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df_cleaned_total_cases = df_cleaned_feature_permillion.select($"total_tests", $"location", $"date")
.withColumn("total_tests", last("total_tests", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
.withColumn("total_tests", first("total_tests", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(0, df_cleaned_feature_permillion.count())))
.na.fill(0, Array("total_tests"))
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df_cleaned_total_cases: org.apache.spark.sql.DataFrame = [total_tests: double, location: string ... 1 more field]
display(df_cleaned_feature_permillion.select($"total_tests", $"location", $"date").filter($"total_tests".isNull).groupBy("location").count())
val total_tests_date_maxmin = df_cleaned_feature_permillion.select($"total_tests", $"location", $"date").filter($"total_tests".isNull).groupBy("location").agg(max("date").as("max_date"), min("date").as("min_date"))
display(total_tests_date_maxmin)
Process stringency_index, reproduction_rate, total_tests, and total_tests_per_thousand
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val df_cleaned_time_series = df_cleaned_feature_permillion
.withColumn("reproduction_rate", last("reproduction_rate", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
.withColumn("reproduction_rate", first("reproduction_rate", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(0, df_cleaned_feature_permillion.count())))
.na.fill(0, Array("reproduction_rate"))
.withColumn("stringency_index", last("stringency_index", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
.na.fill(0, Array("stringency_index"))
.withColumn("total_tests", last("total_tests", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(-df_cleaned_feature_permillion.count(), 0)))
.withColumn("total_tests", first("total_tests", true)
.over(Window.partitionBy("location").orderBy("date").rowsBetween(0, df_cleaned_feature_permillion.count())))
.na.fill(0, Array("total_tests"))
.withColumn("total_tests_per_thousand", col("total_tests")/col("population")*1000)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df_cleaned_time_series: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 48 more fields]
display(df_cleaned_time_series)
import org.apache.spark.sql.functions._
for (c <- df_cleaned_time_series.columns) {
println(c + ": " + df_cleaned_time_series.filter(col(c).isNull).count())
}