display(dbutils.fs.ls("/datasets/group12/"))
path | name | size (bytes)
---|---|---
dbfs:/datasets/group12/20_12_04_08_31_44.csv | 20_12_04_08_31_44.csv | 14181338
dbfs:/datasets/group12/20_12_04_08_32_40.csv | 20_12_04_08_32_40.csv | 14181338
dbfs:/datasets/group12/20_12_04_10_47_08.csv | 20_12_04_10_47_08.csv | 14190774
dbfs:/datasets/group12/21_01_07_08_50_05.csv | 21_01_07_08_50_05.csv | 14577033
dbfs:/datasets/group12/21_01_07_09_05_33.csv | 21_01_07_09_05_33.csv | 14577033
dbfs:/datasets/group12/analysis/ | analysis/ | 0
dbfs:/datasets/group12/chkpoint/ | chkpoint/ | 0
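The parquet files under analysis/ were presumably produced from these CSV snapshots in an earlier step. A minimal sketch of that conversion, assuming the latest snapshot and a header row (the read options shown are my assumptions, not part of this notebook):

// Hypothetical reconstruction of the CSV-to-parquet step (not shown in this notebook):
// read one of the listed snapshots with a header and inferred schema, write it as parquet.
val raw = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("dbfs:/datasets/group12/21_01_07_09_05_33.csv")
raw.write.mode("overwrite").parquet("dbfs:/datasets/group12/analysis/")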
val df = spark.read.parquet("dbfs:/datasets/group12/analysis/*.parquet")
display(df)
df.count()
res10: Long = 60544
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
// filter out rows with unknown (null) iso_code and the Hong Kong (HKG) rows
val df_filteredLocation = df.filter($"iso_code"=!="HKG").filter($"iso_code".isNotNull)
// the aggregate World rows have a null continent; fill it with "World"
val df_fillContinentNull = df_filteredLocation.na.fill("World",Array("continent")).cache
df_filteredLocation.unpersist()
df_filteredLocation: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, continent: string ... 48 more fields]
df_fillContinentNull: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, continent: string ... 48 more fields]
res4: df_filteredLocation.type = [iso_code: string, continent: string ... 48 more fields]
// drop records dated before 2020-01-23 (string comparison is safe for ISO-8601 dates)
val df_filtered_date = df_fillContinentNull.filter($"date">"2020-01-22").cache
df_fillContinentNull.unpersist()
df_filtered_date: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, continent: string ... 48 more fields]
res6: df_fillContinentNull.type = [iso_code: string, continent: string ... 48 more fields]
// fill missing new_cases_smoothed and new_deaths_smoothed values with 0
val df_fillNullForSmooth = df_filtered_date.na.fill(0,Array("new_cases_smoothed"))
.na.fill(0,Array("new_deaths_smoothed"))
.cache
df_filtered_date.unpersist()
// fill missing total_cases values with 0
val df_fillNullForTotalCases = df_fillNullForSmooth.na.fill(0, Array("total_cases")).cache
df_fillNullForSmooth.unpersist()
// zero out total_deaths, new_deaths, new_deaths_smoothed inside known anomalous
// reporting windows (ISL, PNG, SVK), then fall back to the reported values
val df_fillNullForTotalDeathsSpecial = df_fillNullForTotalCases
  .withColumn("total_deaths_correct",
    when(col("iso_code") === "ISL" && col("date") > "2020-03-13" && col("date") < "2020-03-22", 0)
      .when(col("iso_code") === "PNG" && col("date") > "2020-07-19" && col("date") < "2020-07-23", 0)
      .when(col("iso_code") === "SVK" && col("date") > "2020-03-17" && col("date") < "2020-03-23", 0)
      .otherwise(col("total_deaths")))
  .withColumn("new_deaths_correct",
    when(col("iso_code") === "ISL" && col("date") > "2020-03-13" && col("date") < "2020-03-22", 0)
      .when(col("iso_code") === "PNG" && col("date") > "2020-07-19" && col("date") < "2020-07-23", 0)
      .when(col("iso_code") === "SVK" && col("date") > "2020-03-17" && col("date") < "2020-03-23", 0)
      .otherwise(col("new_deaths")))
  .withColumn("new_deaths_smoothed_correct",
    when(col("iso_code") === "ISL" && col("date") > "2020-03-13" && col("date") < "2020-03-22", 0)
      .when(col("iso_code") === "PNG" && col("date") > "2020-07-19" && col("date") < "2020-07-23", 0)
      .when(col("iso_code") === "SVK" && col("date") > "2020-03-17" && col("date") < "2020-03-23", 0)
      .otherwise(col("new_deaths_smoothed")))
  .cache
df_fillNullForTotalCases.unpersist()
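The three when-chains above repeat the same country/date windows; a small helper would remove the duplication. A sketch of that refactor (zeroDuringAnomalies is a hypothetical name, not in the original):

import org.apache.spark.sql.Column

// Hypothetical refactor: zero a column inside the hard-coded anomalous windows.
def zeroDuringAnomalies(c: Column): Column =
  when(col("iso_code") === "ISL" && col("date") > "2020-03-13" && col("date") < "2020-03-22", 0)
    .when(col("iso_code") === "PNG" && col("date") > "2020-07-19" && col("date") < "2020-07-23", 0)
    .when(col("iso_code") === "SVK" && col("date") > "2020-03-17" && col("date") < "2020-03-23", 0)
    .otherwise(c)

// equivalent to the transformation above, without the repetition
val df_corrected_alt = df_fillNullForTotalCases
  .withColumn("total_deaths_correct", zeroDuringAnomalies(col("total_deaths")))
  .withColumn("new_deaths_correct", zeroDuringAnomalies(col("new_deaths")))
  .withColumn("new_deaths_smoothed_correct", zeroDuringAnomalies(col("new_deaths_smoothed")))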
val df_cleaned = df_fillNullForTotalDeathsSpecial
  .drop("new_deaths", "total_deaths", "new_deaths_smoothed") // drop the old columns so the corrected ones can take their names
  .withColumnRenamed("new_deaths_correct", "new_deaths")
  .withColumnRenamed("total_deaths_correct", "total_deaths")
  .withColumnRenamed("new_deaths_smoothed_correct", "new_deaths_smoothed")
  .na.fill(0, Array("total_deaths"))
  .select(df.columns.head, df.columns.tail: _*) // restore the original column order
  .cache
df_fillNullForTotalDeathsSpecial.unpersist()
display(df_cleaned)
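A quick sanity check (my addition, not in the original) confirms the anomalous windows were zeroed:

// the Icelandic correction window should now report zero deaths in total
df_cleaned
  .filter($"iso_code" === "ISL" && $"date" > "2020-03-13" && $"date" < "2020-03-22")
  .agg(sum("total_deaths").as("should_be_zero"))
  .show()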
3. Select invariant (constant during the pandemic) features for clustering, and filter out countries with missing invariant features
Invariant feature list:
- population
- population_density
- median_age
- aged_65_older
- aged_70_older
- gdp_per_capita
- cardiovasc_death_rate
- diabetes_prevalence
- female_smokers
- male_smokers
- hospital_beds_per_thousand
- life_expectancy
- human_development_index
// select invariant features
val df_invariantFeatures = df_cleaned.select($"iso_code",$"location", $"population",$"population_density",
$"median_age", $"aged_65_older",
$"aged_70_older",$"gdp_per_capita",
$"cardiovasc_death_rate",$"diabetes_prevalence",
$"female_smokers",$"male_smokers",$"hospital_beds_per_thousand",
$"life_expectancy",$"human_development_index").cache
// extract the distinct (location, features) rows where every invariant feature is non-null
val valid_distinct_features = df_invariantFeatures.distinct()
.filter($"population".isNotNull && $"population_density".isNotNull && $"median_age".isNotNull &&
$"aged_65_older".isNotNull && $"aged_70_older".isNotNull && $"gdp_per_capita".isNotNull &&
$"cardiovasc_death_rate".isNotNull && $"diabetes_prevalence".isNotNull && $"female_smokers".isNotNull &&
$"male_smokers".isNotNull && $"hospital_beds_per_thousand".isNotNull && $"life_expectancy".isNotNull &&
$"human_development_index".isNotNull)
// keep only countries whose invariant features are all present
val df_cleaned_feature = df_cleaned.filter($"location".isin(valid_distinct_features.select($"location").rdd.map(r => r(0)).collect().toSeq: _*)).cache
df_cleaned.unpersist()
display(df_cleaned_feature)
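Collecting the valid locations to the driver works here because the country list is small; an equivalent, driver-free formulation would be a left-semi join (a sketch of an alternative, not the approach used above):

// Hypothetical alternative: keep only rows whose location appears in
// valid_distinct_features, without collect()-ing the list to the driver.
val df_cleaned_feature_alt = df_cleaned
  .join(valid_distinct_features.select("location"), Seq("location"), "left_semi")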
4. Impute missing data for:
- total_cases_per_million
- new_cases_per_million
- new_cases_smoothed_per_million
- total_deaths_per_million
- new_deaths_per_million
- new_deaths_smoothed_per_million

Each per-million metric is recomputed directly from the corrected counts and the population, which also fills in the previously missing values.
val df_cleaned_feature_permillion = df_cleaned_feature.withColumn("total_cases_per_million_correct", df_cleaned_feature("total_cases")/df_cleaned_feature("population")*1000000)
.withColumn("new_cases_per_million_correct", df_cleaned_feature("new_cases")/df_cleaned_feature("population")*1000000)
.withColumn("new_cases_smoothed_per_million_correct", df_cleaned_feature("new_cases_smoothed")/df_cleaned_feature("population")*1000000)
.withColumn("total_deaths_per_million_correct", df_cleaned_feature("total_deaths")/df_cleaned_feature("population")*1000000)
.withColumn("new_deaths_per_million_correct", df_cleaned_feature("new_deaths")/df_cleaned_feature("population")*1000000)
.withColumn("new_deaths_smoothed_per_million_correct", df_cleaned_feature("new_deaths_smoothed")/df_cleaned_feature("population")*1000000)
.drop("total_cases_per_million", "new_cases_per_million", "new_cases_smoothed_per_million",
"total_deaths_per_million", "new_deaths_per_million", "new_deaths_smoothed_per_million") // drop old column to rename
.withColumnRenamed("total_cases_per_million_correct","total_cases_per_million")
.withColumnRenamed("new_cases_per_million_correct","new_cases_per_million")
.withColumnRenamed("new_cases_smoothed_per_million_correct","new_cases_smoothed_per_million")
.withColumnRenamed("total_deaths_per_million_correct","total_deaths_per_million")
.withColumnRenamed("new_deaths_per_million_correct","new_deaths_per_million")
.withColumnRenamed("new_deaths_smoothed_per_million_correct","new_deaths_smoothed_per_million")
df_cleaned_feature_permillion: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 48 more fields]
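Since the per-million columns are now derived from imputed counts and non-null populations, they should contain no nulls; a quick audit (my addition, not in the original):

// count remaining nulls in each recomputed per-million column (expected: all zeros)
val perMillionCols = Seq(
  "total_cases_per_million", "new_cases_per_million", "new_cases_smoothed_per_million",
  "total_deaths_per_million", "new_deaths_per_million", "new_deaths_smoothed_per_million")
display(df_cleaned_feature_permillion
  .select(perMillionCols.map(c => count(when(col(c).isNull, c)).as(c)): _*))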
5. Impute time series data for:
- reproduction_rate
- total_tests
- stringency_index
- total_tests_per_thousand

reproduction_rate and total_tests are forward-filled with the last known value and then backward-filled with the first known value; any remaining gaps become 0, and total_tests_per_thousand is recomputed from the filled total_tests.
// forward-fill window: from the start of each location's series to the current row
val fillDown = Window.partitionBy("location").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
// backward-fill window: from the current row to the end of each location's series
val fillUp = Window.partitionBy("location").orderBy("date")
  .rowsBetween(Window.currentRow, Window.unboundedFollowing)

val df_cleaned_time_series = df_cleaned_feature_permillion
  .withColumn("reproduction_rate", last("reproduction_rate", ignoreNulls = true).over(fillDown))
  .withColumn("reproduction_rate", first("reproduction_rate", ignoreNulls = true).over(fillUp))
  .na.fill(0, Array("reproduction_rate"))
  .withColumn("stringency_index", last("stringency_index", ignoreNulls = true).over(fillDown))
  .na.fill(0, Array("stringency_index"))
  .withColumn("total_tests", last("total_tests", ignoreNulls = true).over(fillDown))
  .withColumn("total_tests", first("total_tests", ignoreNulls = true).over(fillUp))
  .na.fill(0, Array("total_tests"))
  .withColumn("total_tests_per_thousand", col("total_tests") / col("population") * 1000) // recompute from the filled total_tests
  .cache
df_cleaned_feature_permillion.unpersist()
display(df_cleaned_time_series)
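With imputation complete, the cleaned frame can be persisted for the downstream clustering step; writing under the group's chkpoint/ directory is one option (a sketch; the subpath and save mode are my assumptions):

// Hypothetical persistence step (not shown above): save the cleaned, imputed
// dataset as parquet so the clustering stage can start from disk.
df_cleaned_time_series.write
  .mode("overwrite")
  .parquet("dbfs:/datasets/group12/chkpoint/cleaned_time_series.parquet")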