Stream to parquet file
This notebook sets up and executes the streaming of CSV data into a Parquet file; the idea is thereafter to perform analysis on the Parquet file.
Note that this notebook assumes one has already downloaded several "Our World in Data" dataset CSV files. This can be done by first running "DownloadFilesPeriodicallyScript" at least once.
Content is based on "038_StructuredStreamingProgGuide" by Raazesh Sainudiin.
Start by copying the latest downloaded CSV data to the data analysis folder.
dbutils.fs.cp("file:///databricks/driver/projects/group12/logsEveryXSecs/","/datasets/group12/",true)
res0: Boolean = true
Check that the data is in the group12 folder.
display(dbutils.fs.ls("/datasets/group12/"))
path | name | size (bytes) |
---|---|---|
dbfs:/datasets/group12/20_12_04_08_31_44.csv | 20_12_04_08_31_44.csv | 14181338 |
dbfs:/datasets/group12/20_12_04_08_32_40.csv | 20_12_04_08_32_40.csv | 14181338 |
dbfs:/datasets/group12/20_12_04_10_47_08.csv | 20_12_04_10_47_08.csv | 14190774 |
dbfs:/datasets/group12/21_01_07_08_50_05.csv | 21_01_07_08_50_05.csv | 14577033 |
dbfs:/datasets/group12/21_01_07_09_05_33.csv | 21_01_07_09_05_33.csv | 14577033 |
dbfs:/datasets/group12/analysis/ | analysis/ | 0 |
dbfs:/datasets/group12/chkpoint/ | chkpoint/ | 0 |
dbfs:/datasets/group12/chkpoint/ | chkpoint/ | 0.0 |
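Since the file names encode a timestamp (`YY_MM_DD_HH_MM_SS`), a lexicographic sort on the name yields chronological order, so the most recent snapshot can be picked programmatically. A minimal sketch (assuming the folder contains at least one CSV file):

```scala
// Pick the most recently downloaded CSV: timestamped names sort chronologically.
val latestCsv = dbutils.fs.ls("/datasets/group12/")
  .filter(_.name.endsWith(".csv"))
  .maxBy(_.name)

println(latestCsv.path)
```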
Check the schema of the CSV files.
val df_csv = spark.read.option("header", "true").option("inferSchema", "true").csv("/datasets/group12/21_01_07_09_05_33.csv")
df_csv: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 52 more fields]
df_csv.printSchema
The stream requires a user-defined schema. Note that the January 2021 schema differs from the December 2020 schema. Below, the user-defined schemas are created.
import org.apache.spark.sql.types._
val OurWorldinDataSchema2021 = new StructType()
.add("iso_code", "string")
.add("continent", "string")
.add("location", "string")
.add("date", "string")
.add("total_cases","double")
.add("new_cases","double")
.add("new_cases_smoothed","double")
.add("total_deaths","double")
.add("new_deaths","double")
.add("new_deaths_smoothed","double")
.add("total_cases_per_million","double")
.add("new_cases_per_million","double")
.add("new_cases_smoothed_per_million","double")
.add("total_deaths_per_million","double")
.add("new_deaths_per_million","double")
.add("new_deaths_smoothed_per_million","double")
.add("reproduction_rate", "double")
.add("icu_patients", "double")
.add("icu_patients_per_million", "double")
.add("hosp_patients", "double")
.add("hosp_patients_per_million", "double")
.add("weekly_icu_admissions", "double")
.add("weekly_icu_admissions_per_million", "double")
.add("weekly_hosp_admissions", "double")
.add("weekly_hosp_admissions_per_million", "double")
.add("new_tests", "double")
.add("total_tests", "double")
.add("total_tests_per_thousand", "double")
.add("new_tests_per_thousand", "double")
.add("new_tests_smoothed", "double")
.add("new_tests_smoothed_per_thousand", "double")
.add("positive_rate", "double")
.add("tests_per_case", "double")
  .add("tests_units", "double") // NB: textual in the source CSV (e.g. "tests performed"), so parsing as double yields null
.add("total_vaccinations", "double")
.add("new_vaccinations", "double")
.add("stringency_index","double")
.add("population","double")
.add("population_density","double")
.add("median_age", "double")
.add("aged_65_older", "double")
.add("aged_70_older", "double")
.add("gdp_per_capita","double")
.add("extreme_poverty","double")
.add("cardiovasc_death_rate","double")
.add("diabetes_prevalence","double")
.add("female_smokers", "double")
.add("male_smokers", "double")
.add("handwashing_facilities", "double")
.add("hospital_beds_per_thousand", "double")
.add("life_expectancy","double")
.add("human_development_index","double")
val OurWorldinDataSchema2020 = new StructType()
.add("iso_code", "string")
.add("continent", "string")
.add("location", "string")
.add("date", "string")
.add("total_cases","double")
.add("new_cases","double")
.add("new_cases_smoothed","double")
.add("total_deaths","double")
.add("new_deaths","double")
.add("new_deaths_smoothed","double")
.add("total_cases_per_million","double")
.add("new_cases_per_million","double")
.add("new_cases_smoothed_per_million","double")
.add("total_deaths_per_million","double")
.add("new_deaths_per_million","double")
.add("new_deaths_smoothed_per_million","double")
.add("reproduction_rate", "double")
.add("icu_patients", "double")
.add("icu_patients_per_million", "double")
.add("hosp_patients", "double")
.add("hosp_patients_per_million", "double")
.add("weekly_icu_admissions", "double")
.add("weekly_icu_admissions_per_million", "double")
.add("weekly_hosp_admissions", "double")
.add("weekly_hosp_admissions_per_million", "double")
.add("total_tests", "double")
.add("new_tests", "double")
.add("total_tests_per_thousand", "double")
.add("new_tests_per_thousand", "double")
.add("new_tests_smoothed", "double")
.add("new_tests_smoothed_per_thousand", "double")
.add("tests_per_case", "double")
.add("positive_rate", "double")
  .add("tests_units", "double") // NB: textual in the source CSV (e.g. "tests performed"), so parsing as double yields null
.add("stringency_index","double")
.add("population","double")
.add("population_density","double")
.add("median_age", "double")
.add("aged_65_older", "double")
.add("aged_70_older", "double")
.add("gdp_per_capita","double")
.add("extreme_poverty","double")
.add("cardiovasc_death_rate","double")
.add("diabetes_prevalence","double")
.add("female_smokers", "double")
.add("male_smokers", "double")
.add("handwashing_facilities", "double")
.add("hospital_beds_per_thousand", "double")
.add("life_expectancy","double")
.add("human_development_index","double")
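To make the 2020/2021 differences explicit, the two hand-written schemas can be diffed programmatically on their field names; a minimal sketch:

```scala
// Compare the field-name sets of the two user-defined schemas.
val fields2020 = OurWorldinDataSchema2020.fieldNames.toSet
val fields2021 = OurWorldinDataSchema2021.fieldNames.toSet

// Columns added in 2021: total_vaccinations and new_vaccinations
println(fields2021 diff fields2020)

// Columns dropped in 2021: none (2021 is a superset of 2020,
// though the relative order of some test columns changed)
println(fields2020 diff fields2021)
```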
Start stream
In January 2021, the schema was updated compared to the schema in December 2020. Choose below which set of CSV files to stream.
Stream for 2020
import org.apache.spark.sql.types._
val OurWorldinDataStream = spark
.readStream
.schema(OurWorldinDataSchema2020)
  .option("maxFilesPerTrigger", 1)
.option("latestFirst", "true")
.format("csv")
.option("header", "true")
.load("/datasets/group12/20*.csv")
.dropDuplicates()
import org.apache.spark.sql.types._
OurWorldinDataStream: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, continent: string ... 48 more fields]
Stream for 2021
import org.apache.spark.sql.types._
val OurWorldinDataStream2021 = spark
.readStream
.schema(OurWorldinDataSchema2021)
  .option("maxFilesPerTrigger", 1)
.option("latestFirst", "true")
.format("csv")
.option("header", "true")
.load("/datasets/group12/21*.csv")
.dropDuplicates()
import org.apache.spark.sql.types._
OurWorldinDataStream2021: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [iso_code: string, continent: string ... 50 more fields]
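Note that `dropDuplicates()` on a stream must keep every row seen so far as state in order to detect duplicates. If state growth ever becomes a concern, deduplication can be bounded with a watermark on an event-time column. A hedged sketch (the `date` string would first need to be cast to a timestamp; the 30-day horizon is an illustrative choice, not from the original notebook):

```scala
import org.apache.spark.sql.functions.to_timestamp

// Bounded-state deduplication: rows whose event time falls behind the
// watermark are evicted from the deduplication state.
val dedupedStream = spark.readStream
  .schema(OurWorldinDataSchema2021)
  .option("maxFilesPerTrigger", 1)
  .option("header", "true")
  .csv("/datasets/group12/21*.csv")
  .withColumn("event_time", to_timestamp($"date", "yyyy-MM-dd"))
  .withWatermark("event_time", "30 days")
  .dropDuplicates("iso_code", "date")
```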
Display the 2020 stream.
OurWorldinDataStream.isStreaming
res81: Boolean = true
display(OurWorldinDataStream)
Query to File (2020)
Query that saves the stream into Parquet files at periodic intervals. Analysis will thereafter be performed on the Parquet files.
Create folders for the Parquet files and the checkpoint data.
// remove any previous folders if they exist
dbutils.fs.rm("/datasets/group12/chkpoint", recurse=true)
dbutils.fs.rm("/datasets/group12/analysis", recurse=true)
res14: Boolean = true
dbutils.fs.mkdirs("/datasets/group12/chkpoint")
res15: Boolean = true
dbutils.fs.mkdirs("/datasets/group12/analysis")
res16: Boolean = true
Initialize the query that stores the selected columns in Parquet files.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
val query = OurWorldinDataStream
.select($"iso_code", $"continent", $"location", $"date", $"total_cases", $"new_cases", $"new_cases_smoothed", $"total_deaths", $"new_deaths",$"new_deaths_smoothed", $"total_cases_per_million", $"new_cases_per_million", $"new_cases_smoothed_per_million", $"total_deaths_per_million", $"new_deaths_per_million", $"new_deaths_smoothed_per_million", $"reproduction_rate", $"icu_patients", $"icu_patients_per_million", $"hosp_patients", $"hosp_patients_per_million", $"weekly_icu_admissions", $"weekly_icu_admissions_per_million", $"weekly_hosp_admissions", $"weekly_hosp_admissions_per_million", $"total_tests",$"new_tests", $"total_tests_per_thousand", $"new_tests_per_thousand", $"new_tests_smoothed",$"new_tests_smoothed_per_thousand", $"tests_per_case", $"positive_rate", $"tests_units", $"stringency_index", $"population", $"population_density", $"median_age", $"aged_65_older", $"aged_70_older", $"gdp_per_capita", $"extreme_poverty", $"cardiovasc_death_rate", $"diabetes_prevalence", $"female_smokers", $"male_smokers", $"handwashing_facilities", $"hospital_beds_per_thousand", $"life_expectancy", $"human_development_index")
.writeStream
//.trigger(Trigger.ProcessingTime("20 seconds")) // debugging
  .trigger(Trigger.ProcessingTime("86400 seconds")) // once per day (24 * 3600 s)
.option("checkpointLocation", "/datasets/group12/chkpoint")
.format("parquet")
.option("path", "/datasets/group12/analysis")
.start()
query.awaitTermination() // hit cancel to terminate
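Instead of blocking on `awaitTermination()`, the `StreamingQuery` handle can also be inspected while the stream runs; a minimal sketch:

```scala
// Non-blocking inspection of a running StreamingQuery
println(query.status)        // e.g. whether a trigger is currently active
println(query.lastProgress)  // JSON with input rows, processing rates, batch duration

// Stop the stream programmatically instead of cancelling the cell:
// query.stop()
```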
Check the contents of the saved Parquet files.
display(dbutils.fs.ls("/datasets/group12/analysis"))
val parquetFileDF = spark.read.parquet("dbfs:/datasets/group12/analysis/*.parquet")
parquetFileDF: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 48 more fields]
display(parquetFileDF.describe())
summary | iso_code | continent | location | date | total_cases | new_cases | new_cases_smoothed | total_deaths | new_deaths | new_deaths_smoothed | total_cases_per_million | new_cases_per_million | new_cases_smoothed_per_million | total_deaths_per_million | new_deaths_per_million | new_deaths_smoothed_per_million | reproduction_rate | icu_patients | icu_patients_per_million | hosp_patients | hosp_patients_per_million | weekly_icu_admissions | weekly_icu_admissions_per_million | weekly_hosp_admissions | weekly_hosp_admissions_per_million | total_tests | new_tests | total_tests_per_thousand | new_tests_per_thousand | new_tests_smoothed | new_tests_smoothed_per_thousand | tests_per_case | positive_rate | tests_units | stringency_index | population | population_density | median_age | aged_65_older | aged_70_older | gdp_per_capita | extreme_poverty | cardiovasc_death_rate | diabetes_prevalence | female_smokers | male_smokers | handwashing_facilities | hospital_beds_per_thousand | life_expectancy | human_development_index |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
count | 62184 | 61867 | 62500 | 62500 | 53761 | 62377 | 61421 | 45901 | 62377 | 61421 | 53460 | 62061 | 61110 | 45613 | 62061 | 61110 | 41680 | 5582 | 5582 | 6730 | 6730 | 538 | 538 | 885 | 885 | 25647 | 25606 | 25647 | 25606 | 28408 | 28408 | 26927 | 26336 | 0 | 54546 | 62184 | 60592 | 59340 | 58383 | 59024 | 59329 | 40812 | 59902 | 60595 | 46712 | 46080 | 30696 | 54818 | 61868 | 59646 |
mean | null | null | null | null | 235416.1718904038 | 2397.8407425814003 | 2367.056380553872 | 8908.311256835363 | 55.19233050643667 | 54.80704391006325 | 3798.2407507482267 | 43.13754119334201 | 42.41964747177225 | 108.27332865630407 | 0.761575578865955 | 0.7475491572574053 | 1.049197216890596 | 1171.3697599426728 | 18.08625438910784 | 5656.88558692422 | 104.44401604754825 | 320.92 | 10.748325278810405 | 2961.1632870056505 | 81.23565310734462 | 3233707.097438297 | 37267.77532609545 | 103.08093254571695 | 1.1276385222213543 | 35891.313679245286 | 1.0969730005632223 | 0.07456953986704766 | 171.59424362089905 | null | 53.312537674623144 | 8.17052256214782E7 | 307.3537588625577 | 30.42868217054277 | 8.773048729938651 | 5.539936330984043 | 19213.552059313115 | 13.465191610310459 | 258.8075386297629 | 7.891889759881105 | 10.66726918136675 | 32.47178591579883 | 50.99304310007871 | 3.0230664197891057 | 73.0352230555375 | 0.7137129396774315 |
stddev | null | null | null | null | 2063317.939351184 | 20722.406766273503 | 20188.721091973344 | 62835.10460453239 | 420.0315418848828 | 404.7550597418351 | 7537.842859063191 | 144.63668342422923 | 115.82591210719889 | 201.80463357477137 | 3.003047037105768 | 2.17917077365699 | 0.38591731058680295 | 2792.1448587365685 | 24.026044243287902 | 13218.496002944645 | 149.63443300155708 | 596.5733008531096 | 25.857485153686934 | 6410.029575144745 | 233.24453357627317 | 1.45235647300503E7 | 144743.48405262225 | 209.81522035476976 | 2.1450449484564924 | 134865.99995119465 | 1.9561605087188363 | 0.09616514922644583 | 861.1206108534275 | null | 27.591030251926593 | 5.716608354719421E8 | 1525.663259783739 | 9.09237871864885 | 6.230404954288745 | 4.248965298848258 | 20089.06946024988 | 19.988201128228035 | 120.99438607360678 | 4.172554929946806 | 10.429062017422085 | 13.415141793587788 | 31.83056463184429 | 2.4310237026014865 | 7.529274919111957 | 0.15483301083721898 |
min | AFG | Africa | Afghanistan | 2020-01-01 | 1.0 | -10034.0 | -525.0 | 1.0 | -1918.0 | -232.143 | 0.001 | -2153.437 | -276.825 | 0.001 | -76.445 | -10.921 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | -3743.0 | 0.0 | -0.398 | 0.0 | 0.0 | 0.0 | 1.4 | null | 0.0 | 809.0 | 1.98 | 15.1 | 1.144 | 0.526 | 661.24 | 0.1 | 79.37 | 0.99 | 0.1 | 7.7 | 1.188 | 0.1 | 53.28 | 0.354 |
max | ZWE | South America | Zimbabwe | 2020-12-03 | 6.5220557E7 | 690127.0 | 600775.429 | 1506251.0 | 12825.0 | 10538.571 | 89354.818 | 8652.658 | 2648.773 | 1469.678 | 218.329 | 63.14 | 6.68 | 19396.0 | 127.183 | 100226.0 | 988.567 | 4375.407 | 190.051 | 50887.393 | 2645.194 | 1.92122147E8 | 1949384.0 | 2248.144 | 26.154 | 1703410.0 | 19.244 | 0.729 | 44258.7 | null | 100.0 | 7.794798729E9 | 19347.5 | 48.2 | 27.049 | 18.493 | 116935.6 | 77.6 | 724.417 | 30.53 | 44.0 | 78.1 | 98.999 | 13.8 | 86.75 | 0.953 |
display(parquetFileDF.orderBy($"date".desc))
parquetFileDF.count()
res5: Long = 62500
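Since `date` was read as a string, it may help to cast it before analysis. For ISO `yyyy-MM-dd` strings, lexicographic and chronological order coincide, but a proper `DateType` column also enables date arithmetic and range filters. A sketch:

```scala
import org.apache.spark.sql.functions.to_date

// Cast the string date column to DateType for downstream analysis.
val withDates = parquetFileDF.withColumn("date", to_date($"date", "yyyy-MM-dd"))

display(withDates.orderBy($"date".desc))
```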
Query to File (2021)
Query that saves the stream into Parquet files at periodic intervals.
// remove any previous folders if they exist
dbutils.fs.rm("/datasets/group12/chkpoint2021", recurse=true)
dbutils.fs.rm("/datasets/group12/analysis2021", recurse=true)
dbutils.fs.mkdirs("/datasets/group12/chkpoint2021")
dbutils.fs.mkdirs("/datasets/group12/analysis2021")
res18: Boolean = true
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
val query = OurWorldinDataStream2021
.select($"iso_code", $"continent", $"location", $"date", $"total_cases", $"new_cases", $"new_cases_smoothed", $"total_deaths", $"new_deaths",$"new_deaths_smoothed", $"total_cases_per_million", $"new_cases_per_million", $"new_cases_smoothed_per_million", $"total_deaths_per_million", $"new_deaths_per_million", $"new_deaths_smoothed_per_million", $"reproduction_rate", $"icu_patients", $"icu_patients_per_million", $"hosp_patients", $"hosp_patients_per_million", $"weekly_icu_admissions", $"weekly_icu_admissions_per_million", $"weekly_hosp_admissions", $"weekly_hosp_admissions_per_million", $"total_tests",$"new_tests", $"total_tests_per_thousand", $"new_tests_per_thousand", $"new_tests_smoothed",$"new_tests_smoothed_per_thousand", $"tests_per_case", $"positive_rate", $"tests_units", $"stringency_index", $"population", $"population_density", $"median_age", $"aged_65_older", $"aged_70_older", $"gdp_per_capita", $"extreme_poverty", $"cardiovasc_death_rate", $"diabetes_prevalence", $"female_smokers", $"male_smokers", $"handwashing_facilities", $"hospital_beds_per_thousand", $"life_expectancy", $"human_development_index")
.writeStream
//.trigger(Trigger.ProcessingTime("20 seconds")) // debugging
  .trigger(Trigger.ProcessingTime("86400 seconds")) // once per day (24 * 3600 s)
.option("checkpointLocation", "/datasets/group12/chkpoint2021")
.format("parquet")
.option("path", "/datasets/group12/analysis2021")
.start()
query.awaitTermination() // hit cancel to terminate
val parquetFile2021DF = spark.read.parquet("dbfs:/datasets/group12/analysis2021/*.parquet")
parquetFile2021DF: org.apache.spark.sql.DataFrame = [iso_code: string, continent: string ... 48 more fields]
display(parquetFile2021DF.describe())
summary | iso_code | continent | location | date | total_cases | new_cases | new_cases_smoothed | total_deaths | new_deaths | new_deaths_smoothed | total_cases_per_million | new_cases_per_million | new_cases_smoothed_per_million | total_deaths_per_million | new_deaths_per_million | new_deaths_smoothed_per_million | reproduction_rate | icu_patients | icu_patients_per_million | hosp_patients | hosp_patients_per_million | weekly_icu_admissions | weekly_icu_admissions_per_million | weekly_hosp_admissions | weekly_hosp_admissions_per_million | total_tests | new_tests | total_tests_per_thousand | new_tests_per_thousand | new_tests_smoothed | new_tests_smoothed_per_thousand | tests_per_case | positive_rate | tests_units | stringency_index | population | population_density | median_age | aged_65_older | aged_70_older | gdp_per_capita | extreme_poverty | cardiovasc_death_rate | diabetes_prevalence | female_smokers | male_smokers | handwashing_facilities | hospital_beds_per_thousand | life_expectancy | human_development_index |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
count | 58196 | 57845 | 58531 | 58531 | 58008 | 58001 | 57046 | 49657 | 49656 | 57046 | 57673 | 57666 | 56716 | 49335 | 49334 | 56716 | 44995 | 6086 | 6086 | 6845 | 6845 | 544 | 544 | 878 | 878 | 27006 | 27145 | 27006 | 27145 | 30478 | 30478 | 28331 | 28797 | 0 | 241 | 278 | 52358 | 58196 | 56964 | 55692 | 55044 | 55376 | 55710 | 38159 | 56294 | 56942 | 44455 | 43846 | 28176 | 51737 |
mean | null | null | null | null | 280856.96293614677 | 2977.974862502371 | 2957.9528833748072 | 9440.825059910989 | 75.87175769292735 | 64.76055386880755 | 4803.329924592101 | 55.43429124614151 | 54.92539389237602 | 123.92782108036903 | 1.1725306887744764 | 0.9930127300938001 | 1.0397301922435842 | 924.7643772592836 | 17.942194544857045 | 4767.396055514974 | 112.35477647918188 | 389.18459375 | 23.096417279411764 | 2770.236054669703 | 90.94211161731207 | 3296570.453047471 | 33334.96798673789 | 113.1671816263053 | 1.1168154724627004 | 32438.389395629634 | 1.0890064636787196 | 166.36620662878127 | 0.08254411917908085 | null | 0.9067634854771786 | 849.6284532374101 | 59.32707265365355 | 9.113860612012853E7 | 323.8082390457141 | 30.631115779645334 | 8.842210231814642 | 5.6059546915631415 | 19206.748390809662 | 13.14487276920222 | 256.0564316090542 | 7.763601383864184 | 10.57440184456196 | 32.64338559959862 | 51.16526313174358 | 3.042046311150612 |
stddev | null | null | null | null | 2748559.5139726754 | 26447.176041635634 | 25947.18346899017 | 74852.02463395565 | 554.179232738698 | 496.3241880037069 | 9770.196467368594 | 159.8367692227782 | 130.23859384700643 | 231.3831543436838 | 3.814136613643414 | 2.592627124113058 | 0.37314466501106797 | 2607.447792980141 | 23.096588581017723 | 13142.595210484165 | 162.19963589630862 | 1062.8669629409615 | 84.43280553001314 | 6258.389649510214 | 237.96388901304604 | 1.5192379103534836E7 | 135667.97569510888 | 232.08417747765577 | 2.101753078690164 | 126446.58519497044 | 1.9266264256088346 | 842.7776724543986 | 0.09966680167946733 | null | 2.447811842168661 | 2276.8853139301373 | 22.403193236864983 | 6.204846769231012E8 | 1577.4914969226447 | 9.117030300574706 | 6.256325795927324 | 4.27334121579429 | 19680.321466247937 | 19.8684806478708 | 117.99197986122954 | 3.878859778205247 | 10.419107338482048 | 13.45549336530331 | 31.775955000809844 | 2.473749904640983 |
min | AFG | Africa | Afghanistan | 2020-01-01 | 1.0 | -46076.0 | -1121.714 | 1.0 | -1918.0 | -232.143 | 0.001 | -2153.437 | -276.825 | 0.001 | -76.445 | -10.921 | -0.01 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | -47510.0 | 0.0 | -0.786 | 0.0 | 0.0 | 1.6 | 0.0 | null | 0.0 | 1.57 | 0.0 | 809.0 | 1.98 | 15.1 | 1.144 | 0.526 | 661.24 | 0.1 | 79.37 | 0.99 | 0.1 | 7.7 | 1.188 | 0.1 |
max | ZWE | South America | Zimbabwe | 2021-01-06 | 8.718654E7 | 780613.0 | 646630.286 | 1883761.0 | 15525.0 | 11670.429 | 108043.746 | 8652.658 | 2648.773 | 1826.861 | 218.329 | 63.14 | 6.74 | 23707.0 | 127.183 | 132476.0 | 922.018 | 10301.189 | 888.829 | 50887.393 | 2645.194 | 2.4798603E8 | 2133928.0 | 2705.599 | 28.716 | 1840248.0 | 23.701 | 44258.7 | 0.636 | null | 17.14 | 14497.77 | 100.0 | 7.794798729E9 | 19347.5 | 48.2 | 27.049 | 18.493 | 116935.6 | 77.6 | 724.417 | 30.53 | 44.0 | 78.1 | 98.999 | 13.8 |