ScaDaMaLe Course site and book

Overview

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

In this guide, we are going to walk you through the programming model and the APIs. First, let’s start with a simple example - a streaming word count.

Programming Model

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended to. This leads to a new stream processing model that is very similar to a batch processing model. You express your streaming computation as a standard batch-like query, as if on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.

Basic Concepts

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

Stream as a Table

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.

Model

The “Output” is defined as what gets written out to the external storage. The output can be defined in one of three modes:

  • Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

  • Append Mode - Only the new rows appended to the Result Table since the last trigger will be written to the external storage. This is applicable only to queries where existing rows in the Result Table are not expected to change.

  • Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

Note that each mode is applicable to certain types of queries; this is discussed in detail later, in the section on output modes. To illustrate the use of this model, let’s understand it in the context of the Quick Example below.
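As a minimal sketch (not run here), the output mode is simply chosen on the writer side; wordCounts refers to the streaming word-count aggregation constructed in the Quick Example below.

    // Sketch only: the same streaming aggregation written out under different output modes.
    wordCounts.writeStream.outputMode("complete").format("console").start() // entire Result Table every trigger
    wordCounts.writeStream.outputMode("update").format("console").start()   // only the rows changed since the last trigger
    // Append mode is rejected for this particular query: an aggregation without a
    // watermark may update existing result rows, so appended rows could not be final.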

The streamingLines DataFrame (defined in the Quick Example below) is the input table, and the final wordCounts DataFrame is the result table. Note that the query on the streamingLines DataFrame to generate wordCounts is exactly the same as it would be on a static DataFrame. However, when this query is started, Spark will continuously check for new data in the directory. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.

Model

This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, and thus to reason about fault tolerance and data consistency (at-least-once, at-most-once, or exactly-once). In this model, Spark is responsible for updating the Result Table when there is new data, relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data.

Quick Example

Let’s say you want to maintain a running word count of text data received from a file writer that is writing files into the directory /datasets/streamingFiles in the distributed file system. Let’s see how you can express this using Structured Streaming.

Let’s walk through the example step-by-step and understand how it works.

First we need to start a file writing job in the companion notebook 037a_AnimalNamesStructStreamingFiles and then return here.

display(dbutils.fs.ls("/datasets/streamingFiles"))
path name size
dbfs:/datasets/streamingFiles/20_10.log 20_10.log 35.0
dbfs:/datasets/streamingFiles/20_12.log 20_12.log 35.0
dbfs:/datasets/streamingFiles/20_14.log 20_14.log 35.0
dbfs:/datasets/streamingFiles/20_16.log 20_16.log 35.0
... (several hundred more 35-byte log files, one written every minute or two, through 29_28.log)
dbutils.fs.head("/datasets/streamingFiles/20_16.log")
res1: String =
"2020-11-20 13:20:16+00:00; cat pig
"

Next, let’s create a streaming DataFrame that represents text data received from the directory, and transform the DataFrame to calculate word counts.

import org.apache.spark.sql.types._

// Create DataFrame representing the stream of input lines from files in distributed file store
//val textFileSchema = new StructType().add("line", "string") // for a custom schema

val streamingLines = spark
  .readStream
  //.schema(textFileSchema) // using default -> makes a column of String named value
  .option("MaxFilesPerTrigger", 1) //  maximum number of new files to be considered in every trigger (default: no max) 
  .format("text")
  .load("/datasets/streamingFiles")
import org.apache.spark.sql.types._
streamingLines: org.apache.spark.sql.DataFrame = [value: string]

This streamingLines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that it is not yet receiving any data: we are just setting up the transformation and have not yet started it.

display(streamingLines)  // display will show you the contents of the DF
value
2020-11-20 13:26:26+00:00; bat owl
2020-11-20 13:28:23+00:00; owl cat
2020-11-20 13:29:31+00:00; dog rat
2020-11-20 13:30:19+00:00; cat dog
2020-11-20 13:28:01+00:00; bat cat
... (a few hundred more timestamped animal-pair lines are shown as new files keep arriving)

Next, we will convert the DataFrame to a Dataset of String using .as[String], so that we can apply the flatMap operation to split each line into multiple words. The resultant words Dataset contains all the words.

val words = streamingLines.as[String]
                          .map(line => line.split(";").drop(1)(0)) // this is to simply cut out the timestamp from this stream
                          .flatMap(_.split(" ")) // flat map by splitting the animal words separated by whitespace
                          .filter( _ != "") // remove empty words that may be artifacts of opening whitespace
words: org.apache.spark.sql.Dataset[String] = [value: string]

Finally, we define the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.

// Generate running word count
val wordCounts = words
                  .groupBy("value").count() // this does the word count
                  .orderBy($"count".desc) // we are simply sorting by the most frequent words
wordCounts: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: string, count: bigint]

We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated, and then start the streaming computation using start().

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

query.awaitTermination() // hit cancel to terminate - killall the bash script in 037a_AnimalNamesStructStreamingFiles
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  cat|    1|
|  owl|    1|
+-----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  cat|    2|
|  pig|    1|
|  owl|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  cat|    2|
|  pig|    2|
|  owl|    1|
|  dog|    1|
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  cat|    3|
|  dog|    2|
|  pig|    2|
|  owl|    1|
+-----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  cat|    4|
|  pig|    3|
|  dog|    2|
|  owl|    1|
+-----+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  pig|    4|
|  cat|    4|
|  dog|    2|
|  owl|    1|
|  rat|    1|
+-----+-----+

-------------------------------------------
Batch: 6
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  pig|    5|
|  cat|    4|
|  dog|    2|
|  owl|    2|
|  rat|    1|
+-----+-----+

-------------------------------------------
Batch: 7
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  pig|    5|
|  cat|    4|
|  owl|    3|
|  dog|    2|
|  rat|    1|
|  bat|    1|
+-----+-----+

-------------------------------------------
Batch: 8
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  pig|    5|
|  cat|    4|
|  owl|    4|
|  dog|    2|
|  bat|    2|
|  rat|    1|
+-----+-----+

-------------------------------------------
Batch: 9
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  pig|    5|
|  owl|    5|
|  cat|    4|
|  bat|    3|
|  dog|    2|
|  rat|    1|
+-----+-----+

-------------------------------------------
Batch: 10
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|  owl|    5|
|  pig|    5|
|  bat|    4|
|  cat|    4|
|  dog|    2|
|  rat|    2|
+-----+-----+

After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we have decided to wait for the termination of the query using awaitTermination() to prevent the process from exiting while the query is active.
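Besides blocking on awaitTermination(), the StreamingQuery handle can be used to monitor and stop the stream. A small sketch (not run here):

    // Sketch: managing the active query via its StreamingQuery handle.
    query.isActive       // is the query still running?
    query.status         // current status, e.g. waiting for data or processing a batch
    query.lastProgress   // metrics of the most recently completed micro-batch
    query.stop()         // stop the query when you are done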

Handling Event-time and Late Data

Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
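For instance, a minimal sketch of such a window-based aggregation, assuming a hypothetical streaming DataFrame named events with a timestamp column time and a string column device:

    import org.apache.spark.sql.functions.window

    // Sketch: count events per device in 1-minute event-time windows.
    // `events`, `time` and `device` are assumed names used for illustration only.
    val perMinuteCounts = events
      .groupBy(window($"time", "1 minute"), $"device")
      .count()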

Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state. These are explained later in more detail in the Window Operations section below.
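A hedged sketch of watermarking, on the same hypothetical events DataFrame as above: events arriving more than 10 minutes later than the latest event-time seen so far may be dropped, and the corresponding old state can be cleaned up.

    import org.apache.spark.sql.functions.window

    // Sketch: a 10-minute watermark on the event-time column `time`.
    val watermarkedCounts = events
      .withWatermark("time", "10 minutes")
      .groupBy(window($"time", "1 minute"), $"device")
      .count()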

Fault Tolerance Semantics

Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.
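Concretely, checkpointing is switched on by giving the query a checkpoint directory. A minimal sketch, assuming an example DBFS path that is not used elsewhere in this notebook:

    // Sketch: offsets and state are recorded under checkpointLocation (an assumed path).
    val checkpointedQuery = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "/datasets/streamingWordCountCheckpoint")
      .start()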

API using Datasets and DataFrames

Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point SparkSession (Scala/Java/Python/R docs) to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the DataFrame/Dataset Programming Guide.

Creating streaming DataFrames and streaming Datasets

Streaming DataFrames can be created through the DataStreamReader interface (Scala/Java/Python docs) returned by SparkSession.readStream(); in R, use the read.stream() method. Similar to the read interface for creating a static DataFrame, you can specify the details of the source – data format, schema, options, etc.

Input Sources

In Spark 2.0, there are a few built-in sources.

  • File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json and parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and for the supported options of each file format. Note that the files must be atomically placed in the given directory, which, in most file systems, can be achieved by file move operations.

  • Kafka source - Polls data from Kafka. It is compatible with Kafka broker versions 0.10.0 or higher. See the Kafka Integration Guide for more details.

  • Socket source (for testing) - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing, as it does not provide end-to-end fault-tolerance guarantees. Both the socket and Kafka sources are sketched below.
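A minimal sketch of the socket and Kafka sources; the host, port, broker address and topic names are assumptions for illustration, and the Kafka source additionally requires the spark-sql-kafka connector to be available on the cluster.

    // Socket source (testing only): read UTF8 lines from a TCP server.
    val socketLines = spark.readStream
      .format("socket")
      .option("host", "localhost")   // assumed host
      .option("port", 9999)          // assumed port
      .load()

    // Kafka source: broker address and topic are assumptions for illustration.
    val kafkaLines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "topic1")
      .load()
      .selectExpr("CAST(value AS STRING)")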

Some sources are not fault-tolerant because they do not guarantee that data can be replayed using checkpointed offsets after a failure. See the earlier section on fault-tolerance semantics. Here are the details of all the sources in Spark.

  • File source
    Options:
      path: path to the input directory; common to all file formats.
      maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max)
      latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
      fileNameOnly: whether to check new files based on only the filename instead of the full path (default: false). With this set to `true`, the following would be considered the same file, because their filenames, "dataset.txt", are the same: "file:///dataset.txt", "s3://a/dataset.txt", "s3n://a/b/dataset.txt", "s3a://a/b/c/dataset.txt"
      For file-format-specific options, see the related methods in DataStreamReader (Scala: https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader, Java: https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/streaming/DataStreamReader.html, Python: https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader, R: https://spark.apache.org/docs/2.2.0/api/R/read.stream.html). E.g. for the "parquet" format, see DataStreamReader.parquet().
    Fault-tolerant: Yes
    Notes: Supports glob paths, but does not support multiple comma-separated paths/globs.

  • Socket source
    Options: host: host to connect to (must be specified); port: port to connect to (must be specified)
    Fault-tolerant: No

  • Kafka source
    Options: See the Kafka Integration Guide.
    Fault-tolerant: Yes

See https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#input-sources.
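A hedged sketch of the file-source options from the table above, applied to the text stream used in the Quick Example:

    // Sketch: throttling and file-tracking options for the file source.
    val throttledLines = spark.readStream
      .format("text")
      .option("maxFilesPerTrigger", 1)   // at most one new file per trigger
      .option("latestFirst", "true")     // process the newest files first when there is a backlog
      .option("fileNameOnly", "true")    // track seen files by filename only, not full path
      .load("/datasets/streamingFiles")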

Schema inference and partition of streaming DataFrames/Datasets

By default, Structured Streaming from file-based sources requires you to specify the schema, rather than relying on Spark to infer it automatically (this is what we do with userSchema in the csv example below). This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can re-enable schema inference by setting spark.sql.streaming.schemaInference to true.
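For such ad-hoc exploration, the switch is a single SQL configuration, as in this one-line sketch:

    // Sketch: re-enable schema inference for file-based streaming sources (ad-hoc use only).
    spark.conf.set("spark.sql.streaming.schemaInference", "true")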

Partition discovery does occur when subdirectories that are named /key=value/ are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add /data/year=2016/ when /data/year=2015/ was present, but it is invalid to change the partitioning column (i.e. by creating the directory /data/date=2016-04-17/).
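A minimal sketch under assumed paths and column names: with csv files laid out as /data/year=2015/*.csv and /data/year=2016/*.csv, including year in the user-provided schema lets Spark fill it in from each file's path.

    import org.apache.spark.sql.types._

    // Sketch: `year` is a partition column recovered from the .../year=<value>/ directories.
    // The /data path and the column names are assumptions for illustration.
    val partitionedSchema = new StructType()
      .add("time", "timestamp")
      .add("score", "double")
      .add("year", "integer")

    val partitionedScores = spark.readStream
      .schema(partitionedSchema)
      .format("csv")
      .load("/data")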

Operations on streaming DataFrames/Datasets

You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Let’s take a look at a few example operations that you can use.

Basic Operations - Selection, Projection, Aggregation

Most of the common operations on DataFrames/Datasets are supported for streaming. The few operations that are not supported are discussed later, in the section on unsupported operations.

    import org.apache.spark.sql.{DataFrame, Dataset}

    case class DeviceData(device: String, deviceType: String, signal: Double, time: String)

    val df: DataFrame = ... // streaming DataFrame with IoT device data with schema { device: string, deviceType: string, signal: double, time: string }
    val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IoT device data

    // Select the devices which have signal more than 10
    df.select("device").where("signal > 10")      // using untyped APIs
    ds.filter(_.signal > 10).map(_.device)        // using typed APIs

    // Running count of the number of updates for each device type
    df.groupBy("deviceType").count()              // using untyped API

    // Running average signal for each device type
    import org.apache.spark.sql.expressions.scalalang.typed
    ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
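You can also register a streaming DataFrame as a temporary view and query it with SQL; the result is itself a streaming DataFrame. A short sketch, reusing the hypothetical df from the block above:

    // Sketch: SQL over a streaming DataFrame returns another streaming DataFrame.
    df.createOrReplaceTempView("deviceUpdates")
    val deviceTypeCounts = spark.sql("select deviceType, count(*) as total from deviceUpdates group by deviceType")
    deviceTypeCounts.isStreaming   // true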

A Quick Mixture Example

We will work below with a file stream that simulates random animal names or a simple mixture of two Normal Random Variables.

The two file streams can be achieved by running the code in the following two databricks notebooks on the same cluster:

  • 037a_AnimalNamesStructStreamingFiles
  • 037b_Mix2NormalsStructStreamingFiles

You should see a set of csv files like the following (the names won't be exactly the same, since they depend on when you start the stream of files).

display(dbutils.fs.ls("/datasets/streamingFilesNormalMixture/"))
path name size
dbfs:/datasets/streamingFilesNormalMixture/48_11/ 48_11/ 0.0
dbfs:/datasets/streamingFilesNormalMixture/48_19/ 48_19/ 0.0
dbfs:/datasets/streamingFilesNormalMixture/48_26/ 48_26/ 0.0
dbfs:/datasets/streamingFilesNormalMixture/48_36/ 48_36/ 0.0
dbfs:/datasets/streamingFilesNormalMixture/48_43/ 48_43/ 0.0

Static and Streaming DataFrames

Let's check out the files and their contents via both static and streaming DataFrames.

This will also cement the fact that structured streaming allows interoperability between static and streaming data and can be useful for debugging.

val peekIn = spark.read.format("csv").load("/datasets/streamingFilesNormalMixture/*/*.csv")
peekIn.count() // total count of all the samples in all the files
peekIn: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string]
res8: Long = 500
peekIn.show(5, false) // let's take a quick peek at what's in the CSV files
+-----------------------+--------------------+
|_c0                    |_c1                 |
+-----------------------+--------------------+
|2020-11-16 10:48:25.294|0.21791376679544772 |
|2020-11-16 10:48:25.299|0.011291967445604012|
|2020-11-16 10:48:25.304|-0.30293144696154806|
|2020-11-16 10:48:25.309|0.4303254534802833  |
|2020-11-16 10:48:25.314|1.5521304466388752  |
+-----------------------+--------------------+
only showing top 5 rows
// Read all the csv files written atomically from a directory
import org.apache.spark.sql.types._

//make a user-specified schema - this is needed for structured streaming from files
val userSchema = new StructType()
                      .add("time", "timestamp")
                      .add("score", "Double")

// a static DF is convenient 
val csvStaticDF = spark
  .read
  .option("sep", ",") // delimiter is ','
  .schema(userSchema) // Specify schema of the csv files as pre-defined by user
  .csv("/datasets/streamingFilesNormalMixture/*/*.csv")    // Equivalent to format("csv").load("/path/to/directory")

// streaming DF
val csvStreamingDF = spark
  .readStream
  .option("sep", ",") // delimiter is ','
  .schema(userSchema) // Specify schema of the csv files as pre-defined by user
  .option("MaxFilesPerTrigger", 1) //  maximum number of new files to be considered in every trigger (default: no max) 
  .csv("/datasets/streamingFilesNormalMixture/*/*.csv")    // Equivalent to format("csv").load("/path/to/directory")
import org.apache.spark.sql.types._
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(time,TimestampType,true), StructField(score,DoubleType,true))
csvStaticDF: org.apache.spark.sql.DataFrame = [time: timestamp, score: double]
csvStreamingDF: org.apache.spark.sql.DataFrame = [time: timestamp, score: double]
csvStreamingDF.isStreaming    // Returns True for DataFrames that have streaming sources
res12: Boolean = true
csvStreamingDF.printSchema
root
 |-- time: timestamp (nullable = true)
 |-- score: double (nullable = true)
display(csvStreamingDF) // if you want to see the stream coming at you as csvDF
time score
2020-11-16T10:48:11.194+0000 0.2576188264990721
2020-11-16T10:48:11.199+0000 -0.13149698512045327
2020-11-16T10:48:11.204+0000 1.4139063973267458
2020-11-16T10:48:11.209+0000 -2.3833875968513496e-2
2020-11-16T10:48:11.215+0000 0.7274784426774964
... (the remaining timestamped scores stream in as each new csv file is picked up)
2020-11-16T10:48:18.800+0000 -0.21633343524824378
2020-11-16T10:48:18.805+0000 0.8387080531274844
2020-11-16T10:48:18.810+0000 1.3278878139665884e-2
2020-11-16T10:48:18.815+0000 1.3291762275434373
2020-11-16T10:48:18.820+0000 0.4837833343304839
2020-11-16T10:48:18.826+0000 0.4918446444728072
2020-11-16T10:48:18.831+0000 1.354678573169704
2020-11-16T10:48:18.836+0000 0.2524216007924791
2020-11-16T10:48:18.841+0000 0.5965026762340784
2020-11-16T10:48:18.846+0000 2.000850130836448
2020-11-16T10:48:18.851+0000 2.217169275505519
2020-11-16T10:48:18.857+0000 0.6876140376775531
2020-11-16T10:48:18.862+0000 1.0508210912529563
2020-11-16T10:48:18.867+0000 1.65676102704454
2020-11-16T10:48:18.872+0000 2.155047641017994
2020-11-16T10:48:18.877+0000 1.0866488363653375
2020-11-16T10:48:18.882+0000 1.0691398773308363
2020-11-16T10:48:18.888+0000 0.6120836384011098
2020-11-16T10:48:18.893+0000 0.24914099314834415
2020-11-16T10:48:18.898+0000 2.8691481936548744
2020-11-16T10:48:18.903+0000 0.7633561289177443
2020-11-16T10:48:18.908+0000 1.4483835248568062
2020-11-16T10:48:18.913+0000 2.6108825545691863
2020-11-16T10:48:18.918+0000 1.2751533422561458
2020-11-16T10:48:18.924+0000 1.0131179898567302
2020-11-16T10:48:18.929+0000 0.46308679994249036
2020-11-16T10:48:18.935+0000 0.7793261962344651
2020-11-16T10:48:18.940+0000 1.1671037114122738
2020-11-16T10:48:18.945+0000 2.143874895015684
2020-11-16T10:48:18.950+0000 1.2344250301306705
2020-11-16T10:48:18.955+0000 1.7402355361851662
2020-11-16T10:48:18.960+0000 1.0396911219696297
2020-11-16T10:48:18.966+0000 1.8089030277370215
2020-11-16T10:48:18.971+0000 2.1235708326267533
2020-11-16T10:48:18.976+0000 -0.33938888075466234
2020-11-16T10:48:18.981+0000 1.090463095441436
2020-11-16T10:48:18.986+0000 1.3101016219338661
2020-11-16T10:48:18.992+0000 -0.6251493773996968
2020-11-16T10:48:18.998+0000 1.7223308331307168
2020-11-16T10:48:19.003+0000 1.0299845635585438
2020-11-16T10:48:19.009+0000 1.962846046162154
2020-11-16T10:48:19.014+0000 -1.8537289273720337e-2
2020-11-16T10:48:19.019+0000 0.7977254725466605
2020-11-16T10:48:19.024+0000 -0.21427479370557312
2020-11-16T10:48:19.029+0000 -1.6661289018266037
2020-11-16T10:48:19.034+0000 1.144457447997468
2020-11-16T10:48:19.043+0000 0.6503516296653954
2020-11-16T10:48:19.048+0000 6.581335919503728e-2
2020-11-16T10:48:19.053+0000 1.5478749815243467
2020-11-16T10:48:19.058+0000 1.5497411627601851
2020-11-16T10:48:25.294+0000 0.21791376679544772
2020-11-16T10:48:25.299+0000 1.1291967445604012e-2
2020-11-16T10:48:25.304+0000 -0.30293144696154806
2020-11-16T10:48:25.309+0000 0.4303254534802833
2020-11-16T10:48:25.314+0000 1.5521304466388752
2020-11-16T10:48:25.319+0000 2.2910302464408394
2020-11-16T10:48:25.325+0000 0.4374695472538803
2020-11-16T10:48:25.330+0000 0.4085186427342812
2020-11-16T10:48:25.335+0000 -6.531316403553289e-2
2020-11-16T10:48:25.340+0000 6.39812257122474e-3
2020-11-16T10:48:25.345+0000 0.24840501087934996
2020-11-16T10:48:25.350+0000 -1.021974709142702
2020-11-16T10:48:25.355+0000 -9.233941622902653e-2
2020-11-16T10:48:25.361+0000 0.41027379764960337
2020-11-16T10:48:25.366+0000 1.864567223228712
2020-11-16T10:48:25.371+0000 1.5393474896194466
2020-11-16T10:48:25.376+0000 1.124907339909468
2020-11-16T10:48:25.381+0000 2.0206475875654997
2020-11-16T10:48:25.386+0000 -0.7058862229186389
2020-11-16T10:48:25.392+0000 1.2344926787652002
2020-11-16T10:48:25.397+0000 1.1406194673922239
2020-11-16T10:48:25.402+0000 1.4084552620839659
2020-11-16T10:48:25.407+0000 0.739931161380885
2020-11-16T10:48:25.412+0000 0.29958396894640427
2020-11-16T10:48:25.417+0000 -0.9379262816791101
2020-11-16T10:48:25.422+0000 0.8259556704405835
2020-11-16T10:48:25.428+0000 -0.3199802616466474
2020-11-16T10:48:25.433+0000 1.9656420693625898
2020-11-16T10:48:25.438+0000 0.8789984776053141
2020-11-16T10:48:25.443+0000 2.4965042040211793
2020-11-16T10:48:25.448+0000 1.714778861431627
2020-11-16T10:48:25.454+0000 0.8669641143187272
2020-11-16T10:48:25.459+0000 1.0757413525008879
2020-11-16T10:48:25.464+0000 1.9658378382249264e-2
2020-11-16T10:48:25.469+0000 0.7165095911306543
2020-11-16T10:48:25.474+0000 1.2251547673860115
2020-11-16T10:48:25.479+0000 1.5869187313570912
2020-11-16T10:48:25.485+0000 0.3928727449886338
2020-11-16T10:48:25.490+0000 1.7722759642539445
2020-11-16T10:48:25.495+0000 1.0350331272239843
2020-11-16T10:48:25.500+0000 -1.4234008750858624
2020-11-16T10:48:25.505+0000 0.6054572828043063
2020-11-16T10:48:25.511+0000 0.3024585268617903
2020-11-16T10:48:25.516+0000 2.9432999768948087e-2
2020-11-16T10:48:25.521+0000 0.9382472473173075
2020-11-16T10:48:25.526+0000 2.11287419383702
2020-11-16T10:48:25.531+0000 1.0876022969280528
2020-11-16T10:48:25.536+0000 0.36548993902899596
2020-11-16T10:48:25.542+0000 -2.005053653271253
2020-11-16T10:48:25.547+0000 2.0367928918435894
2020-11-16T10:48:25.552+0000 9.261254419611942e-2
2020-11-16T10:48:25.557+0000 2.156248406806113
2020-11-16T10:48:25.562+0000 -0.5295405173638772
2020-11-16T10:48:25.568+0000 2.452318995994742
2020-11-16T10:48:25.573+0000 0.8636413385915132
2020-11-16T10:48:25.578+0000 0.31460938814139794
2020-11-16T10:48:25.583+0000 -2.0257131370059023e-2
2020-11-16T10:48:25.588+0000 1.3213739526626505
2020-11-16T10:48:25.593+0000 0.9463001869917488
2020-11-16T10:48:25.599+0000 0.986171393681171
2020-11-16T10:48:25.604+0000 0.12492672949874628
2020-11-16T10:48:25.609+0000 0.9908400692267174
2020-11-16T10:48:25.614+0000 1.0695623856543282
2020-11-16T10:48:25.621+0000 1.0221220766637027
2020-11-16T10:48:25.627+0000 2.8492797946693904
2020-11-16T10:48:25.632+0000 1.0609742751901396
2020-11-16T10:48:25.637+0000 1.6409490831011158
2020-11-16T10:48:25.642+0000 1.5427085071446491
2020-11-16T10:48:25.647+0000 1.7312859942989034
2020-11-16T10:48:25.653+0000 1.2947069326850533
2020-11-16T10:48:25.658+0000 0.3756138591369289
2020-11-16T10:48:25.663+0000 1.4349084022701803
2020-11-16T10:48:25.668+0000 0.37649651121290106
2020-11-16T10:48:25.673+0000 0.7071860096564935
2020-11-16T10:48:25.679+0000 1.5065536846394356
2020-11-16T10:48:25.684+0000 0.15009861698305105
2020-11-16T10:48:25.689+0000 3.5084734586888766e-2
2020-11-16T10:48:25.695+0000 1.9474563946729155
2020-11-16T10:48:25.700+0000 9.423175513609095
2020-11-16T10:48:25.705+0000 2.4871634825039015
2020-11-16T10:48:25.710+0000 2.8472676324820685
2020-11-16T10:48:25.715+0000 1.5999488876250578
2020-11-16T10:48:25.720+0000 -0.2693864675719999
2020-11-16T10:48:25.725+0000 1.6304414331783441
2020-11-16T10:48:25.731+0000 0.39324529792831353
2020-11-16T10:48:25.736+0000 0.4053253263569069
2020-11-16T10:48:25.741+0000 0.9270234970247857
2020-11-16T10:48:25.746+0000 1.4509585503273819
2020-11-16T10:48:25.751+0000 0.8878267401905819
2020-11-16T10:48:25.756+0000 1.1883024549090635
2020-11-16T10:48:25.761+0000 1.0163155722641077
2020-11-16T10:48:25.767+0000 -0.8003099498427713
2020-11-16T10:48:25.772+0000 -0.9483216075980454
2020-11-16T10:48:25.777+0000 1.0437451610964232
2020-11-16T10:48:25.782+0000 2.19837214407137
2020-11-16T10:48:25.787+0000 2.070797890483533
2020-11-16T10:48:25.792+0000 1.2067096088561005
2020-11-16T10:48:25.798+0000 0.5043809533024068
2020-11-16T10:48:25.803+0000 0.3683130512293926
2020-11-16T10:48:25.808+0000 1.0968506619209946
2020-11-16T10:48:35.887+0000 -0.6602896123630477
2020-11-16T10:48:35.892+0000 6.829641971377687e-2
2020-11-16T10:48:35.898+0000 1.5578597945995134
2020-11-16T10:48:35.903+0000 0.9822629073468155
2020-11-16T10:48:35.908+0000 -0.7900771590527182
2020-11-16T10:48:35.913+0000 1.1194124344742182
2020-11-16T10:48:35.918+0000 1.1239015052468448
2020-11-16T10:48:35.924+0000 1.9447892371838207
2020-11-16T10:48:35.929+0000 2.0854603958592985
2020-11-16T10:48:35.934+0000 0.17341117815802976
2020-11-16T10:48:35.939+0000 1.5971150699056031
2020-11-16T10:48:35.944+0000 0.35646629992342993
2020-11-16T10:48:35.950+0000 1.8107324499508701
2020-11-16T10:48:35.955+0000 3.463539114641669
2020-11-16T10:48:35.960+0000 0.8683263379823365
2020-11-16T10:48:35.965+0000 1.2642821462325637
2020-11-16T10:48:35.970+0000 1.0099560176390794
2020-11-16T10:48:35.975+0000 1.1930381560126895
2020-11-16T10:48:35.981+0000 0.5433757598192581
2020-11-16T10:48:35.986+0000 1.0213782743479625
2020-11-16T10:48:35.991+0000 1.5049231054950472
2020-11-16T10:48:35.996+0000 0.22101559200796428
2020-11-16T10:48:36.001+0000 1.8743753391414122
2020-11-16T10:48:36.006+0000 0.6050230742039573
2020-11-16T10:48:36.012+0000 0.6939669876285336
2020-11-16T10:48:36.017+0000 1.5379566524515602
2020-11-16T10:48:36.022+0000 -0.6869579758877387
2020-11-16T10:48:36.027+0000 -0.4823865565169676
2020-11-16T10:48:36.032+0000 2.577388594447341
2020-11-16T10:48:36.037+0000 0.9323745950234809
2020-11-16T10:48:36.043+0000 -0.25032440836547454
2020-11-16T10:48:36.048+0000 1.1141701800611599
2020-11-16T10:48:36.053+0000 1.1577408343996396
2020-11-16T10:48:36.058+0000 0.4735089125920344
2020-11-16T10:48:36.063+0000 -1.5559289264558278
2020-11-16T10:48:36.068+0000 -0.11080485473390023
2020-11-16T10:48:36.073+0000 0.1536430200356127
2020-11-16T10:48:36.079+0000 1.2851073161790278
2020-11-16T10:48:36.084+0000 -0.9717966387140513
2020-11-16T10:48:36.089+0000 0.4604981927819666
2020-11-16T10:48:36.094+0000 0.4825924627571432
2020-11-16T10:48:36.099+0000 1.8907687599342153
2020-11-16T10:48:36.104+0000 1.5027092114554406
2020-11-16T10:48:36.110+0000 0.4892227077808574
2020-11-16T10:48:36.115+0000 2.2742380779964306
2020-11-16T10:48:36.120+0000 5.93203161994782e-3
2020-11-16T10:48:36.125+0000 0.9357077683018076
2020-11-16T10:48:36.130+0000 1.6452901327178684
2020-11-16T10:48:36.136+0000 2.5989481778450294
2020-11-16T10:48:36.141+0000 3.1233030636814103
2020-11-16T10:48:36.146+0000 2.14412876458466
2020-11-16T10:48:36.151+0000 0.8645332371791754
2020-11-16T10:48:36.157+0000 1.7396751361758789
2020-11-16T10:48:36.163+0000 3.406726808728102
2020-11-16T10:48:36.169+0000 0.27592904706426413
2020-11-16T10:48:36.174+0000 -0.47288172874607715
2020-11-16T10:48:36.179+0000 3.1581200247451022
2020-11-16T10:48:36.184+0000 2.3502844371874003
2020-11-16T10:48:36.190+0000 2.3604518998272104
2020-11-16T10:48:36.195+0000 2.875582435906723
2020-11-16T10:48:36.200+0000 1.802101533727158
2020-11-16T10:48:36.205+0000 2.158082491464444
2020-11-16T10:48:36.210+0000 -0.5284223682158626
2020-11-16T10:48:36.216+0000 1.929919317533868e-2
2020-11-16T10:48:36.221+0000 1.948485504832782
2020-11-16T10:48:36.226+0000 0.49379467644006303
2020-11-16T10:48:36.231+0000 0.33811694243690293
2020-11-16T10:48:36.236+0000 1.332171769010618
2020-11-16T10:48:36.242+0000 0.6994701270153069
2020-11-16T10:48:36.247+0000 -0.413721820026016
2020-11-16T10:48:36.252+0000 -1.5522089380783108
2020-11-16T10:48:36.257+0000 2.161396170492705
2020-11-16T10:48:36.262+0000 2.333496950423164e-2
2020-11-16T10:48:36.268+0000 -0.10913840839170796
2020-11-16T10:48:36.273+0000 1.1299228472291496
2020-11-16T10:48:36.278+0000 2.4274358384176584
2020-11-16T10:48:36.283+0000 1.9359707345891741
2020-11-16T10:48:36.288+0000 3.487722218477596
2020-11-16T10:48:36.294+0000 0.9990127159196325
2020-11-16T10:48:36.299+0000 -1.0398429191328207
2020-11-16T10:48:36.304+0000 0.3005833334887211
2020-11-16T10:48:36.309+0000 -0.7334628100431295
2020-11-16T10:48:36.314+0000 0.4835865602253189
2020-11-16T10:48:36.320+0000 0.5246945471836175
2020-11-16T10:48:36.325+0000 0.8469783573593253
2020-11-16T10:48:36.330+0000 0.8359162587262456
2020-11-16T10:48:36.335+0000 0.7772016511976113
2020-11-16T10:48:36.340+0000 -0.39849883029666944
2020-11-16T10:48:36.345+0000 1.8703097604547239
2020-11-16T10:48:36.350+0000 2.682932324516024
2020-11-16T10:48:36.356+0000 0.46996888720103236
2020-11-16T10:48:36.361+0000 -7.881388366585762e-2
2020-11-16T10:48:36.366+0000 2.1043645061434084
2020-11-16T10:48:36.371+0000 0.6195230903468327
2020-11-16T10:48:36.376+0000 -0.23170755440676594
2020-11-16T10:48:36.381+0000 0.3918168388047796
2020-11-16T10:48:36.386+0000 0.22086080450987344
2020-11-16T10:48:36.392+0000 1.5182059037248368
2020-11-16T10:48:36.397+0000 1.6442851975073318
2020-11-16T10:48:36.402+0000 0.3979663516003099
2020-11-16T10:48:42.690+0000 2.0531657985840983
2020-11-16T10:48:42.696+0000 1.7928797637680196
2020-11-16T10:48:42.701+0000 2.9329556976986013
2020-11-16T10:48:42.706+0000 1.1087520027663345
2020-11-16T10:48:42.711+0000 1.2115868818351045
2020-11-16T10:48:42.716+0000 1.9163661519192294
2020-11-16T10:48:42.722+0000 1.6917128257752045
2020-11-16T10:48:42.727+0000 1.0095879056962782
2020-11-16T10:48:42.732+0000 -0.13611276130309613
2020-11-16T10:48:42.737+0000 2.2939319088848023
2020-11-16T10:48:42.742+0000 1.0723690693732042
2020-11-16T10:48:42.748+0000 2.1452154961792393
2020-11-16T10:48:42.753+0000 0.7259078662420231
2020-11-16T10:48:42.758+0000 2.6599123456452727
2020-11-16T10:48:42.763+0000 0.2519779820647646
2020-11-16T10:48:42.768+0000 2.1670014817546175
2020-11-16T10:48:42.773+0000 0.10506784220981513
2020-11-16T10:48:42.779+0000 2.018185302480656
2020-11-16T10:48:42.784+0000 1.1665983169452525
2020-11-16T10:48:42.789+0000 0.33284879429952463
2020-11-16T10:48:42.794+0000 0.3531339079979545
2020-11-16T10:48:42.799+0000 2.1004784012229245
2020-11-16T10:48:42.805+0000 1.282680965361929
2020-11-16T10:48:42.810+0000 1.2270715852857979
2020-11-16T10:48:42.815+0000 0.858598096986649
2020-11-16T10:48:42.820+0000 2.5040344133072407
2020-11-16T10:48:42.825+0000 1.6541952933075013
2020-11-16T10:48:42.831+0000 0.5329588210461834
2020-11-16T10:48:42.836+0000 2.1274892552565134
2020-11-16T10:48:42.841+0000 1.4668875035709574
2020-11-16T10:48:42.846+0000 1.5382758818248594
2020-11-16T10:48:42.851+0000 1.7428172106530586
2020-11-16T10:48:42.856+0000 1.4727771685178368
2020-11-16T10:48:42.861+0000 1.6023481462981235
2020-11-16T10:48:42.867+0000 1.6577898477375492
2020-11-16T10:48:42.872+0000 5.892056976555449e-2
2020-11-16T10:48:42.877+0000 2.7754262543475523
2020-11-16T10:48:42.882+0000 1.2200523142327606
2020-11-16T10:48:42.887+0000 1.5903756890326521
2020-11-16T10:48:42.893+0000 -1.49547625208842
2020-11-16T10:48:42.898+0000 0.8523817097750093
2020-11-16T10:48:42.903+0000 0.5057853403549346
2020-11-16T10:48:42.908+0000 0.5683629007876065
2020-11-16T10:48:42.913+0000 1.6479513379049497
2020-11-16T10:48:42.918+0000 1.2148679515188867
2020-11-16T10:48:42.924+0000 0.6222019509815193
2020-11-16T10:48:42.929+0000 1.3255067306263184
2020-11-16T10:48:42.934+0000 0.4983375954130155
2020-11-16T10:48:42.939+0000 -8.802709440091383e-2
2020-11-16T10:48:42.944+0000 0.13831985322805507
2020-11-16T10:48:42.949+0000 -0.5487242466777436
2020-11-16T10:48:42.954+0000 -0.32058114510029334
2020-11-16T10:48:42.960+0000 1.8950590840214767
2020-11-16T10:48:42.965+0000 1.0062190610750874
2020-11-16T10:48:42.971+0000 -0.9934439161367286
2020-11-16T10:48:42.976+0000 0.3671557383587293
2020-11-16T10:48:42.981+0000 0.19986189782147756
2020-11-16T10:48:42.986+0000 -0.49653972053539497
2020-11-16T10:48:42.991+0000 0.6848255848767759
2020-11-16T10:48:42.996+0000 1.5219606199148406
2020-11-16T10:48:43.002+0000 1.455086538348867
2020-11-16T10:48:43.007+0000 2.883109155648917
2020-11-16T10:48:43.012+0000 1.8164694435868296
2020-11-16T10:48:43.017+0000 0.6742710281863775
2020-11-16T10:48:43.022+0000 0.5441958963393487
2020-11-16T10:48:43.027+0000 1.0517397813571259
2020-11-16T10:48:43.033+0000 0.8356831003190489
2020-11-16T10:48:43.038+0000 0.8227690076487093
2020-11-16T10:48:43.043+0000 1.4570119880481842
2020-11-16T10:48:43.048+0000 -0.297581775651637
2020-11-16T10:48:43.053+0000 -7.206180041345078e-2
2020-11-16T10:48:43.059+0000 -0.8739444049086391
2020-11-16T10:48:43.064+0000 2.2604530979343074
2020-11-16T10:48:43.069+0000 2.3872947344763027
2020-11-16T10:48:43.074+0000 3.3685772895980124
2020-11-16T10:48:43.079+0000 2.013534739447639
2020-11-16T10:48:43.085+0000 3.368251328412311
2020-11-16T10:48:43.090+0000 0.8953451648220483
2020-11-16T10:48:43.095+0000 9.545874578601765e-2
2020-11-16T10:48:43.100+0000 0.7718477167244377
2020-11-16T10:48:43.105+0000 1.0629106168204554
2020-11-16T10:48:43.110+0000 0.5518190802821734
2020-11-16T10:48:43.116+0000 2.9939679918505853
2020-11-16T10:48:43.121+0000 1.8726021041818661
2020-11-16T10:48:43.126+0000 0.2653885457840085
2020-11-16T10:48:43.131+0000 1.9872672471653996
2020-11-16T10:48:43.136+0000 -0.553166557898946
2020-11-16T10:48:43.141+0000 1.5640591286122745
2020-11-16T10:48:43.147+0000 2.52680639118602
2020-11-16T10:48:43.152+0000 1.80742439492357
2020-11-16T10:48:43.157+0000 2.1955997975781347
2020-11-16T10:48:43.162+0000 0.5980285235875027
2020-11-16T10:48:43.167+0000 -0.2658797956060317
2020-11-16T10:48:43.172+0000 -0.49719135472382137
2020-11-16T10:48:43.178+0000 1.180607461695498
2020-11-16T10:48:43.183+0000 -0.10430878902480734
2020-11-16T10:48:43.188+0000 0.823892717854915
2020-11-16T10:48:43.193+0000 1.666382974377688
2020-11-16T10:48:43.198+0000 3.748395965408928
2020-11-16T10:48:43.204+0000 1.7921581120532326e-2
import org.apache.spark.sql.functions._

// Start running the query that prints the running counts to the console
val query = csvStreamingDF
                 // bround simply rounds the double to the desired decimal place - 0 in our case here. 
                   // see https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#bround-org.apache.spark.sql.Column-
                   // we are using bround to simply coarsen our data into bins for counts
                 .select(bround($"score", 0).as("binnedScore")) 
                 .groupBy($"binnedScore")
                 .agg(count($"binnedScore") as "binnedScoreCounts")
                 .orderBy($"binnedScore")
                 .writeStream
                 .outputMode("complete")
                 .format("console")
                 .start()
                 
query.awaitTermination() // hit cancel to terminate
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -1.0|                9|
|        0.0|               18|
|        1.0|               41|
|        2.0|               25|
|        3.0|                5|
|        4.0|                1|
|       10.0|                1|
+-----------+-----------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -2.0|                1|
|       -1.0|               13|
|        0.0|               44|
|        1.0|               83|
|        2.0|               46|
|        3.0|               10|
|        4.0|                1|
|       10.0|                1|
|       12.0|                1|
+-----------+-----------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -2.0|                2|
|       -1.0|               20|
|        0.0|               74|
|        1.0|              118|
|        2.0|               70|
|        3.0|               12|
|        4.0|                1|
|        9.0|                1|
|       10.0|                1|
|       12.0|                1|
+-----------+-----------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -2.0|                4|
|       -1.0|               27|
|        0.0|              104|
|        1.0|              144|
|        2.0|               96|
|        3.0|               21|
|        4.0|                1|
|        9.0|                1|
|       10.0|                1|
|       12.0|                1|
+-----------+-----------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -2.0|                4|
|       -1.0|               32|
|        0.0|              125|
|        1.0|              179|
|        2.0|              125|
|        3.0|               30|
|        4.0|                2|
|        9.0|                1|
|       10.0|                1|
|       12.0|                1|
+-----------+-----------------+

Once the above streaming job has processed all the files in the directory, it will continue to "listen" for new files arriving in the directory. You could, for example, return to the other notebook 037b_Mix2NormalsStructStreamingFiles, rerun the cell that writes another batch of newer files into the directory, and then return to this notebook to watch the streaming job above continue with additional batches.
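If you would rather not rerun the other notebook, a minimal sketch along the following lines would also drop one extra file into the watched directory by hand. Note that the path and the record format here are hypothetical placeholders - they must match whatever 037b_Mix2NormalsStructStreamingFiles actually writes.

// sketch only: write one more small file into the watched directory so the
// file source notices it and triggers another micro-batch;
// the path and line format below are hypothetical placeholders
dbutils.fs.put(
  "/datasets/streamingFilesNormalMixture/extra_00.log", // hypothetical path
  "2020-11-16T10:55:00.000+0000,1.2345\n2020-11-16T10:55:00.005+0000,-0.6789\n",
  true) // overwrite if the file already exists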

Static and Streaming Datasets

These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only at runtime when the query is submitted. Some operations, like map, flatMap, etc., need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as for a static DataFrame. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

Let us make a dataset version of the streaming dataframe.

But first let us try to make the dataset from the static dataframe and then apply the same approach to the streaming dataframe.

csvStaticDF.printSchema // schema of the static DF
root
 |-- time: timestamp (nullable = true)
 |-- score: double (nullable = true)
import org.apache.spark.sql.types._
import java.sql.Timestamp

// create a case class to make the dataset
case class timedScores(time: Timestamp, score: Double)

val csvStaticDS = csvStaticDF.as[timedScores] // create a dataset from the dataframe
import org.apache.spark.sql.types._
import java.sql.Timestamp
defined class timedScores
csvStaticDS: org.apache.spark.sql.Dataset[timedScores] = [time: timestamp, score: double]
csvStaticDS.show(5,false) // looks like we got the dataset we want with strong typing
+-----------------------+--------------------+
|time                   |score               |
+-----------------------+--------------------+
|2020-11-16 10:48:25.294|0.21791376679544772 |
|2020-11-16 10:48:25.299|0.011291967445604012|
|2020-11-16 10:48:25.304|-0.30293144696154806|
|2020-11-16 10:48:25.309|0.4303254534802833  |
|2020-11-16 10:48:25.314|1.5521304466388752  |
+-----------------------+--------------------+
only showing top 5 rows
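Since csvStaticDS is strongly typed, operations such as filter and map can now work directly on the timedScores case class and are checked at compile time. A small illustrative sketch:

// a typed, compile-time-checked operation on the strongly typed Dataset:
// ts below is a timedScores instance, so ts.score is resolved by the compiler
csvStaticDS.filter(ts => ts.score > 1.0).show(3, false)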

Now let us use the same code for making a streaming dataset.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import java.sql.Timestamp

// create a case class to make the dataset
case class timedScores(time: Timestamp, score: Double)

val csvStreamingDS = csvStreamingDF.as[timedScores] // create a dataset from the dataframe

// Start running the query that prints the running counts to the console
val query = csvStreamingDS
                  // bround simply rounds the double to the desired decimal place - 0 in our case here. 
                   // see https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#bround-org.apache.spark.sql.Column-
                   // we are using bround to simply coarsen our data into bins for counts
                 .select(bround($"score", 0).as("binnedScore")) 
                 .groupBy($"binnedScore")
                 .agg(count($"binnedScore") as "binnedScoreCounts")
                 .orderBy($"binnedScore")
                 .writeStream
                 .outputMode("complete")
                 .format("console")
                 .start()

query.awaitTermination() // hit cancel to terminate
-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -1.0|                9|
|        0.0|               18|
|        1.0|               41|
|        2.0|               25|
|        3.0|                5|
|        4.0|                1|
|       10.0|                1|
+-----------+-----------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -2.0|                1|
|       -1.0|               13|
|        0.0|               44|
|        1.0|               83|
|        2.0|               46|
|        3.0|               10|
|        4.0|                1|
|       10.0|                1|
|       12.0|                1|
+-----------+-----------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -2.0|                2|
|       -1.0|               20|
|        0.0|               74|
|        1.0|              118|
|        2.0|               70|
|        3.0|               12|
|        4.0|                1|
|        9.0|                1|
|       10.0|                1|
|       12.0|                1|
+-----------+-----------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -2.0|                4|
|       -1.0|               27|
|        0.0|              104|
|        1.0|              144|
|        2.0|               96|
|        3.0|               21|
|        4.0|                1|
|        9.0|                1|
|       10.0|                1|
|       12.0|                1|
+-----------+-----------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+-----------+-----------------+
|binnedScore|binnedScoreCounts|
+-----------+-----------------+
|       -2.0|                4|
|       -1.0|               32|
|        0.0|              125|
|        1.0|              179|
|        2.0|              125|
|        3.0|               30|
|        4.0|                2|
|        9.0|                1|
|       10.0|                1|
|       12.0|                1|
+-----------+-----------------+

Window Operations on Event Time

Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let’s understand this with an illustration.

Imagine our quick example is modified so that the stream now contains lines along with the time when each line was generated. Instead of running word counts, we want to count words within 10-minute windows, updating every 5 minutes. That is, we want word counts for words received within the 10-minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, and so on. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now consider a word that was received at 12:07. This word should increment the counts corresponding to two windows, 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both the grouping key (i.e. the word) and the window (which can be calculated from the event time).

The result tables would look something like the following.

Window Operations

Since this windowing is similar to grouping, in code you can use the groupBy() and window() operations to express windowed aggregations. You can see the full code for the examples below in Scala/Java/Python.
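As a rough sketch of that expression - assuming a streaming Dataset called words with a timestamp column holding the event time and a word column, i.e. the word-count setting described above rather than anything defined in this notebook - the 10-minute windows sliding every 5 minutes would be written as:

import org.apache.spark.sql.functions.window

// sketch: count words over 10-minute event-time windows that slide every 5 minutes
val windowedWordCounts = words
      .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
      .count()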

Make sure the streaming job with animal names is running (or has finished running) with files in the /datasets/streamingFiles directory - this is the Quick Example in the 037a_FilesForStructuredStreaming notebook.

display(dbutils.fs.ls("/datasets/streamingFiles"))
spark.read.format("text").load("/datasets/streamingFiles").show(5,false) // let's just read five entries
+----------------------------------+
|value                             |
+----------------------------------+
|2020-11-16 10:30:04+00:00; bat rat|
|2020-11-16 10:30:06+00:00; rat bat|
|2020-11-16 10:30:08+00:00; rat dog|
|2020-11-16 10:30:10+00:00; rat cat|
|2020-11-16 10:30:12+00:00; cat bat|
+----------------------------------+
only showing top 5 rows
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.sql.Timestamp
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

// a static DS is convenient to work with
val csvStaticDS = spark
   .read
   .option("sep", ";") // delimiter is ';'
   .csv("/datasets/streamingFiles/*.log")    // Equivalent to format("csv").load("/path/to/directory")
   .toDF("time","animals")
   .select(unix_timestamp($"time", "yyyy-MM-dd HH:mm:ss").cast(TimestampType).as("timestamp"), $"animals")
   .as[(Timestamp, String)]
   .flatMap(
     line => line._2.split(" ")
                 .filter(_ != "") // remove empty string from leading whitespace
                 .map(animal => (line._1, animal))
    )
   .toDF("timestamp", "animal")
   .as[(Timestamp, String)]
   
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.sql.Timestamp
csvStaticDS: org.apache.spark.sql.Dataset[(java.sql.Timestamp, String)] = [timestamp: timestamp, animal: string]
csvStaticDS.show(5,false)
+-------------------+------+
|timestamp          |animal|
+-------------------+------+
|2020-11-16 11:00:01|owl   |
|2020-11-16 11:00:01|cat   |
|2020-11-16 11:00:03|dog   |
|2020-11-16 11:00:03|pig   |
|2020-11-16 11:00:05|rat   |
+-------------------+------+
only showing top 5 rows
//make a user-specified schema for structured streaming
val userSchema = new StructType()
                      .add("time", "String") // we will read it as String and then convert into timestamp later
                      .add("animals", "String")

// streaming DS
val csvStreamingDS = spark
// the next three lines are needed for structured streaming from file streams
  .readStream // for streaming
  .option("MaxFilesPerTrigger", 1) //  for streaming
  .schema(userSchema) // for streaming
  .option("sep", ";") // delimiter is ';'
  .csv("/datasets/streamingFiles/*.log")    // Equivalent to format("csv").load("/path/to/directory")
  .toDF("time","animals")
  .select(unix_timestamp($"time", "yyyy-MM-dd HH:mm:ss").cast(TimestampType).as("timestamp"), $"animals")
  //.toDF("time","animals")
  .as[(Timestamp, String)]
  .flatMap(
     line => line._2.split(" ").map(animal => (line._1, animal))
    )
  .filter(_._2 != "")
  .toDF("timestamp", "animal")
  .as[(Timestamp, String)]
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(time,StringType,true), StructField(animals,StringType,true))
csvStreamingDS: org.apache.spark.sql.Dataset[(java.sql.Timestamp, String)] = [timestamp: timestamp, animal: string]
display(csvStreamingDS) // evaluate to see the animal words with timestamps streaming in
timestamp animal
2020-11-16T10:30:08.000+0000 rat
2020-11-16T10:30:08.000+0000 dog
2020-11-16T10:34:23.000+0000 bat
2020-11-16T10:34:23.000+0000 pig
2020-11-16T10:45:59.000+0000 cat
2020-11-16T10:45:59.000+0000 bat
2020-11-16T10:47:54.000+0000 pig
2020-11-16T10:47:54.000+0000 dog
2020-11-16T10:50:05.000+0000 dog
2020-11-16T10:50:05.000+0000 pig
2020-11-16T10:54:10.000+0000 bat
2020-11-16T10:54:10.000+0000 rat
2020-11-16T10:58:06.000+0000 rat
2020-11-16T10:58:06.000+0000 bat
2020-11-16T10:30:54.000+0000 cat
2020-11-16T10:30:54.000+0000 pig
2020-11-16T10:31:17.000+0000 dog
2020-11-16T10:31:17.000+0000 rat
2020-11-16T10:32:05.000+0000 cat
2020-11-16T10:32:05.000+0000 dog
2020-11-16T10:35:07.000+0000 pig
2020-11-16T10:35:07.000+0000 rat
2020-11-16T10:35:55.000+0000 rat
2020-11-16T10:35:55.000+0000 pig
2020-11-16T10:37:10.000+0000 dog
2020-11-16T10:37:10.000+0000 cat
2020-11-16T10:38:58.000+0000 owl
2020-11-16T10:38:58.000+0000 dog
2020-11-16T10:41:57.000+0000 rat
2020-11-16T10:41:57.000+0000 bat
2020-11-16T10:45:25.000+0000 dog
2020-11-16T10:45:25.000+0000 bat
2020-11-16T10:45:43.000+0000 pig
2020-11-16T10:45:43.000+0000 owl
2020-11-16T10:47:16.000+0000 rat
2020-11-16T10:47:16.000+0000 dog
2020-11-16T10:53:52.000+0000 rat
2020-11-16T10:53:52.000+0000 dog
2020-11-16T10:55:12.000+0000 dog
2020-11-16T10:55:12.000+0000 pig
2020-11-16T10:32:01.000+0000 owl
2020-11-16T10:32:01.000+0000 bat
2020-11-16T10:32:25.000+0000 cat
2020-11-16T10:32:25.000+0000 pig
2020-11-16T10:33:29.000+0000 rat
2020-11-16T10:33:29.000+0000 bat
2020-11-16T10:34:01.000+0000 pig
2020-11-16T10:34:01.000+0000 cat
2020-11-16T10:34:37.000+0000 dog
2020-11-16T10:34:37.000+0000 pig
2020-11-16T10:42:11.000+0000 pig
2020-11-16T10:42:11.000+0000 cat
2020-11-16T10:42:51.000+0000 pig
2020-11-16T10:42:51.000+0000 rat
2020-11-16T10:49:06.000+0000 pig
2020-11-16T10:49:06.000+0000 cat
// Group the data by window and word and compute the count of each group
val windowDuration = "180 seconds"
val slideDuration = "90 seconds"
val windowedCounts = csvStreamingDS.groupBy(
      window($"timestamp", windowDuration, slideDuration), $"animal"
    ).count().orderBy("window")

// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|pig   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|dog   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|dog   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|pig   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|pig   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|dog   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|pig   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|dog   |1    |
+------------------------------------------+------+-----+
only showing top 20 rows

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|pig   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|dog   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|pig   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|dog   |1    |
+------------------------------------------+------+-----+
only showing top 20 rows

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|pig   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|cat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|cat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|pig   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
+------------------------------------------+------+-----+
only showing top 20 rows

Handling Late Data and Watermarking

Now consider what happens if one of the events arrives late to the application. For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 to update the older counts for the window 12:00 - 12:10. This occurs naturally in our window-based grouping – Structured Streaming can maintain the intermediate state for partial aggregates for a long period of time such that late data can update aggregates of old windows correctly, as illustrated below.

Handling Late Data

However, to run this query for days, it is necessary for the system to bound the amount of intermediate in-memory state it accumulates. This means the system needs to know when an old aggregate can be dropped from the in-memory state because the application is not going to receive late data for that aggregate any more. To enable this, watermarking was introduced in Spark 2.1; it lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly. You can define the watermark of a query by specifying the event-time column and a threshold on how late the data is expected to be in terms of event time. For a specific window ending at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T). In other words, late data within the threshold will be aggregated, but data later than the threshold will start getting dropped. Note that watermarking only lets the engine drop old state in the Append or Update output modes; in Complete mode (used in the example below so that the whole result table is printed each time) all aggregate state must be retained, so the watermark will not purge anything there. Let us understand this with an example. We can easily define watermarking on the previous example using withWatermark() as shown below.

// Group the data by window and word and compute the count of each group
val windowDuration = "180 seconds"
val slideDuration = "90 seconds"
val watermarkDuration = "10 minutes"
val windowedCounts = csvStreamingDS
     .withWatermark("timestamp", watermarkDuration)
     .groupBy(
      window($"timestamp", windowDuration, slideDuration), $"animal"
    ).count().orderBy("window")

// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

query.awaitTermination()
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|dog   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|pig   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|dog   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|pig   |1    |
+------------------------------------------+------+-----+

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|dog   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|pig   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|dog   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|pig   |1    |
+------------------------------------------+------+-----+
only showing top 20 rows

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|pig   |1    |
|[2020-11-16 10:48:00, 2020-11-16 10:51:00]|dog   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|pig   |1    |
|[2020-11-16 10:49:30, 2020-11-16 10:52:30]|dog   |1    |
+------------------------------------------+------+-----+
only showing top 20 rows

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|pig   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|cat   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|pig   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|cat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
+------------------------------------------+------+-----+
only showing top 20 rows

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |2    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |2    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|pig   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|cat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|cat   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |2    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|pig   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |2    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|dog   |1    |
|[2020-11-16 10:46:30, 2020-11-16 10:49:30]|pig   |1    |
+------------------------------------------+------+-----+
only showing top 20 rows

-------------------------------------------
Batch: 9
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |2    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |2    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|cat   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|pig   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|cat   |2    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |2    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|pig   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |3    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|dog   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|cat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|bat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|dog   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|cat   |1    |
+------------------------------------------+------+-----+
only showing top 20 rows

-------------------------------------------
Batch: 10
-------------------------------------------
+------------------------------------------+------+-----+
|window                                    |animal|count|
+------------------------------------------+------+-----+
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|dog   |2    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|rat   |2    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|cat   |1    |
|[2020-11-16 10:28:30, 2020-11-16 10:31:30]|pig   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|cat   |2    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|pig   |1    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|rat   |2    |
|[2020-11-16 10:30:00, 2020-11-16 10:33:00]|dog   |3    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|bat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|pig   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|cat   |1    |
|[2020-11-16 10:31:30, 2020-11-16 10:34:30]|dog   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|bat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|rat   |1    |
|[2020-11-16 10:33:00, 2020-11-16 10:36:00]|pig   |2    |
|[2020-11-16 10:34:30, 2020-11-16 10:37:30]|rat   |1    |
|[2020-11-16 10:34:30, 2020-11-16 10:37:30]|pig   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|bat   |1    |
|[2020-11-16 10:43:30, 2020-11-16 10:46:30]|cat   |1    |
|[2020-11-16 10:45:00, 2020-11-16 10:48:00]|pig   |1    |
+------------------------------------------+------+-----+
only showing top 20 rows

In this example, we are defining the watermark of the query on the value of the column “timestamp”, and also defining “10 minutes” as the threshold of how late the data is allowed to be. If this query is run in Update output mode (discussed later in the Output Modes section), the engine will keep updating the counts of a window in the Result Table until the window is older than the watermark, which lags behind the current event time in column “timestamp” by 10 minutes. Here is an illustration.

Watermarking in Update Mode

As shown in the illustration, the maximum event time tracked by the engine is the blue dashed line, and the watermark set as (max event time - '10 mins') at the beginning of every trigger is the red line. For example, when the engine observes the data (12:14, dog), it sets the watermark for the next trigger as 12:04. This watermark lets the engine maintain intermediate state for an additional 10 minutes to allow late data to be counted. For example, the data (12:09, cat) is out of order and late, and it falls in the windows 12:05 - 12:15 and 12:10 - 12:20. Since it is still ahead of the watermark 12:04 in the trigger, the engine still maintains the intermediate counts as state and correctly updates the counts of the related windows. However, when the watermark is updated to 12:11, the intermediate state for the window (12:00 - 12:10) is cleared, and all subsequent data (e.g. (12:04, donkey)) is considered “too late” and therefore ignored. Note that after every trigger, the updated counts (i.e. the purple rows) are written to the sink as the trigger output, as dictated by the Update mode.
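
To make the shape of such a query concrete, here is a minimal sketch of a watermarked, windowed count in Update mode. It mirrors the standard Spark API; the input DataFrame words, its columns timestamp and word, and the window and watermark intervals are illustrative and need not match the notebook cell that produced the output above.

    import org.apache.spark.sql.functions.{col, window}

    // assume `words` is a streaming DataFrame with columns `timestamp: Timestamp` and `word: String`
    val windowedCounts = words
      .withWatermark("timestamp", "10 minutes")       // tolerate data arriving up to 10 minutes late
      .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("word"))
      .count()

    windowedCounts.writeStream
      .outputMode("update")     // only windows whose counts changed are written at each trigger
      .format("console")
      .start()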

Some sinks (e.g. files) may not support the fine-grained updates that Update Mode requires. For such sinks, Append Mode is also supported, where only the final counts are written to the sink. This is illustrated below.

Note that using withWatermark on a non-streaming Dataset is a no-op. As the watermark should not affect any batch query in any way, it is simply ignored.

Watermarking in Append Mode

Similar to the Update Mode earlier, the engine maintains intermediate counts for each window. However, the partial counts are not updated in the Result Table and not written to the sink. The engine waits for “10 mins” for late data to be counted, then drops the intermediate state of any window older than the watermark, and appends the final counts to the Result Table/sink. For example, the final counts of window 12:00 - 12:10 are appended to the Result Table only after the watermark is updated to 12:11.

Conditions for watermarking to clean aggregation state

It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries (as of Spark 2.1.1, subject to change in the future).

  • Output mode must be Append or Update. Complete mode requires all aggregate data to be preserved, and hence cannot use watermarking to drop intermediate state. See the Output Modes section for detailed explanation of the semantics of each output mode.

  • The aggregation must have either the event-time column, or a window on the event-time column.

  • withWatermark must be called on the same column as the timestamp column used in the aggregate. For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode, as watermark is defined on a different column from the aggregation column.

  • withWatermark must be called before the aggregation for the watermark details to be used. For example, df.groupBy("time").count().withWatermark("time", "1 min") is invalid in Append output mode.

Join Operations

Streaming DataFrames can be joined with static DataFrames to create new streaming DataFrames. Here are a few examples.

val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_join")  // right outer join with a static DF

Streaming Deduplication

You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on a static Dataset using a unique identifier column. The query will store the necessary amount of data from previous records so that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.

  • With watermark - If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event-time column and deduplicate using both the guid and the event-time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates anymore. This bounds the amount of state the query has to maintain.

  • Without watermark - Since there are no bounds on when a duplicate record may arrive, the query stores the data from all the past records as state.

    val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

    // Without watermark using guid column
    streamingDf.dropDuplicates("guid")

    // With watermark using guid and eventTime columns
    streamingDf
      .withWatermark("eventTime", "10 seconds")
      .dropDuplicates("guid", "eventTime")

Arbitrary Stateful Operations

Many use cases require more advanced stateful operations than aggregations. For example, in many use cases you have to track sessions from data streams of events. To do such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation mapGroupsWithState and the more powerful operation flatMapGroupsWithState. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation (Scala/Java) and the examples (Scala/Java).
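
As a flavour of the API, here is a minimal sketch of keeping a running per-user event count with mapGroupsWithState. The Event and RunningCount case classes and the eventsDs Dataset are hypothetical, not from the notebook, and the sketch assumes a SparkSession value named spark.

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
    import spark.implicits._      // assumes a SparkSession value named `spark`

    case class Event(user: String, action: String)         // hypothetical input schema
    case class RunningCount(user: String, count: Long)     // hypothetical output schema

    // assume eventsDs: Dataset[Event] is a streaming Dataset defined elsewhere
    val runningCounts = eventsDs
      .groupByKey(_.user)
      .mapGroupsWithState[Long, RunningCount](GroupStateTimeout.NoTimeout) {
        (user: String, events: Iterator[Event], state: GroupState[Long]) =>
          val newCount = state.getOption.getOrElse(0L) + events.size
          state.update(newCount)          // carry the count forward to the next trigger
          RunningCount(user, newCount)    // one output row per updated key
      }

    runningCounts.writeStream
      .outputMode("update")               // mapGroupsWithState works with Update output mode
      .format("console")
      .start()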

Unsupported Operations

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

  • Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

  • Limit and take the first N rows are not supported on streaming Datasets.

  • Distinct operations on streaming Datasets are not supported.

  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

  • Outer joins between a streaming and a static Dataset are conditionally supported.

    • Full outer join with a streaming Dataset is not supported

    • Left outer join with a streaming Dataset on the right is not supported

    • Right outer join with a streaming Dataset on the left is not supported

  • Any kind of join between two streaming Datasets is not yet supported.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that would immediately run the query and return results, which does not make sense on a streaming Dataset. Rather, that functionality can be achieved by explicitly starting a streaming query (see the next section regarding that).

  • count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

  • foreach() - Instead use ds.writeStream.foreach(...) (see next section).

  • show() - Instead use the console sink (see next section).

If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.
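
As an illustrative sketch (not from the notebook), calling one of these unsupported actions on a streaming DataFrame, such as the streamingDf defined earlier, fails at analysis time, and the streaming-friendly alternative is an aggregation that is then started as a query:

    import org.apache.spark.sql.AnalysisException

    try {
      streamingDf.count()    // a direct action on a streaming Dataset -> AnalysisException
    } catch {
      case e: AnalysisException => println("Not supported: " + e.getMessage)
    }

    // The streaming alternative: a running count expressed as a streaming aggregation,
    // which must then be started with writeStream (see the next section).
    val runningCount = streamingDf.groupBy().count()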

Starting Streaming Queries

Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the DataStreamWriter (Scala/Java/Python docs) returned through Dataset.writeStream(). You will have to specify one or more of the following in this interface.

  • Details of the output sink: Data format, location, etc.

  • Output mode: Specify what gets written to the output sink.

  • Query name: Optionally, specify a unique name of the query for identification.

  • Trigger interval: Optionally, specify the trigger interval. If it is not specified, the system will check for availability of new data as soon as the previous processing has completed. If a trigger time is missed because the previous processing has not completed, then the system will attempt to trigger at the next trigger point, not immediately after the processing has completed.

  • Checkpoint location: For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
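
Putting the options above together, here is a sketch of a single writeStream call that sets all of them; the format, paths, query name and trigger interval are illustrative, and streamingDf stands for the (elided) streaming DataFrame from the earlier examples.

    import org.apache.spark.sql.streaming.Trigger

    val query = streamingDf.writeStream
      .format("parquet")                                       // sink details: data format
      .option("path", "path/to/destination/dir")               // sink details: location
      .outputMode("append")                                    // output mode
      .queryName("myStreamingQuery")                           // optional query name
      .trigger(Trigger.ProcessingTime("10 seconds"))           // optional trigger interval
      .option("checkpointLocation", "path/to/checkpoint/dir")  // checkpoint location
      .start()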

Output Modes

There are a few types of output modes.

  • Append mode (default) - This is the default mode, where only the new rows added to the Result Table since the last trigger will be outputted to the sink. This is supported only for those queries where the rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once (assuming a fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. will support Append mode.

  • Complete mode - The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.

  • Update mode - (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink. More information to be added in future releases.

Different types of streaming queries support different output modes. Here is the compatibility matrix.

  • Queries with aggregation

    • Aggregation on event-time with watermark - Supported output modes: Append, Update, Complete. Append mode uses the watermark to drop old aggregation state; the output of a windowed aggregation is delayed by the late threshold specified in `withWatermark()`, since by the semantics of this mode rows can be added to the Result Table only once, after they are finalized (i.e. after the watermark is crossed). See the Late Data section for more details. Update mode uses the watermark to drop old aggregation state. Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.

    • Other aggregations - Supported output modes: Complete, Update. Since no watermark is defined (it is only defined in the other category), old aggregation state is not dropped. Append mode is not supported as aggregates can update, violating the semantics of this mode.

  • Queries with mapGroupsWithState - Supported output mode: Update.

  • Queries with flatMapGroupsWithState

    • Append operation mode - Supported output mode: Append. Aggregations are allowed after flatMapGroupsWithState.

    • Update operation mode - Supported output mode: Update. Aggregations are not allowed after flatMapGroupsWithState.

  • Other queries - Supported output modes: Append, Update. Complete mode is not supported as it is infeasible to keep all unaggregated data in the Result Table.
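
To see the Append-mode delay from the matrix in code, here is a hedged sketch that writes the same (hypothetical) watermarked windowed aggregation windowedCounts from the earlier watermark sketch in both modes:

    // Update: changed window counts are emitted at every trigger.
    windowedCounts.writeStream
      .outputMode("update")
      .format("console")
      .start()

    // Append: each window's count is emitted exactly once, only after the watermark
    // has passed the end of the window, i.e. delayed by the late threshold.
    windowedCounts.writeStream
      .outputMode("append")
      .format("console")
      .start()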

Output Sinks

There are a few types of built-in output sinks.

  • File sink - Stores the output to a directory.
    writeStream
        .format("parquet")        // can be "orc", "json", "csv", etc.
        .option("path", "path/to/destination/dir")
        .start()
  • Foreach sink - Runs arbitrary computation on the records in the output. See later in the section for more details.
    writeStream
        .foreach(...)
        .start()
  • Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver’s memory after every trigger.
    writeStream
        .format("console")
        .start()
  • Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver’s memory. Hence, use it with caution.
    writeStream
        .format("memory")
        .queryName("tableName")
        .start()

Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are meant for debugging purposes only. See the earlier section on fault-tolerance semantics. Here are the details of all the sinks in Spark.

  • File Sink - Supported output modes: Append. Options: path (the path to the output directory, must be specified); for file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R), e.g. for the "parquet" format see DataFrameWriter.parquet(). Fault-tolerant: Yes. Supports writes to partitioned tables; partitioning by time may be useful.

  • Foreach Sink - Supported output modes: Append, Update, Complete. Options: None. Fault-tolerant: depends on the ForeachWriter implementation. More details in the next section.

  • Console Sink - Supported output modes: Append, Update, Complete. Options: numRows (number of rows to print every trigger, default: 20), truncate (whether to truncate the output if too long, default: true). Fault-tolerant: No.

  • Memory Sink - Supported output modes: Append, Complete. Options: None. Fault-tolerant: No; but in Complete mode, a restarted query will recreate the full table. The table name is the query name.

Note that you have to call start() to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let’s understand all this with a few examples.

```
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
```

Using Foreach

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.

  • The writer must be serializable, as it will be serialized and sent to the executors for execution.

  • All three methods, open, process and close, will be called on the executors.

  • The writer must do all the initialization (e.g. opening connections, starting a transaction, etc.) only when the open method is called. Be aware that, if there is any initialization in the class as soon as the object is created, then that initialization will happen in the driver (because that is where the instance is being created), which may not be what you intend.

  • version and partition are two parameters in open that uniquely represent a set of rows that needs to be pushed out. version is a monotonically increasing id that increases with every trigger. partition is an id that represents a partition of the output, since the output is distributed and will be processed on multiple executors.

  • open can use the version and partition to choose whether it needs to write the sequence of rows. Accordingly, it can return true (proceed with writing), or false (no need to write). If false is returned, then process will not be called on any row. For example, after a partial failure, some of the output partitions of the failed trigger may have already been committed to a database. Based on metadata stored in the database, the writer can identify partitions that have already been committed and accordingly return false to skip committing them again.

  • Whenever open is called, close will also be called (unless the JVM exits due to some error). This is true even if open returns false. If there is any error in processing and writing the data, close will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that has been created in open, so that there are no resource leaks.
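
Putting these points together, here is a minimal sketch of a ForeachWriter (not from the notebook). It assumes the streamingDf DataFrame from the earlier examples; a real writer would open a connection in open and commit/close it in close rather than just printing.

    import org.apache.spark.sql.{ForeachWriter, Row}

    val printingWriter = new ForeachWriter[Row] {
      def open(partitionId: Long, version: Long): Boolean = {
        // decide whether this (partition, version) still needs to be written,
        // e.g. by checking commit metadata in an external store; true means "write it"
        true
      }
      def process(record: Row): Unit = {
        // write one record to the external system; here we just print it
        println(record)
      }
      def close(errorOrNull: Throwable): Unit = {
        // release resources; errorOrNull is non-null if processing failed
      }
    }

    streamingDf.writeStream.foreach(printingWriter).start()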

Managing Streaming Queries

The StreamingQuery object created when a query is started can be used to monitor and manage the query.

    val query = df.writeStream.format("console").start()   // get the query object

    query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

    query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

    query.name        // get the auto-generated or user-specified name of the query

    query.explain()   // print detailed explanations of the query

    query.stop()      // stop the query

    query.awaitTermination()   // block until query is terminated, with stop() or with error

    query.exception       // the exception if the query has been terminated with error

    query.recentProgress  // an array of the most recent progress updates for this query

    query.lastProgress    // the most recent progress update of this streaming query

You can start any number of queries in a single SparkSession. They will all be running concurrently sharing the cluster resources. You can use sparkSession.streams() to get the StreamingQueryManager (Scala/Java/Python docs) that can be used to manage the currently active queries.

    val spark: SparkSession = ...

    spark.streams.active    // get the list of currently active streaming queries

    spark.streams.get(id)   // get a query object by its unique id

    spark.streams.awaitAnyTermination()   // block until any one of them terminates

Monitoring Streaming Queries

There are two APIs for monitoring and debugging active queries - interactively and asynchronously.

Interactive APIs

You can directly get the current status and metrics of an active query using streamingQuery.lastProgress() and streamingQuery.status(). lastProgress() returns a StreamingQueryProgress object in Scala and Java and a dictionary with the same fields in Python. It has all the information about the progress made in the last trigger of the stream - what data was processed, what the processing rates and latencies were, etc. There is also streamingQuery.recentProgress, which returns an array of the last few progress updates.

In addition, streamingQuery.status() returns a StreamingQueryStatus object in Scala and Java and a dictionary with the same fields in Python. It gives information about what the query is immediately doing - is a trigger active, is data being processed, etc.

Here are a few examples.

    val query: StreamingQuery = ...

    println(query.lastProgress)

    /* Will print something like the following.

    {
      "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
      "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
      "name" : "MyQuery",
      "timestamp" : "2016-12-14T18:45:24.873Z",
      "numInputRows" : 10,
      "inputRowsPerSecond" : 120.0,
      "processedRowsPerSecond" : 200.0,
      "durationMs" : {
        "triggerExecution" : 3,
        "getOffset" : 2
      },
      "eventTime" : {
        "watermark" : "2016-12-14T18:45:24.873Z"
      },
      "stateOperators" : [ ],
      "sources" : [ {
        "description" : "KafkaSource[Subscribe[topic-0]]",
        "startOffset" : {
          "topic-0" : {
            "2" : 0,
            "4" : 1,
            "1" : 1,
            "3" : 1,
            "0" : 1
          }
        },
        "endOffset" : {
          "topic-0" : {
            "2" : 0,
            "4" : 115,
            "1" : 134,
            "3" : 21,
            "0" : 534
          }
        },
        "numInputRows" : 10,
        "inputRowsPerSecond" : 120.0,
        "processedRowsPerSecond" : 200.0
      } ],
      "sink" : {
        "description" : "MemorySink"
      }
    }
    */


    println(query.status)

    /*  Will print something like the following.
    {
      "message" : "Waiting for data to arrive",
      "isDataAvailable" : false,
      "isTriggerActive" : false
    }
    */

Asynchronous API

You can also asynchronously monitor all queries associated with a SparkSession by attaching a StreamingQueryListener (Scala/Java docs). Once you attach your custom StreamingQueryListener object with sparkSession.streams.addListener(), you will get callbacks when a query is started and stopped and when there is progress made in an active query. Here is an example.

    val spark: SparkSession = ...

    spark.streams.addListener(new StreamingQueryListener() {
        override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
            println("Query started: " + queryStarted.id)
        }
        override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
            println("Query terminated: " + queryTerminated.id)
        }
        override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
            println("Query made progress: " + queryProgress.progress)
        }
    })

Recovering from Failures with Checkpointing

In case of a failure or intentional shutdown, you can recover the progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. the range of offsets processed in each trigger) and the running aggregates (e.g. the word counts in the quick example) to the checkpoint location. This checkpoint location has to be a path in an HDFS-compatible file system, and can be set as an option in the DataStreamWriter when starting a query.

    aggDF
      .writeStream
      .outputMode("complete")
      .option("checkpointLocation", "path/to/HDFS/dir")
      .format("memory")
      .start()