ScaDaMaLe Course site and book

Streaming Trend Calculus with Maximum Necessary Reversals

Johannes Graner (LinkedIn), Albert Nilsson (LinkedIn) and Raazesh Sainudiin (LinkedIn)

2020, Uppsala, Sweden

This project was supported by Combient Mix AB through summer internships at:

Combient Competence Centre for Data Engineering Sciences, Department of Mathematics, Uppsala University, Uppsala, Sweden


Resources

This builds on the following library and its antecedents therein:

This work was inspired by:

"./000a_finance_utils"

We use the spark-trend-calculus library and Spark structured streams over delta.io files to obtain a representation of the complete time series of trends with their k-th order reversal.

This representation is a sufficient statistic for a Markov model of trends that we show in the next notebook.

defined object TrendUtils
import java.sql.Timestamp
import io.delta.tables._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.lamastex.spark.trendcalculus._
import java.sql.Timestamp
import io.delta.tables._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.lamastex.spark.trendcalculus._

Input data in s3. The data contains oil price data from 2010 to last month and gold price data from 2009 to last month.

val rootPath = "s3a://XXXXX/summerinterns2020/johannes/streamable-trend-calculus/"
val oilGoldPath = rootPath + "oilGoldDelta"
spark.read.format("delta").load(oilGoldPath).orderBy("x").show(20,false)
+------+-------------------+------+
|ticker|x                  |y     |
+------+-------------------+------+
|XAUUSD|2009-03-15 17:00:00|929.6 |
|XAUUSD|2009-03-15 18:00:00|926.05|
|XAUUSD|2009-03-15 18:01:00|925.9 |
|XAUUSD|2009-03-15 18:02:00|925.9 |
|XAUUSD|2009-03-15 18:03:00|926.95|
|XAUUSD|2009-03-15 18:04:00|925.8 |
|XAUUSD|2009-03-15 18:05:00|926.35|
|XAUUSD|2009-03-15 18:06:00|925.8 |
|XAUUSD|2009-03-15 18:07:00|925.6 |
|XAUUSD|2009-03-15 18:08:00|925.7 |
|XAUUSD|2009-03-15 18:09:00|925.4 |
|XAUUSD|2009-03-15 18:10:00|925.75|
|XAUUSD|2009-03-15 18:11:00|925.7 |
|XAUUSD|2009-03-15 18:12:00|925.65|
|XAUUSD|2009-03-15 18:13:00|925.65|
|XAUUSD|2009-03-15 18:14:00|925.75|
|XAUUSD|2009-03-15 18:15:00|925.75|
|XAUUSD|2009-03-15 18:16:00|925.65|
|XAUUSD|2009-03-15 18:17:00|925.6 |
|XAUUSD|2009-03-15 18:18:00|925.85|
+------+-------------------+------+
only showing top 20 rows

Reading the data from s3 as a Structured Stream to simulate streaming.

val input = spark
  .readStream
  .format("delta")
  .load(oilGoldPath)
  .as[TickerPoint]
input: org.apache.spark.sql.Dataset[org.lamastex.spark.trendcalculus.TickerPoint] = [ticker: string, x: timestamp ... 1 more field]

Using the trendcalculus library to 1. Apply Trend Calculus to the streaming dataset. - Save the result as a delta table. - Read the result as a stream. - Repeat from 1. using the latest result as input. Stop when result is empty.

val windowSize = 2

// Initializing variables for while loop.
var i = 1
var prevSinkPath = ""
var sinkPath = rootPath + "multiSinks/reversal" + (i)
var chkptPath = rootPath + "multiSinks/checkpoint/" + (i)

// The first order reversal.
var stream = new TrendCalculus2(input, windowSize, spark)
  .reversals
  .select("tickerPoint.ticker", "tickerPoint.x", "tickerPoint.y", "reversal")
  .as[FlatReversal]
  .writeStream
  .format("delta")
  .option("path", sinkPath)
  .option("checkpointLocation", chkptPath)
  .trigger(Trigger.Once())
  .start

stream.processAllAvailable

i += 1

var lastReversalSeries = spark.emptyDataset[TickerPoint]
while (!spark.read.format("delta").load(sinkPath).isEmpty) {
  
  prevSinkPath = rootPath + "multiSinks/reversal" + (i-1)
  sinkPath = rootPath + "multiSinks/reversal" + (i)
  chkptPath = rootPath + "multiSinks/checkpoint/" + (i)
  
  // Reading last result as stream
  lastReversalSeries = spark
    .readStream
    .format("delta")
    .load(prevSinkPath)
    .drop("reversal")
    .as[TickerPoint]

  // Writing next result
  stream = new TrendCalculus2(lastReversalSeries, windowSize, spark)
    .reversals
    .select("tickerPoint.ticker", "tickerPoint.x", "tickerPoint.y", "reversal")
    .as[FlatReversal]
    .map( rev => rev.copy(reversal=i*rev.reversal))
    .writeStream
    .format("delta")
    .option("path", sinkPath)
    .option("checkpointLocation", chkptPath)
    .partitionBy("ticker")
    .trigger(Trigger.Once())
    .start
  
  stream.processAllAvailable()
  
  i += 1
}

Checking the total number of reversals written. The last sink is empty so the highest order reversal is \[\text{number of sinks} - 1\].

val i = dbutils.fs.ls(rootPath + "multiSinks").length - 1
i: Int = 18

The written delta tables can be read as streams but for now we read them as static datasets to be able to join them together.

