import java.sql.Timestamp
import io.delta.tables._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.lamastex.spark.trendcalculus._
import scala.util.Random
dbutils.widgets.dropdown("m", "5", (1 to 10).map(_.toString).toSeq ++ Seq(15,20,25,30).map(_.toString) :+ "100")
dbutils.widgets.dropdown("n", "1", (1 to 3).map(_.toString).toSeq)
dbutils.widgets.dropdown("k", "max", (1 to 10).map(_.toString).toSeq :+ "max")
dbutils.widgets.dropdown("numTrainingSets", "10", (1 to 20).map( i => (i*5).toString).toSeq)
// Delta table of Trend Calculus reversals (input to the Markov estimator).
val maxRevPath = "s3a://xxx-yyy-zzz/canwrite/summerinterns2020/johannes/streamable-trend-calculus/stream/reversals"
val maxRevDS = spark.read.format("delta").load(maxRevPath).as[FlatReversal]
// Output locations for the lagged dataset and the partial/full models.
val modelPath = "s3a://xxx-yyy-zzz/canwrite/summerinterns2020/johannes/streamable-trend-calculus/estimators_new/"
val maxRevDSWithLagCountPath = modelPath + "maxRevDSWithLag_new"
val numPartitions = dbutils.widgets.get("numTrainingSets").toInt // 5
val partialModelPaths = (1 to numPartitions).map( i => modelPath + s"partialModel_new${i}" )
val fullModelPath = modelPath + "fullModel_new"
val m = dbutils.widgets.get("m").toInt // 5
val n = dbutils.widgets.get("n").toInt // 1
val k = dbutils.widgets.get("k") match { // 17
case "max" => math.abs(maxRevDS.orderBy(abs($"reversal").desc).first.reversal) + 1
case _ => dbutils.widgets.get("k").toInt
}
val trainingRatio = 0.7
type FinalModel = Map[Seq[Int], Map[Seq[Int], Double]]
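For concreteness, a toy value of this type could look as follows (purely illustrative numbers, not estimated from the data): the outer key is a sequence of truncated reversals and the inner map is a distribution over trend-label sequences.
// Hypothetical FinalModel for m = 2, n = 1 (illustrative probabilities only):
// after the truncated reversal pattern Seq(1, -1), a trend of +1 is assigned
// probability 0.6 and a trend of -1 probability 0.4.
val exampleModel: FinalModel = Map(
  Seq(1, -1) -> Map(Seq(1) -> 0.6, Seq(-1) -> 0.4)
)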
// Truncate a reversal to degree at most k, preserving its sign.
def truncRev(k: Int)(rev: Int): Int = {
  if (math.abs(rev) > k) k*rev.signum else rev
}
// Keep only the sign of a reversal (equivalent to truncRev(1)).
val truncRevUDF = udf{ rev: Int => rev.signum }
// Truncate every reversal in a sequence to degree at most k.
def truncRevsUDF(k: Int) = udf{ revs: Seq[Int] => revs.map(truncRev(k)) }
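A quick sanity check of the truncation in plain Scala (no Spark needed):
assert(truncRev(3)(5) == 3)    // magnitude capped at k = 3, sign preserved
assert(truncRev(3)(-7) == -3)
assert(truncRev(3)(2) == 2)    // values already within [-k, k] are unchanged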
def lagColumn(df: DataFrame, orderColumnName: String, lagKeyName: String, lagValueName: String, m: Int, n: Int): DataFrame = {
  // Lags are computed per ticker, ordered by orderColumnName.
  val windowSpec = Window.partitionBy("ticker").orderBy(orderColumnName)
  val laggedKeyColNames = (1 to m).map( i => s"lagKey$i" ).toSeq
  val laggedValueColNames = (1 to n).map( i => s"lagValue$i" ).toSeq
  // Key columns are lags n, ..., m+n-1 of the key column; Int.MaxValue marks a missing lag.
  val dfWithLaggedKeyColumns = (n+1 to m+n)
    .foldLeft(df)( (df: DataFrame, i: Int) => df.withColumn(laggedKeyColNames(i-n-1), lag(lagKeyName, i-1, Int.MaxValue).over(windowSpec)) )
  // Value columns are lags 0, ..., n-1 of the value column.
  val dfWithLaggedKeyValueColumns = (1 to n)
    .foldLeft(dfWithLaggedKeyColumns)( (df: DataFrame, i: Int) => df.withColumn(laggedValueColNames(i-1), lag(lagValueName, i-1, Int.MaxValue).over(windowSpec)) )
  dfWithLaggedKeyValueColumns
    // Collect the lags into arrays ordered oldest-first.
    .withColumn("lagKey", array(laggedKeyColNames.reverse.take(m).map(col(_)):_*))
    .withColumn("lagValue", array(laggedValueColNames.reverse.takeRight(n).map(col(_)):_*))
    // Drop rows that do not have a full history of m lagged keys, then drop the helper columns.
    .withColumn("lagKeyFirst", col(laggedKeyColNames.last))
    .filter($"lagKeyFirst" =!= Int.MaxValue)
    .drop("lagKeyFirst")
    .drop(laggedKeyColNames:_*)
    .drop(laggedValueColNames:_*)
}
truncRev: (k: Int)(rev: Int)Int
truncRevUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$9580/1938427799@3bf97dfd,IntegerType,List(Some(class[value[0]: int])),Some(class[value[0]: int]),None,false,true)
truncRevsUDF: (k: Int)org.apache.spark.sql.expressions.UserDefinedFunction
lagColumn: (df: org.apache.spark.sql.DataFrame, orderColumnName: String, lagKeyName: String, lagValueName: String, m: Int, n: Int)org.apache.spark.sql.DataFrame
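As a minimal sketch of what lagColumn produces, the toy data below (invented here, with m = 2 and n = 1) shows that each surviving row carries the preceding m reversals as its key, oldest first, and the current trend label as its value.
// Toy input: one ticker, three timestamped reversals with invented trend labels.
val toyDF = Seq(
  ("TOY", Timestamp.valueOf("2020-01-01 00:00:00"), 1, -1),
  ("TOY", Timestamp.valueOf("2020-01-02 00:00:00"), -1, 1),
  ("TOY", Timestamp.valueOf("2020-01-03 00:00:00"), 2, -1)
).toDF("ticker", "x", "reversal", "trend")
lagColumn(toyDF, "x", "reversal", "trend", 2, 1).show(false)
// Only the last row survives the full-history filter:
// lagKey = [1, -1] (the two preceding reversals, oldest first), lagValue = [-1] (that row's trend).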
val maxRevDSWithLag = lagColumn(
  maxRevDS
    .orderBy("x")
    .toDF
    // Reduce each reversal to its sign and accumulate the signs per ticker over time.
    .withColumn("truncRev", truncRevUDF($"reversal"))
    .withColumn("tmpTrend", sum("truncRev").over(Window.partitionBy("ticker").orderBy("x").rowsBetween(Window.unboundedPreceding, Window.currentRow)))
    // Map the running sum to a trend label in {-1, 1}.
    .withColumn("trend", when($"tmpTrend" === 0, 1).otherwise(-1))
    .drop("truncRev", "tmpTrend"),
  "x",
  "reversal",
  "trend",
  m,
  n
)
maxRevDSWithLag: org.apache.spark.sql.DataFrame = [ticker: string, x: timestamp ... 5 more fields]
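To see the trend labelling step in isolation, here is a sketch on invented reversal data for a single ticker: each reversal is reduced to its sign, the signs are summed cumulatively, and the running sum is mapped to a label in {-1, 1}.
val toyRevs = Seq(
  ("TOY", Timestamp.valueOf("2020-01-01 00:00:00"), 1),
  ("TOY", Timestamp.valueOf("2020-01-02 00:00:00"), 0),
  ("TOY", Timestamp.valueOf("2020-01-03 00:00:00"), -2)
).toDF("ticker", "x", "reversal")
toyRevs
  .withColumn("truncRev", truncRevUDF($"reversal"))
  .withColumn("tmpTrend", sum("truncRev").over(Window.partitionBy("ticker").orderBy("x").rowsBetween(Window.unboundedPreceding, Window.currentRow)))
  .withColumn("trend", when($"tmpTrend" === 0, 1).otherwise(-1))
  .show(false)
// signs: 1, 0, -1  ->  running sums: 1, 1, 0  ->  trend labels: -1, -1, 1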
// Divide two long counts as a double.
val divUDF = udf{ (a: Long, b: Long) => a.toDouble/b }
// Load the precomputed lagged dataset with counts from Delta.
val maxRevDSWithLagCount = spark.read.format("delta").load(maxRevDSWithLagCountPath)
val numberOfRows = maxRevDSWithLagCount.count
divUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$9719/1255373188@144001bc,DoubleType,List(Some(class[value[0]: bigint]), Some(class[value[0]: bigint])),Some(class[value[0]: double]),None,false,true)
maxRevDSWithLagCount: org.apache.spark.sql.DataFrame = [ticker: string, x: timestamp ... 6 more fields]
numberOfRows: Long = 6845681
// Split the data per ticker: a trainingRatio fraction of each ticker's rows (via limit)
// goes into the training set; the remainder (via except) is used for testing.
val tickers = maxRevDSWithLagCount.select("ticker").distinct.as[String].collect.toSeq
val tickerDFs = tickers.map( ticker => maxRevDSWithLagCount.filter($"ticker" === ticker))
val trainingDF = tickerDFs.map( df => df.limit((df.count*trainingRatio).toInt) ).reduce( _.union(_) ).orderBy("x")
val trainingRows = trainingDF.count
val testingDF = maxRevDSWithLagCount.except(trainingDF)
tickers: Seq[String] = WrappedArray(BCOUSD, XAUUSD)
tickerDFs: Seq[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = ArrayBuffer([ticker: string, x: timestamp ... 6 more fields], [ticker: string, x: timestamp ... 6 more fields])
trainingDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ticker: string, x: timestamp ... 6 more fields]
trainingRows: Long = 4791976
testingDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ticker: string, x: timestamp ... 6 more fields]
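A quick sanity check of the split sizes (not part of the original pipeline; since except uses set semantics, the counts only add up exactly when rows are unique):
val testingRows = testingDF.count
println(s"training: $trainingRows, testing: $testingRows, total: $numberOfRows")
// With trainingRatio = 0.7, trainingRows should be roughly 70% of numberOfRows.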
Markov Model for Trend Calculus
Johannes Graner, Albert Nilsson and Raazesh Sainudiin
2020, Uppsala, Sweden
This project was supported by Combient Mix AB through summer internships at:
Combient Competence Centre for Data Engineering Sciences, Department of Mathematics, Uppsala University, Uppsala, Sweden
Resources
This builds on the following library and its antecedents therein:
This work was inspired by: