03streamable-trend-calculus-estimators (Scala)


Markov Model for Trend Calculus

Johannes Graner, Albert Nilsson and Raazesh Sainudiin

2020, Uppsala, Sweden

This project was supported by Combient Mix AB through summer internships at:

Combient Competence Centre for Data Engineering Sciences, Department of Mathematics, Uppsala University, Uppsala, Sweden

Resources

This builds on the following library and its antecedents therein:

This work was inspired by:

We use the dataset generated in the last notebook to build a simple proof-of-concept Markov model for predicting trends.

import java.sql.Timestamp
import io.delta.tables._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.lamastex.spark.trendcalculus._
import scala.util.Random
dbutils.widgets.dropdown("m", "5", (1 to 10).map(_.toString).toSeq ++ Seq(15,20,25,30).map(_.toString) :+ "100")
dbutils.widgets.dropdown("n", "1", (1 to 3).map(_.toString).toSeq)
dbutils.widgets.dropdown("k", "max", (1 to 10).map(_.toString).toSeq :+ "max")
dbutils.widgets.dropdown("numTrainingSets", "10", (1 to 20).map( i => (i*5).toString).toSeq)

Reading the joined dataset from the last notebook.

We train the model on both oil and gold data and predict trends in the oil data. We show that this yields better results than training on the oil data alone.

val maxRevPath = "s3a://xxx-yyy-zzz/canwrite/summerinterns2020/johannes/streamable-trend-calculus/stream/reversals"
val maxRevDS = spark.read.format("delta").load(maxRevPath).as[FlatReversal]

We want to predict what the trend of the next data point will be given the trend reversals we have observed.

For this, we use an m-th order Markov model: the reversal states of the last m points are used to predict the trends of the next n points. k is the maximum reversal order considered when training the model.

trainingRatio is the fraction of the data used for training the model; the rest is used for testing.
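
To make this concrete, the trained model is simply a nested map from an observed key (the reversal states of the last m points) to a probability distribution over the next n trend signs, matching the FinalModel type alias declared in the next cell. The toy values and the predictMostLikely helper below are hypothetical, a minimal sketch rather than the notebook's training code.

// Hypothetical sketch of a tiny 3rd-order model (m = 3) predicting one step ahead (n = 1).
// Keys: the last m reversal states. Values: probabilities of the next n trend signs
// (+1 = up trend, -1 = down trend).
type FinalModel = Map[Seq[Int], Map[Seq[Int], Double]]

val toyModel: FinalModel = Map(
  Seq(0, 0, 1)  -> Map(Seq(1) -> 0.8, Seq(-1) -> 0.2),
  Seq(0, -1, 0) -> Map(Seq(1) -> 0.4, Seq(-1) -> 0.6)
)

// Most likely next trend sequence for a key, if the key was seen during training.
def predictMostLikely(model: FinalModel, lastReversals: Seq[Int]): Option[Seq[Int]] =
  model.get(lastReversals).map(_.maxBy(_._2)._1)

predictMostLikely(toyModel, Seq(0, 0, 1)) // Some(List(1)): an up trend is predicted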

val modelPath = "s3a://xxx-yyy-zzz/canwrite/summerinterns2020/johannes/streamable-trend-calculus/estimators_new/"
val maxRevDSWithLagCountPath = modelPath + "maxRevDSWithLag_new"
 
val numPartitions = dbutils.widgets.get("numTrainingSets").toInt // 5
val partialModelPaths = (1 to numPartitions).map( i => modelPath + s"partialModel_new${i}" )
val fullModelPath = modelPath + "fullModel_new"
 
val m = dbutils.widgets.get("m").toInt // 5
val n = dbutils.widgets.get("n").toInt // 1
val k = dbutils.widgets.get("k") match { // 17
  case "max" => math.abs(maxRevDS.orderBy(abs($"reversal").desc).first.reversal) + 1
  case _ => dbutils.widgets.get("k").toInt
}
val trainingRatio = 0.7
type FinalModel = Map[Seq[Int], Map[Seq[Int], Double]]
// Truncate a reversal to order k: reversals of order greater than k are clipped to ±k.
def truncRev(k: Int)(rev: Int): Int = {
  if (math.abs(rev) > k) k*rev.signum else rev
}
// Truncate a reversal to its sign only (equivalent to truncRev(1)); used to compute the trend.
val truncRevUDF = udf{ rev: Int => rev.signum }
// Truncate every reversal in a sequence to order k.
def truncRevsUDF(k: Int) = udf{ revs: Seq[Int] => revs.map(truncRev(k)) }
 
// For each row, collect the m values of lagKeyName that precede the n most recent rows into
// the array column "lagKey", and the values of lagValueName for those n rows (ending at the
// current row) into the array column "lagValue". Rows without enough history are dropped,
// using Int.MaxValue as the default value of lag to detect them.
def lagColumn(df: DataFrame, orderColumnName: String, lagKeyName: String, lagValueName: String, m: Int, n: Int): DataFrame = {
  val windowSpec = Window.partitionBy("ticker").orderBy(orderColumnName)
  val laggedKeyColNames = (1 to m).map( i => s"lagKey$i" ).toSeq
  val laggedValueColNames = (1 to n).map( i => s"lagValue$i" ).toSeq
  // lags n .. m+n-1 of the key column
  val dfWithLaggedKeyColumns = (n+1 to m+n)
    .foldLeft(df)( (df: DataFrame, i: Int) => df.withColumn(laggedKeyColNames(i-n-1), lag(lagKeyName, i-1, Int.MaxValue).over(windowSpec)) )
  // lags 0 .. n-1 of the value column
  val dfWithLaggedKeyValueColumns = (1 to n)
    .foldLeft(dfWithLaggedKeyColumns)( (df: DataFrame, i: Int) => df.withColumn(laggedValueColNames(i-1), lag(lagValueName, i-1, Int.MaxValue).over(windowSpec)) )
  
  dfWithLaggedKeyValueColumns
    .withColumn("lagKey", array(laggedKeyColNames.reverse.take(m).map(col(_)):_*))         // oldest first
    .withColumn("lagValue", array(laggedValueColNames.reverse.takeRight(n).map(col(_)):_*)) // oldest first
    .withColumn("lagKeyFirst", col(laggedKeyColNames.last))
    .filter($"lagKeyFirst" =!= Int.MaxValue)
    .drop("lagKeyFirst")
    .drop(laggedKeyColNames:_*)
    .drop(laggedValueColNames:_*)
}
truncRev: (k: Int)(rev: Int)Int
truncRevUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$9580/1938427799@3bf97dfd,IntegerType,List(Some(class[value[0]: int])),Some(class[value[0]: int]),None,false,true)
truncRevsUDF: (k: Int)org.apache.spark.sql.expressions.UserDefinedFunction
lagColumn: (df: org.apache.spark.sql.DataFrame, orderColumnName: String, lagKeyName: String, lagValueName: String, m: Int, n: Int)org.apache.spark.sql.DataFrame

The trend at each point can be extracted from the trend reversals by taking the running sum of the signs (first-order truncation) of all reversals up to that point. This sum is always either 0 (up trend) or -1 (down trend), so 0 is mapped to 1 in order to encode (up, down) as (1, -1).
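
A minimal plain-Scala sketch of this mapping, using made-up reversal values rather than the dataset:

// Toy reversal sequence (hypothetical): nonzero entries alternate in sign, as Trend Calculus
// reversals do, with the magnitude giving the reversal order.
val toyReversals = Seq(0, -2, 0, 1, 0, 0, -1, 3)

// Running sum of the reversal signs: always 0 (up trend) or -1 (down trend).
val runningSums = toyReversals.map(_.signum).scanLeft(0)(_ + _).tail
// Seq(0, -1, -1, 0, 0, 0, -1, 0)

// Map 0 -> 1 so that the trend is encoded as +1 (up) or -1 (down).
val toyTrends = runningSums.map( s => if (s == 0) 1 else -1 )
// Seq(1, -1, -1, 1, 1, 1, -1, 1)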

val maxRevDSWithLag = lagColumn(
  maxRevDS
    .orderBy("x")
    .toDF
    .withColumn("truncRev", truncRevUDF($"reversal"))
    .withColumn("tmpTrend", sum("truncRev").over(Window.partitionBy("ticker").orderBy("x").rowsBetween(Window.unboundedPreceding, Window.currentRow)))
    .withColumn("trend", when($"tmpTrend" === 0, 1).otherwise(-1))
    .drop("truncRev", "tmpTrend"),
  "x", 
  "reversal",
  "trend",
  m, 
  n
)
maxRevDSWithLag: org.apache.spark.sql.DataFrame = [ticker: string, x: timestamp ... 5 more fields]

We now want to predict lagValue from lagKey.
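
Concretely, the model estimates the conditional probability of lagValue given lagKey from empirical frequencies. Below is a minimal, non-streaming sketch of that estimation on the DataFrame above; the column names are the notebook's, but the aggregation itself is only illustrative, since the notebook builds the model incrementally from delta tables using a count column.

// Sketch only: estimate P(lagValue | lagKey) by relative frequency on a static DataFrame.
val transitionCounts = maxRevDSWithLag
  .groupBy($"lagKey", $"lagValue")
  .agg(count(lit(1)).as("pairCount"))

val keyCounts = maxRevDSWithLag
  .groupBy($"lagKey")
  .agg(count(lit(1)).as("keyCount"))

// Joining on the key and dividing gives the conditional probability of each next-trend sequence.
val transitionProbs = transitionCounts
  .join(keyCounts, Seq("lagKey"))
  .withColumn("prob", $"pairCount" / $"keyCount")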

display(maxRevDSWithLag)
 
| ticker | x | y | reversal | trend | lagKey | lagValue |
|---|---|---|---|---|---|---|
| XAUUSD | 2009-03-15T18:09:00.000+0000 | 925.4 | 4 | 1 | [-6, 0, 0, 1, -1, 0, 0, 0, 0, 0] | [1] |
| XAUUSD | 2009-03-15T18:10:00.000+0000 | 925.75 | -1 | -1 | [0, 0, 1, -1, 0, 0, 0, 0, 0, 4] | [-1] |
| XAUUSD | 2009-03-15T18:11:00.000+0000 | 925.7 | 0 | -1 | [0, 1, -1, 0, 0, 0, 0, 0, 4, -1] | [-1] |
| XAUUSD | 2009-03-15T18:12:00.000+0000 | 925.65 | 1 | 1 | [1, -1, 0, 0, 0, 0, 0, 4, -1, 0] | [1] |
| XAUUSD | 2009-03-15T18:13:00.000+0000 | 925.65 | 0 | 1 | [-1, 0, 0, 0, 0, 0, 4, -1, 0, 1] | [1] |
| XAUUSD | 2009-03-15T18:14:00.000+0000 | 925.75 | 0 | 1 | [0, 0, 0, 0, 0, 4, -1, 0, 1, 0] | [1] |
| XAUUSD | 2009-03-15T18:15:00.000+0000 | 925.75 | -2 | -1 | [0, 0, 0, 0, 4, -1, 0, 1, 0, 0] | [-1] |
| XAUUSD | 2009-03-15T18:16:00.000+0000 | 925.65 | 0 | -1 | [0, 0, 0, 4, -1, 0, 1, 0, 0, -2] | [-1] |
| XAUUSD | 2009-03-15T18:17:00.000+0000 | 925.6 | 2 | 1 | [0, 0, 4, -1, 0, 1, 0, 0, -2, 0] | [1] |
| XAUUSD | 2009-03-15T18:18:00.000+0000 | 925.85 | 0 | 1 | [0, 4, -1, 0, 1, 0, 0, -2, 0, 2] | [1] |
| XAUUSD | 2009-03-15T18:19:00.000+0000 | 926.05 | 0 | 1 | [4, -1, 0, 1, 0, 0, -2, 0, 2, 0] | [1] |
| XAUUSD | 2009-03-15T18:20:00.000+0000 | 925.95 | 0 | 1 | [-1, 0, 1, 0, 0, -2, 0, 2, 0, 0] | [1] |
| XAUUSD | 2009-03-15T18:21:00.000+0000 | 926.55 | 0 | 1 | [0, 1, 0, 0, -2, 0, 2, 0, 0, 0] | [1] |
| XAUUSD | 2009-03-15T18:22:00.000+0000 | 926.95 | 0 | 1 | [1, 0, 0, -2, 0, 2, 0, 0, 0, 0] | [1] |
| XAUUSD | 2009-03-15T18:23:00.000+0000 | 927.25 | 0 | 1 | [0, 0, -2, 0, 2, 0, 0, 0, 0, 0] | [1] |

Truncated results, showing first 1000 rows.

Cleaning up last run and writing model training input to delta tables.

dbutils.fs.rm(maxRevDSWithLagCountPath, recurse=true)
 
maxRevDSWithLag
  //.withColumn("lagValueTrunc", truncRevsUDF(1)($"lagValue"))
  .withColumn("count", lit(1L))
  .write
  .format("delta")
  .mode("overwrite")
  .save(maxRevDSWithLagCountPath)
partialModelPaths.map(dbutils.fs.rm(_, recurse=true))
res5: scala.collection.immutable.IndexedSeq[Boolean] = Vector(true, true, true, true, true, true, true, true, true, true)
val divUDF = udf{ (a: Long, b: Long) => a.toDouble/b }
val maxRevDSWithLagCount = spark.read.format("delta").load(maxRevDSWithLagCountPath)
val numberOfRows = maxRevDSWithLagCount.count
divUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$9719/1255373188@144001bc,DoubleType,List(Some(class[value[0]: bigint]), Some(class[value[0]: bigint])),Some(class[value[0]: double]),None,false,true)
maxRevDSWithLagCount: org.apache.spark.sql.DataFrame = [ticker: string, x: timestamp ... 6 more fields]
numberOfRows: Long = 6845681

The data is split into training and testing sets. This is not done randomly, since each data point depends on previous data points and we do not want to train on data that depends on the testing data. Instead, the training set consists of the first (for example) 70% of the data and the last 30% is saved for testing. This also reflects how the model would be used in practice, since we can only train on data points that have already been observed.
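
For reference, the same time-ordered split can also be expressed with window functions instead of limit and except; the sketch below is only an illustration, assuming the column names ticker and x, and is not the code used in the next cell.

// Sketch: rank rows in time order within each ticker and keep the earliest trainingRatio
// fraction for training, the rest for testing.
def timeOrderedSplit(df: DataFrame, trainingRatio: Double): (DataFrame, DataFrame) = {
  val byTime = Window.partitionBy("ticker").orderBy("x")
  val byTicker = Window.partitionBy("ticker")
  val withFrac = df
    .withColumn("rn", row_number().over(byTime))
    .withColumn("cnt", count(lit(1)).over(byTicker))
    .withColumn("frac", $"rn" / $"cnt")
  val train = withFrac.filter($"frac" <= trainingRatio).drop("rn", "cnt", "frac")
  val test  = withFrac.filter($"frac" >  trainingRatio).drop("rn", "cnt", "frac")
  (train, test)
}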

val tickers = maxRevDSWithLagCount.select("ticker").distinct.as[String].collect.toSeq
val tickerDFs = tickers.map( ticker => maxRevDSWithLagCount.filter($"ticker" === ticker))
val trainingDF = tickerDFs.map( df => df.limit((df.count*trainingRatio).toInt) ).reduce( _.union(_) ).orderBy("x")
val trainingRows = trainingDF.count
 
val testingDF = maxRevDSWithLagCount.except(trainingDF)
tickers: Seq[String] = WrappedArray(BCOUSD, XAUUSD)
tickerDFs: Seq[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = ArrayBuffer([ticker: string, x: timestamp ... 6 more fields], [ticker: string, x: timestamp ... 6 more fields])
trainingDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ticker: string, x: timestamp ... 6 more fields]
trainingRows: Long = 4791976
testingDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ticker: string, x: timestamp ... 6 more fields]

Create numTrainingSets training sets of increasing size to get snapshots of what a partially trained model looks like. The sizes are spaced logarithmically, since the model improves fastest in the beginning.
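
One way to produce such logarithmically spaced sizes is sketched below; the helper name and the minimum size are assumptions, and the notebook's exact spacing may differ.

// Sketch: numSets training-set sizes between minSize and total, evenly spaced on a log
// scale, so that early snapshots are close together and later ones further apart.
def logSpacedSizes(total: Long, numSets: Int, minSize: Long = 1000L): Seq[Long] = {
  val (logMin, logMax) = (math.log(minSize.toDouble), math.log(total.toDouble))
  (1 to numSets).map( i => math.exp(logMin + (logMax - logMin) * i / numSets).round )
}
// e.g. logSpacedSizes(trainingRows, numPartitions) grows geometrically up to trainingRows.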