val sinkPaths = (1 to i-1).map(rootPath + "multiSinks/reversal" + _)
val maxRevPath = rootPath + "maxRev"
val revTables = sinkPaths.map(DeltaTable.forPath(_).toDF.as[FlatReversal])
val oilGoldTable = DeltaTable.forPath(oilGoldPath).toDF.as[TickerPoint]

The number of reversals decrease rapidly as the reversal order increases.

revTables.map(_.cache.count)
res6: scala.collection.immutable.IndexedSeq[Long] = Vector(1954849, 677939, 262799, 108202, 46992, 21154, 9703, 4427, 1992, 890, 404, 183, 83, 35, 15, 7, 2)

Joining all results to get a dataset with all reversals in a single column.

def maxByAbs(a: Int, b: Int): Int = {
  Seq(a,b).maxBy(math.abs)
}

val maxByAbsUDF = udf((a: Int, b: Int) => maxByAbs(a,b))

val maxRevDS = revTables.foldLeft(oilGoldTable.toDF.withColumn("reversal", lit(0)).as[FlatReversal]){ (acc: Dataset[FlatReversal], ds: Dataset[FlatReversal]) => 
  acc
    .toDF
    .withColumnRenamed("reversal", "oldMaxRev")
    .join(ds.select($"ticker" as "tmpt", $"x" as "tmpx", $"reversal" as "newRev"), $"ticker" === $"tmpt" && $"x" === $"tmpx", "left")
    .drop("tmpt", "tmpx")
    .na.fill(0,Seq("newRev"))
    .withColumn("reversal", maxByAbsUDF($"oldMaxRev", $"newRev"))
    .select("ticker", "x", "y", "reversal")
    .as[FlatReversal]    
}
maxByAbs: (a: Int, b: Int)Int
maxByAbsUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$9089/155868360@d163ce5,IntegerType,List(Some(class[value[0]: int]), Some(class[value[0]: int])),None,false,true)
maxRevDS: org.apache.spark.sql.Dataset[org.lamastex.spark.trendcalculus.FlatReversal] = [ticker: string, x: timestamp ... 2 more fields]

Writing result as delta table.

maxRevDS.write.format("delta").partitionBy("ticker").save(maxRevPath)

The reversal column in the joined dataset contains the information of all orders of reversals.

0 indicates that no reversal happens while a non-zero value indicates that this is a reversal point for that order and every lower order.

For example, row 33 contains the value -4, meaning that this point is trend reversal downwards for orders 1, 2, 3, and 4.

DeltaTable.forPath(maxRevPath).toDF.as[FlatReversal].filter("ticker == 'BCOUSD'").orderBy("x").show(35, false)
+------+-------------------+-----+--------+
|ticker|x                  |y    |reversal|
+------+-------------------+-----+--------+
|BCOUSD|2010-11-14 20:15:00|86.74|0       |
|BCOUSD|2010-11-14 20:17:00|86.75|0       |
|BCOUSD|2010-11-14 20:18:00|86.76|-1      |
|BCOUSD|2010-11-14 20:19:00|86.74|0       |
|BCOUSD|2010-11-14 20:21:00|86.74|1       |
|BCOUSD|2010-11-14 20:24:00|86.75|0       |
|BCOUSD|2010-11-14 20:26:00|86.77|0       |
|BCOUSD|2010-11-14 20:27:00|86.75|0       |
|BCOUSD|2010-11-14 20:28:00|86.79|0       |
|BCOUSD|2010-11-14 20:32:00|86.81|0       |
|BCOUSD|2010-11-14 20:33:00|86.81|-1      |
|BCOUSD|2010-11-14 20:34:00|86.81|0       |
|BCOUSD|2010-11-14 20:35:00|86.79|0       |
|BCOUSD|2010-11-14 20:36:00|86.8 |0       |
|BCOUSD|2010-11-14 20:37:00|86.79|0       |
|BCOUSD|2010-11-14 20:38:00|86.79|1       |
|BCOUSD|2010-11-14 20:39:00|86.79|0       |
|BCOUSD|2010-11-14 20:40:00|86.8 |0       |
|BCOUSD|2010-11-14 20:41:00|86.8 |0       |
|BCOUSD|2010-11-14 20:42:00|86.8 |0       |
|BCOUSD|2010-11-14 20:43:00|86.82|0       |
|BCOUSD|2010-11-14 20:44:00|86.81|0       |
|BCOUSD|2010-11-14 20:47:00|86.84|0       |
|BCOUSD|2010-11-14 20:48:00|86.82|0       |
|BCOUSD|2010-11-14 20:49:00|86.84|-1      |
|BCOUSD|2010-11-14 20:50:00|86.82|0       |
|BCOUSD|2010-11-14 20:51:00|86.83|0       |
|BCOUSD|2010-11-14 20:52:00|86.82|1       |
|BCOUSD|2010-11-14 20:53:00|86.83|0       |
|BCOUSD|2010-11-14 20:54:00|86.84|0       |
|BCOUSD|2010-11-14 20:58:00|86.88|0       |
|BCOUSD|2010-11-14 20:59:00|86.89|0       |
|BCOUSD|2010-11-14 21:00:00|86.9 |-4      |
|BCOUSD|2010-11-14 21:03:00|86.87|0       |
|BCOUSD|2010-11-14 21:04:00|86.87|0       |
+------+-------------------+-----+--------+
only showing top 35 rows