01trend-calculus-showcase (Scala)


Finding trend in oil price data.

Johannes Graner, Albert Nilsson and Raazesh Sainudiin

2020, Uppsala, Sweden

This project was supported by Combient Mix AB through summer internships at:

Combient Competence Centre for Data Engineering Sciences, Department of Mathematics, Uppsala University, Uppsala, Sweden

Resources

This builds on the following library and its antecedents therein:

This work was inspired by:

When dealing with time series, it can be difficult to find a principled way to detect and analyze trends in the data.

One approach is to use the Trend Calculus algorithm invented by Andrew Morgan. More information about Trend Calculus can be found on this github.io page.
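As a toy illustration of the core idea only (not the library's implementation), a reversal can be thought of as a point where the up/down direction of the series flips:

```python
def toy_reversals(values):
    """Indices of points where the up/down direction of the series flips."""
    # sign of each step: +1 up, -1 down, 0 flat
    signs = [(b > a) - (b < a) for a, b in zip(values, values[1:])]
    reversals, prev = [], 0
    for i, s in enumerate(signs):
        if s != 0 and prev != 0 and s != prev:
            reversals.append(i)  # values[i] is the turning point
        if s != 0:
            prev = s
    return reversals

# The first few closing prices from the displayed table further down
prices = [86.74, 86.75, 86.76, 86.74, 86.74, 86.75, 86.77]
print(toy_reversals(prices))  # [2, 4]: the peak at 86.76 and the trough at 86.74
```

These two indices happen to line up with the -1 and +1 reversals shown for the same prices in the displayed results below, though the real algorithm works on windows of highs and lows rather than single steps.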

import org.lamastex.spark.trendcalculus._
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import java.sql.Timestamp

The input to the algorithm is data in the format (ticker, time, value). In this example, ticker is "BCOUSD" (Brent Crude Oil), time is given in minutes and value is the closing price for Brent Crude Oil during that minute.
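For illustration, one input record might be modelled as follows (a hypothetical Python mirror; the actual TickerPoint type lives in the Scala library, with the field names x and y matching the columns selected in the Scala cell below):

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical mirror of the (ticker, time, value) input record
TickerPoint = namedtuple("TickerPoint", ["ticker", "x", "y"])

row = TickerPoint("BCOUSD", datetime(2010, 11, 14, 20, 15), 86.74)
print(row.ticker, row.x.isoformat(), row.y)
```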

The data is historical data from 2010 to 2019, taken from https://www.histdata.com/ using methods from FX-1-Minute-Data by Philippe Remy. In this notebook, everything is done on static dataframes. See 02streamable-trend-calculus for examples on streaming dataframes.

There are gaps in the data, notably during the weekends when no trading takes place, but this does not affect the algorithm, as it makes no assumptions about the data other than that time is monotonically increasing.

The window size is set to 2, which is minimal, because we want to retain as much information as possible.

val windowSize = 2
val oilDS = spark.read.fx1m("s3a://xxx-yyy-zzz/findata/com/histdata/free/FX-1-Minute-Data/bcousd/*.csv.gz")
  .toDF.withColumn("ticker", lit("BCOUSD"))
  .select($"ticker", $"time" as "x", $"close" as "y")
  .as[TickerPoint].distinct.orderBy("x")
windowSize: Int = 2 oilDS: org.apache.spark.sql.Dataset[org.lamastex.spark.trendcalculus.TickerPoint] = [ticker: string, x: timestamp ... 1 more field]

If we want to look at long term trends, we can use the output time series as input for another iteration. The output contains the points of the input where the trend changes (reversals). This can be repeated several times, resulting in longer term trends.
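As a self-contained toy sketch of this iteration (again using simple direction flips, not the library's algorithm), feeding the first-order turning points back in as a new series yields fewer, longer-term turning points:

```python
def toy_reversals(values):
    """Minimal direction-flip detector, redefined here so this cell runs on its own."""
    signs = [(b > a) - (b < a) for a, b in zip(values, values[1:])]
    out, prev = [], 0
    for i, s in enumerate(signs):
        if s and prev and s != prev:
            out.append(i)
        if s:
            prev = s
    return out

series = [1, 2, 3, 2, 1, 2, 4, 3, 2, 5, 6, 4]
order1 = [series[i] for i in toy_reversals(series)]  # first-order turning points
order2 = [order1[i] for i in toy_reversals(order1)]  # second-order turning points
print(order1)  # [3, 1, 4, 2, 6]
print(order2)  # [1, 4, 2]
```

Each iteration consumes the previous iteration's reversal points, so the number of points shrinks with every pass.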

The algorithm computes all possible orders of trend reversal in one pass.

val dfWithReversals = new TrendCalculus2(oilDS, windowSize, spark).reversals
numReversals: Int = 15 dfWithReversals: org.apache.spark.sql.Dataset[org.lamastex.spark.trendcalculus.Reversal] = [tickerPoint: struct<ticker: string, x: timestamp ... 1 more field>, reversal: int]
display(dfWithReversals.cache.orderBy("tickerPoint.x"))
 
tickerPoint                                                              reversal
{"ticker": "BCOUSD", "x": "2010-11-14T20:15:00.000+0000", "y": 86.74}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:17:00.000+0000", "y": 86.75}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:18:00.000+0000", "y": 86.76}    -1
{"ticker": "BCOUSD", "x": "2010-11-14T20:19:00.000+0000", "y": 86.74}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:21:00.000+0000", "y": 86.74}    1
{"ticker": "BCOUSD", "x": "2010-11-14T20:24:00.000+0000", "y": 86.75}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:26:00.000+0000", "y": 86.77}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:27:00.000+0000", "y": 86.75}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:28:00.000+0000", "y": 86.79}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:32:00.000+0000", "y": 86.81}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:33:00.000+0000", "y": 86.81}    -1
{"ticker": "BCOUSD", "x": "2010-11-14T20:34:00.000+0000", "y": 86.81}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:35:00.000+0000", "y": 86.79}    0
{"ticker": "BCOUSD", "x": "2010-11-14T20:36:00.000+0000", "y": 86.8}     0
{"ticker": "BCOUSD", "x": "2010-11-14T20:37:00.000+0000", "y": 86.79}    0

Truncated results, showing first 1000 rows.

The number of reversals decreases rapidly as more iterations are done.

dfWithReversals.cache.count
res13: Long = 2859139
println("order\tcount")
dfWithReversals
  .groupBy(abs($"reversal") as "reversal").agg(count($"tickerPoint") as "count")
  .orderBy("reversal").select("count").as[Long]
  .collect.scanRight(0L)(_ + _).dropRight(1).zipWithIndex.tail
  .foreach(rev => println(s"${rev._2}:\t${rev._1}"))
order	count
1:	775426
2:	253313
3:	93830
4:	37085
5:	15426
6:	6607
7:	2872
8:	1247
9:	532
10:	229
11:	96
12:	44
13:	24
14:	11
15:	7
16:	3
17:	1
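The scanRight in the cell above turns per-order counts into cumulative counts of reversals of order at least k (every order-k reversal is also counted at all lower orders). The same suffix-sum idea in Python, using made-up per-order counts:

```python
from itertools import accumulate

# Made-up per-order reversal counts: exactly order 1, 2, 3, 4
per_order = [10, 4, 2, 1]
# Suffix sums give "order at least k" counts, mirroring scanRight above
at_least = list(accumulate(reversed(per_order)))[::-1]
print(at_least)  # [17, 7, 3, 1]
```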

Writing the dataframe to parquet so that it can be read from Python.

dfWithReversals.write.mode(SaveMode.Overwrite).parquet("s3a://xxx-yyy-zzz/canwrite/summerinterns2020/trend-calculus-blog/public/joinedDSWithMaxRev")
dfWithReversals.unpersist
res15: dfWithReversals.type = [tickerPoint: struct<ticker: string, x: timestamp ... 1 more field>, reversal: int]

Visualization

Plotly in Python is used to make interactive visualizations.

%python
from plotly.offline import plot
from plotly.graph_objs import *
from datetime import *
from pyspark.sql.functions import abs
joinedDS = spark.read.parquet("s3a://xxx-yyy-zzz/canwrite/summerinterns2020/trend-calculus-blog/public/joinedDSWithMaxRev").orderBy("tickerPoint.x")

Checking how much the time series has to be thinned out in order to be displayed locally.

No information about higher order trend reversals is lost since every higher order reversal is also a lower order reversal.
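As a toy sketch of this claim (made-up rows, not the real data): filtering on the highest reversal order of each point drops only lower-order points, and every kept point still carries its lower-order reversal information.

```python
# Made-up rows in the (x, y, maxRev) shape produced below
rows = [
    {"x": 1, "y": 86.74, "maxRev": 1},
    {"x": 2, "y": 86.80, "maxRev": 3},
    {"x": 3, "y": 86.60, "maxRev": 5},
]
# Thinning: keep only points whose highest reversal order exceeds 2;
# each kept point is still a reversal of orders 1 and 2 as well
thinned = [r for r in rows if r["maxRev"] > 2]
print([r["x"] for r in thinned])  # [2, 3]
```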

%python
joinedDS.filter("abs(reversal) > 2").count()
Out[2]: 93830
%python
fullTS = (joinedDS
  .filter("abs(reversal) > 2")
  .withColumn("x", joinedDS.tickerPoint.x)
  .withColumn("y", joinedDS.tickerPoint.y)
  .withColumn("maxRev", abs(joinedDS.reversal))
  .select("x", "y", "maxRev")
  .collect())

Picking an interval to focus on.

Start and end dates as (year, month, day, hour, minute, second). Only year, month and day are required. The interval from 1800 to 2200 ensures all data is selected.

%python
startDate = datetime(1800, 1, 1)
endDate = datetime(2200, 12, 31)
TS = [row for row in fullTS if startDate <= row['x'] <= endDate]

Setting up the visualization.

%python
numReversals = 17
startReversal = 7
 
allData = {'x': [row['x'] for row in TS], 'y': [row['y'] for row in TS], 'maxRev': [row['maxRev'] for row in TS]}
revTS = [row for row in TS if row[2] >= startReversal]
colorList = [
  'rgba(' + str(tmp) + ',' + str(255 - tmp) + ',' + str(255 - tmp) + ',1)'
  for tmp in [int(i * 255 / (numReversals - startReversal + 1)) for i in range(1, numReversals - startReversal + 2)]
]
 
def getRevTS(tsWithRevMax, revMax):
  x = [row[0] for row in tsWithRevMax if row[2] >= revMax]
  y = [row[1] for row in tsWithRevMax if row[2] >= revMax]
  return x,y,revMax
 
reducedData = [getRevTS(revTS, i) for i in range(startReversal, numReversals + 1)]
 
markerPlots = [
  Scattergl(x=x, y=y, mode='markers',
            marker=dict(color=colorList[i - startReversal], size=i),
            name='Reversal ' + str(i))
  for (x, y, i) in reducedData
]

Plotting the result as a Plotly graph.

The graph is interactive: drag to zoom in on an area (double-click to zoom back out) and click on legend entries to hide or show individual series.

%python
p = plot(
  [Scattergl(x=allData['x'], y=allData['y'], mode='lines', name='Oil Price')] + markerPlots
  ,
  output_type='div'
)
 
displayHTML(p)