ScaDaMaLe Course site and book

Stream to parquet file

This notebook sets up and runs the streaming job that writes the incoming data into a parquet file; the analysis is thereafter performed on that parquet file.

Note that this notebook assumes one has already downloaded several "Our World in Data" dataset csv files. This can be done by running "DownloadFilesPeriodicallyScript" at least once.

Content is based on "038_StructuredStreamingProgGuide" by Raazesh Sainudiin.

start by copying the latest downloaded csv data to the data analysis folder

dbutils.fs.cp("file:///databricks/driver/projects/group12/logsEveryXSecs/","/datasets/group12/",true)
res0: Boolean = true

check that the data is in the group12 folder

display(dbutils.fs.ls("/datasets/group12/"))
path name size
dbfs:/datasets/group12/20_12_04_08_31_44.csv 20_12_04_08_31_44.csv 1.4181338e7
dbfs:/datasets/group12/20_12_04_08_32_40.csv 20_12_04_08_32_40.csv 1.4181338e7
dbfs:/datasets/group12/20_12_04_10_47_08.csv 20_12_04_10_47_08.csv 1.4190774e7
dbfs:/datasets/group12/21_01_07_08_50_05.csv 21_01_07_08_50_05.csv 1.4577033e7
dbfs:/datasets/group12/21_01_07_09_05_33.csv 21_01_07_09_05_33.csv 1.4577033e7
dbfs:/datasets/group12/analysis/ analysis/ 0.0
dbfs:/datasets/group12/chkpoint/ chkpoint/ 0.0

check the schema for the csv files.

val df_csv = spark.read.option("header", "true").option("inferSchema", "true").csv("/datasets/group12/21_01_07_09_05_33.csv")
df_csv: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 52 more fields]
df_csv.printSchema

The stream requires a user-defined schema. Note that the January 2021 schema differs from the December 2020 schema, so both user-defined schemas are created below.

import org.apache.spark.sql.types._

val OurWorldinDataSchema2021 = new StructType()                      
                      .add("iso_code", "string")
                      .add("continent", "string")
                      .add("location", "string")
                      .add("date", "string")
                      .add("total_cases","double")
                      .add("new_cases","double")
                      .add("new_cases_smoothed","double")
                      .add("total_deaths","double")
                      .add("new_deaths","double")
                      .add("new_deaths_smoothed","double")
                      .add("total_cases_per_million","double")
                      .add("new_cases_per_million","double")
                      .add("new_cases_smoothed_per_million","double")
                      .add("total_deaths_per_million","double")
                      .add("new_deaths_per_million","double")
                      .add("new_deaths_smoothed_per_million","double")
                      .add("reproduction_rate", "double")
                      .add("icu_patients", "double")
                      .add("icu_patients_per_million", "double")
                      .add("hosp_patients", "double")
                      .add("hosp_patients_per_million", "double")
                      .add("weekly_icu_admissions", "double")
                      .add("weekly_icu_admissions_per_million", "double")
                      .add("weekly_hosp_admissions", "double")
                      .add("weekly_hosp_admissions_per_million", "double")
                      .add("new_tests", "double")
                      .add("total_tests", "double")
                      .add("total_tests_per_thousand", "double")
                      .add("new_tests_per_thousand", "double")
                      .add("new_tests_smoothed", "double")
                      .add("new_tests_smoothed_per_thousand", "double")
                      .add("positive_rate", "double")
                      .add("tests_per_case", "double")
                      .add("tests_units", "double")
                      .add("total_vaccinations", "double")
                      .add("new_vaccinations", "double")
                      .add("stringency_index","double")
                      .add("population","double")
                      .add("population_density","double")
                      .add("median_age", "double")
                      .add("aged_65_older", "double")
                      .add("aged_70_older", "double")
                      .add("gdp_per_capita","double")
                      .add("extreme_poverty","double")
                      .add("cardiovasc_death_rate","double")
                      .add("diabetes_prevalence","double")
                      .add("female_smokers", "double")
                      .add("male_smokers", "double")
                      .add("handwashing_facilities", "double")
                      .add("hospital_beds_per_thousand", "double")
                      .add("life_expectancy","double")
                      .add("human_development_index","double")

val OurWorldinDataSchema2020 = new StructType()                      
                      .add("iso_code", "string")
                      .add("continent", "string")
                      .add("location", "string")
                      .add("date", "string")
                      .add("total_cases","double")
                      .add("new_cases","double")
                      .add("new_cases_smoothed","double")
                      .add("total_deaths","double")
                      .add("new_deaths","double")
                      .add("new_deaths_smoothed","double")
                      .add("total_cases_per_million","double")
                      .add("new_cases_per_million","double")
                      .add("new_cases_smoothed_per_million","double")
                      .add("total_deaths_per_million","double")
                      .add("new_deaths_per_million","double")
                      .add("new_deaths_smoothed_per_million","double")
                      .add("reproduction_rate", "double")
                      .add("icu_patients", "double")
                      .add("icu_patients_per_million", "double")
                      .add("hosp_patients", "double")
                      .add("hosp_patients_per_million", "double")
                      .add("weekly_icu_admissions", "double")
                      .add("weekly_icu_admissions_per_million", "double")
                      .add("weekly_hosp_admissions", "double")
                      .add("weekly_hosp_admissions_per_million", "double")
                      .add("total_tests", "double")
                      .add("new_tests", "double")
                      .add("total_tests_per_thousand", "double")
                      .add("new_tests_per_thousand", "double")
                      .add("new_tests_smoothed", "double")
                      .add("new_tests_smoothed_per_thousand", "double")
                      .add("tests_per_case", "double")
                      .add("positive_rate", "double")
                      .add("tests_units", "double")
                      .add("stringency_index","double")
                      .add("population","double")
                      .add("population_density","double")
                      .add("median_age", "double")
                      .add("aged_65_older", "double")
                      .add("aged_70_older", "double")
                      .add("gdp_per_capita","double")
                      .add("extreme_poverty","double")
                      .add("cardiovasc_death_rate","double")
                      .add("diabetes_prevalence","double")
                      .add("female_smokers", "double")
                      .add("male_smokers", "double")
                      .add("handwashing_facilities", "double")
                      .add("hospital_beds_per_thousand", "double")
                      .add("life_expectancy","double")
                      .add("human_development_index","double")
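
As a quick sanity check (a sketch, assuming the two schema values defined above are in scope), one can diff the field names to see exactly what changed between the two versions:

```scala
// Diff the field names of the two user-defined schemas.
// The 2021 schema adds the vaccination columns; a few test-related
// columns also changed their relative order, which a set diff ignores.
val cols2020 = OurWorldinDataSchema2020.fieldNames.toSet
val cols2021 = OurWorldinDataSchema2021.fieldNames.toSet

println(s"added in 2021:   ${cols2021 diff cols2020}")
println(s"removed in 2021: ${cols2020 diff cols2021}")
```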

Start stream

In January 2021, the schema was updated relative to the December 2020 schema. Choose below which set of csv files to stream.

Stream for 2020

import org.apache.spark.sql.types._

val OurWorldinDataStream = spark
  .readStream
  .schema(OurWorldinDataSchema2020) 
  .option("maxFilesPerTrigger", 1)
  .option("latestFirst", "true")
  .format("csv")
  .option("header", "true")
  .load("/datasets/group12/20*.csv")
  .dropDuplicates()
import org.apache.spark.sql.types._
OurWorldinDataStream: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, continent: string ... 48 more fields]

Stream for 2021

import org.apache.spark.sql.types._

val OurWorldinDataStream2021 = spark
  .readStream
  .schema(OurWorldinDataSchema2021) 
  .option("maxFilesPerTrigger", 1)
  .option("latestFirst", "true")
  .format("csv")
  .option("header", "true")
  .load("/datasets/group12/21*.csv")
  .dropDuplicates()
import org.apache.spark.sql.types._
OurWorldinDataStream2021: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, continent: string ... 50 more fields]

display stream 2020

OurWorldinDataStream.isStreaming
res81: Boolean = true
display(OurWorldinDataStream) 

Query to File (2020)

query that saves the stream into a parquet file at periodic intervals. Analysis will thereafter be performed on the parquet file

create folders for the parquet file and the checkpoint data

// remove any previous folders if they exist
dbutils.fs.rm("/datasets/group12/chkpoint",recurse=true)
dbutils.fs.rm("/datasets/group12/analysis",recurse=true)
res14: Boolean = true
dbutils.fs.mkdirs("/datasets/group12/chkpoint")
res15: Boolean = true
dbutils.fs.mkdirs("/datasets/group12/analysis")
res16: Boolean = true

initialize query to store data in parquet files based on column selection

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val query = OurWorldinDataStream
                 .select($"iso_code", $"continent", $"location", $"date", $"total_cases", $"new_cases", $"new_cases_smoothed", $"total_deaths", $"new_deaths",$"new_deaths_smoothed", $"total_cases_per_million", $"new_cases_per_million", $"new_cases_smoothed_per_million", $"total_deaths_per_million", $"new_deaths_per_million", $"new_deaths_smoothed_per_million", $"reproduction_rate", $"icu_patients", $"icu_patients_per_million", $"hosp_patients", $"hosp_patients_per_million", $"weekly_icu_admissions", $"weekly_icu_admissions_per_million", $"weekly_hosp_admissions", $"weekly_hosp_admissions_per_million", $"total_tests",$"new_tests", $"total_tests_per_thousand", $"new_tests_per_thousand", $"new_tests_smoothed",$"new_tests_smoothed_per_thousand", $"tests_per_case", $"positive_rate", $"tests_units", $"stringency_index", $"population", $"population_density", $"median_age", $"aged_65_older", $"aged_70_older", $"gdp_per_capita", $"extreme_poverty", $"cardiovasc_death_rate", $"diabetes_prevalence", $"female_smokers", $"male_smokers", $"handwashing_facilities", $"hospital_beds_per_thousand", $"life_expectancy", $"human_development_index")
                 .writeStream
                 //.trigger(Trigger.ProcessingTime("20 seconds")) // debugging
                 .trigger(Trigger.ProcessingTime("86400 seconds")) // once per day (86400 s = 24 h)
                 .option("checkpointLocation", "/datasets/group12/chkpoint")
                 .format("parquet")  
                 .option("path", "/datasets/group12/analysis")
                 .start()
                 
query.awaitTermination() // hit cancel to terminate
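
While the query is running (i.e. before cancelling `awaitTermination`), its progress can be inspected from a separate notebook cell; a minimal sketch using the standard `StreamingQuery` API, assuming `query` is still in scope:

```scala
// Inspect the running streaming query from another cell.
println(query.status)        // current trigger state (active/waiting)
println(query.lastProgress)  // JSON with input rows, batch duration, etc.
// query.stop()              // programmatic alternative to hitting Cancel
```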

check saved parquet file contents

display(dbutils.fs.ls("/datasets/group12/analysis"))
val parquetFileDF = spark.read.parquet("dbfs:/datasets/group12/analysis/*.parquet")
parquetFileDF: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 48 more fields]
display(parquetFileDF.describe())
(output truncated: `describe()` returns count, mean, stddev, min and max for each of the 50 selected columns; the parquet file holds 62,500 rows covering dates 2020-01-01 through 2020-12-03 and iso_codes AFG through ZWE)
display(parquetFileDF.orderBy($"date".desc))
parquetFileDF.count()
res5: Long = 62500
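
Since the goal is analysis on the parquet file, one convenient next step (a sketch; the view name `owid_2020` is an arbitrary choice) is to register the DataFrame as a temporary view and query it with SQL:

```scala
// Register the parquet data for SQL-based analysis.
parquetFileDF.createOrReplaceTempView("owid_2020")

// Example: ten locations with the highest recorded case totals,
// excluding the continent-level aggregate rows (continent IS NULL).
display(spark.sql("""
  SELECT location, MAX(total_cases) AS total_cases
  FROM owid_2020
  WHERE continent IS NOT NULL
  GROUP BY location
  ORDER BY total_cases DESC
  LIMIT 10
"""))
```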

Query to File (2021)

query that saves the stream into a parquet file at periodic intervals.

// remove any previous folders if they exist
dbutils.fs.rm("/datasets/group12/chkpoint2021",recurse=true)
dbutils.fs.rm("/datasets/group12/analysis2021",recurse=true)
dbutils.fs.mkdirs("/datasets/group12/chkpoint2021")
dbutils.fs.mkdirs("/datasets/group12/analysis2021")
res18: Boolean = true
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val query = OurWorldinDataStream2021
                 .select($"iso_code", $"continent", $"location", $"date", $"total_cases", $"new_cases", $"new_cases_smoothed", $"total_deaths", $"new_deaths",$"new_deaths_smoothed", $"total_cases_per_million", $"new_cases_per_million", $"new_cases_smoothed_per_million", $"total_deaths_per_million", $"new_deaths_per_million", $"new_deaths_smoothed_per_million", $"reproduction_rate", $"icu_patients", $"icu_patients_per_million", $"hosp_patients", $"hosp_patients_per_million", $"weekly_icu_admissions", $"weekly_icu_admissions_per_million", $"weekly_hosp_admissions", $"weekly_hosp_admissions_per_million", $"total_tests",$"new_tests", $"total_tests_per_thousand", $"new_tests_per_thousand", $"new_tests_smoothed",$"new_tests_smoothed_per_thousand", $"tests_per_case", $"positive_rate", $"tests_units", $"stringency_index", $"population", $"population_density", $"median_age", $"aged_65_older", $"aged_70_older", $"gdp_per_capita", $"extreme_poverty", $"cardiovasc_death_rate", $"diabetes_prevalence", $"female_smokers", $"male_smokers", $"handwashing_facilities", $"hospital_beds_per_thousand", $"life_expectancy", $"human_development_index")
                 .writeStream
                 //.trigger(Trigger.ProcessingTime("20 seconds")) // debugging
                 .trigger(Trigger.ProcessingTime("86400 seconds")) // once per day (86400 s = 24 h)
                 .option("checkpointLocation", "/datasets/group12/chkpoint2021")
                 .format("parquet")  
                 .option("path", "/datasets/group12/analysis2021")
                 .start()
                 
query.awaitTermination() // hit cancel to terminate
val parquetFile2021DF = spark.read.parquet("dbfs:/datasets/group12/analysis2021/*.parquet")
parquetFile2021DF: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 48 more fields]
display(parquetFile2021DF.describe())
(output truncated: `describe()` returns count, mean, stddev, min and max for each of the 50 selected columns; the parquet file holds 58,531 rows covering dates 2020-01-01 through 2021-01-06)