ScaDaMaLe Course site and book

// Scala imports
import org.lamastex.spark.trendcalculus._
import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import java.sql.Timestamp
import org.apache.spark.sql.expressions._
// Load dataset
val oilDS = spark.read.fx1m("dbfs:/FileStore/shared_uploads/fabiansi@kth.se/*csv.gz")
  .toDF
  .withColumn("ticker", lit("BCOUSD"))
  .select($"ticker", $"time" as "x", $"close" as "y")
  .as[TickerPoint]
  .orderBy("time")

// Add column with difference from previous close value (expected 'x', 'y' column names)
val windowSpec = Window.orderBy("x")
val oilDS1 = oilDS.withColumn(
  "diff_close",
  $"y" - when((lag("y", 1).over(windowSpec)).isNull, 0).otherwise(lag("y", 1).over(windowSpec))
)

// Rename variables
val oilDS2 = oilDS1.withColumnRenamed("x", "time").withColumnRenamed("y", "close")

// Remove incomplete data from first day (2010-11-14) and last day (2019-06-21)
val oilDS3 = oilDS2.filter(to_date(oilDS2("time")) >= lit("2010-11-15") && to_date(oilDS2("time")) <= lit("2019-06-20"))

// Add index column
val windowSpec1 = Window.orderBy("time")
val oilDS4 = oilDS3.withColumn("index", row_number().over(windowSpec1))

// Drop ticker column
val oilDS5 = oilDS4.drop("ticker")

// Store loaded data as temp view, to be accessible in Python
oilDS5.createOrReplaceTempView("temp")
oilDS: org.apache.spark.sql.Dataset[org.lamastex.spark.trendcalculus.TickerPoint] = [ticker: string, x: timestamp ... 1 more field]
windowSpec: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@6dd55d65
oilDS1: org.apache.spark.sql.DataFrame = [ticker: string, x: timestamp ... 2 more fields]
oilDS2: org.apache.spark.sql.DataFrame = [ticker: string, time: timestamp ... 2 more fields]
oilDS3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [ticker: string, time: timestamp ... 2 more fields]
windowSpec1: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@5be434ff
oilDS4: org.apache.spark.sql.DataFrame = [ticker: string, time: timestamp ... 3 more fields]
oilDS5: org.apache.spark.sql.DataFrame = [time: timestamp, close: double ... 2 more fields]
# Python imports
import datetime
import gym
import math
import random
import json
import collections
import numpy as np
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers.core import Dense
from keras.layers import Conv1D, MaxPool1D, Flatten, BatchNormalization
from keras import optimizers
from elephas.utils.rdd_utils import to_simple_rdd
from elephas.spark_model import SparkModel
Using TensorFlow backend.
# Create Dataframe from temp data
oilDF_py = spark.table("temp")

# Select the 10 first rows of data and print them
ten_oilDF_py = oilDF_py.limit(10)
ten_oilDF_py.show()

# Check number of data points
last_index = oilDF_py.count()
print("Number of data points: {}".format(last_index))

# Select the date of the last data point
print("Last data point: {}".format(np.array(oilDF_py.where(oilDF_py.index == last_index).select('time').collect()).item()))
+-------------------+-----+--------------------+-----+
|               time|close|          diff_close|index|
+-------------------+-----+--------------------+-----+
|2010-11-15 00:00:00| 86.6|-0.01000000000000...|    1|
|2010-11-15 00:01:00| 86.6|                 0.0|    2|
|2010-11-15 00:02:00|86.63|0.030000000000001137|    3|
|2010-11-15 00:03:00|86.61|-0.01999999999999602|    4|
|2010-11-15 00:05:00|86.61|                 0.0|    5|
|2010-11-15 00:07:00| 86.6|-0.01000000000000...|    6|
|2010-11-15 00:08:00|86.58|-0.01999999999999602|    7|
|2010-11-15 00:09:00|86.58|                 0.0|    8|
|2010-11-15 00:10:00|86.58|                 0.0|    9|
|2010-11-15 00:12:00|86.57|-0.01000000000000...|   10|
+-------------------+-----+--------------------+-----+

Number of data points: 2523078
Last data point: 2019-06-20 23:59:00
# Adapted from: https://github.com/kh-kim/stock_market_reinforcement_learning/blob/master/market_env.py
class MarketEnv(gym.Env):

    def __init__(self, full_data, start_date, end_date, episode_size=30*24*60, scope=60):
        self.episode_size = episode_size
        self.actions = ["LONG", "SHORT"]
        self.action_space = gym.spaces.Discrete(len(self.actions))
        self.state_space = gym.spaces.Box(np.ones(scope) * -1, np.ones(scope))

        self.diff_close = np.array(full_data.filter(full_data["time"] > start_date).filter(full_data["time"] <= end_date).select('diff_close').collect())
        max_diff_close = np.max(self.diff_close)
        self.diff_close = self.diff_close * max_diff_close
        self.close = np.array(full_data.filter(full_data["time"] > start_date).filter(full_data["time"] <= end_date).select('close').collect())

        self.num_ticks_train = np.shape(self.diff_close)[0]

        self.scope = scope  # N values to be included in a state vector
        self.time_index = self.scope  # start N steps in, to ensure that we have enough past values for history
        self.episode_init_time = self.time_index  # initial time index of the episode

    def step(self, action):
        info = {'index': int(self.time_index), 'close': float(self.close[self.time_index])}

        self.time_index += 1
        self.state = self.diff_close[self.time_index - self.scope:self.time_index]
        self.reward = float(-(2 * action - 1) * self.state[-1])

        # Check if done
        if self.time_index - self.episode_init_time > self.episode_size:
            self.done = True
        if self.time_index > self.diff_close.shape[0] - self.scope - 1:
            self.done = True

        return self.state, self.reward, self.done, info

    def reset(self, random_starttime=True):
        self.done = False
        self.reward = 0.
        self.time_index = self.scope
        self.state = self.diff_close[self.time_index - self.scope:self.time_index]

        if random_starttime:
            self.time_index += random.randint(0, self.num_ticks_train - self.scope)

        self.episode_init_time = self.time_index

        return self.state

    def seed(self):
        pass
# Adapted from: https://dbc-635ca498-e5f1.cloud.databricks.com/?o=445287446643905#notebook/4201196137758409/command/4201196137758410
class ExperienceReplay:

    def __init__(self, max_memory=100, discount=.9):
        self.max_memory = max_memory
        self.memory = list()
        self.discount = discount

    def remember(self, states, done):
        self.memory.append([states, done])
        if len(self.memory) > self.max_memory:
            del self.memory[0]

    def get_batch(self, model, batch_size=10):
        len_memory = len(self.memory)
        num_actions = model.output_shape[-1]
        env_dim = self.memory[0][0][0].shape[1]
        inputs = np.zeros((min(len_memory, batch_size), env_dim, 1))
        targets = np.zeros((inputs.shape[0], num_actions))
        for i, idx in enumerate(np.random.randint(0, len_memory, size=inputs.shape[0])):
            state_t, action_t, reward_t, state_tp1 = self.memory[idx][0]
            done = self.memory[idx][1]

            inputs[i:i + 1] = state_t
            # There should be no target values for actions not taken.
            targets[i] = model.predict(state_t)[0]
            Q_sa = np.max(model.predict(state_tp1)[0])
            if done:  # if done is True
                targets[i, action_t] = reward_t
            else:
                # reward_t + gamma * max_a' Q(s', a')
                targets[i, action_t] = reward_t + self.discount * Q_sa
        return inputs, targets
# Adapted from: https://dbc-635ca498-e5f1.cloud.databricks.com/?o=445287446643905#notebook/4201196137758409/command/4201196137758410

# RL parameters
epsilon = .5  # exploration
min_epsilon = 0.1
max_memory = 5000
batch_size = 512
discount = 0.8

# Environment parameters
num_actions = 2  # [long, short]
episodes = 500  # 100000
episode_size = 1 * 1 * 60  # roughly an hour worth of data in each training episode

# Define state sequence scope (approx. 1 hour)
sequence_scope = 60
input_shape = (batch_size, sequence_scope, 1)

# Create Q Network
hidden_size = 128

model = Sequential()
model.add(Conv1D(32, (5), strides=2, input_shape=input_shape[1:], activation='relu'))
model.add(MaxPool1D(pool_size=2, strides=1))
model.add(BatchNormalization())
model.add(Conv1D(32, (5), strides=1, activation='relu'))
model.add(MaxPool1D(pool_size=2, strides=1))
model.add(BatchNormalization())
model.add(Flatten())
model.add(Dense(hidden_size, activation='relu'))
model.add(BatchNormalization())
model.add(Dense(num_actions))
opt = optimizers.Adam(lr=0.01)
model.compile(loss='mse', optimizer=opt)

# Define training interval
start = datetime.datetime(2010, 11, 15, 0, 0)
end = datetime.datetime(2018, 12, 31, 23, 59)

# Initialize Environment
env = MarketEnv(oilDF_py, start, end, episode_size=episode_size, scope=sequence_scope)

# Initialize experience replay object
exp_replay = ExperienceReplay(max_memory=max_memory, discount=discount)
WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/tensorflow/python/framework/op_def_library.py:263: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.
/databricks/python/lib/python3.7/site-packages/gym/logger.py:30: UserWarning: WARN: Box bound precision lowered by casting to float32
  warnings.warn(colorize('%s: %s'%('WARN', msg % args), 'yellow'))

Elephas

https://github.com/danielenricocahall/elephas

Elephas is a third-party library for training Keras models in a distributed fashion on Spark. To run a distributed training session, a Keras model is first declared and compiled on a single master node. Copies of this master model are then serialized and shipped to an arbitrary number of worker nodes. Elephas uses RDDs internally to make the data available to the workers on demand. After the workers have computed gradients and updated their weights, the updated model parameters are pushed back to the master model.

For updating the parameters of the master model, Elephas provides three modes: synchronous, asynchronous, and HOGWILD! (https://arxiv.org/abs/1106.5730).
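A minimal sketch of this workflow is shown below, using only the Elephas calls that also appear later in this notebook (to_simple_rdd, SparkModel, fit); the tiny model and the random data are illustrative placeholders, not part of our pipeline.

# Minimal Elephas workflow sketch (toy model and random data for illustration only)
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense
from elephas.utils.rdd_utils import to_simple_rdd
from elephas.spark_model import SparkModel

# 1) Declare and compile the Keras model on the master (driver) node
toy_model = Sequential()
toy_model.add(Dense(2, input_dim=4))
toy_model.compile(loss='mse', optimizer='adam')

# 2) Ship the training data to the workers as an RDD (sc is the SparkContext set by Databricks)
x_toy, y_toy = np.random.rand(100, 4), np.random.rand(100, 2)
toy_rdd = to_simple_rdd(sc, x_toy, y_toy)

# 3) Wrap the model; 'mode' selects how worker updates reach the master
#    ('synchronous', 'asynchronous' or 'hogwild')
toy_spark_model = SparkModel(toy_model, frequency='epoch', mode='asynchronous')

# 4) Distributed fit: model copies are trained on the workers and the master
#    parameters are updated according to the chosen mode
toy_spark_model.fit(toy_rdd, epochs=1, batch_size=32, verbose=0, validation_split=0.1)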

Integrating distributed model training into our RL framework

In its current version, Elephas only supports supervised model training. We therefore distribute only the supervised training step on the experience replay buffer and keep the surrounding for-loops from the previous RL implementation.

Training data conversion

Training data must be provided either as an RDD or as a PySpark DataFrame (which is internally converted to an RDD). A more elaborate pipeline might slice and evaluate replay-buffer instances directly from the original DataFrame; however, since most of our implementation expects NumPy arrays, we convert the buffer to an RDD manually in each step.
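For illustration, converting one (dummy) replay-buffer batch to an RDD might look as follows; the array shapes match the batches produced by get_batch below, and sc is the SparkContext that Databricks sets automatically.

import numpy as np
from elephas.utils.rdd_utils import to_simple_rdd

# Dummy batch with the shapes used in this notebook:
# 512 states of 60 one-minute price differences each, and one Q-value target per action.
dummy_inputs = np.zeros((512, 60, 1))
dummy_targets = np.zeros((512, 2))

# to_simple_rdd pairs features with labels and parallelizes them across the cluster.
dummy_rdd = to_simple_rdd(sc, dummy_inputs, dummy_targets)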

Elephas SparkModel for retraining in the RL-loop

When Elephas has finished its training epochs (here, one pass over the ExperienceReplay buffer within one sweep of the RL loop), the worker processes it used are terminated. This leads to a crash when trying to retrain an already trained model. As a workaround, we initialize a new Elephas SparkModel in every training step, wrapping the Keras model obtained from the previous step.
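In code, the workaround is the wrap-fit-unwrap pattern sketched below; num_rl_steps, train_rdd, and the fit arguments are placeholders here, and the real loop appears in the training cell further down.

for _ in range(num_rl_steps):  # placeholder for the RL loop below
    # fresh Elephas wrapper around the current Keras model in every step
    spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')
    spark_model.fit(train_rdd, epochs=10, batch_size=32, verbose=0, validation_split=0.1)
    # pull the trained master network back out and reuse it as a plain Keras model
    model = spark_model._master_network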

# elephas variables
ele_epochs = 10
ele_batchsize = 32
ele_verbose = 0
ele_valsplit = 0.1

# Train
returns = []
for e in range(1, episodes):
    loss = 0.
    counter = 0
    reward_sum = 0.
    done = False
    state = env.reset()
    input_t = state.reshape(1, sequence_scope, 1)
    while not done:
        counter += 1
        input_tm1 = input_t

        # get next action
        if np.random.rand() <= epsilon:
            action = np.random.randint(0, num_actions, size=1)
        else:
            q = model.predict(input_tm1)
            action = np.argmax(q[0])

        # apply action, get rewards and new state
        state, reward, done, info = env.step(action)
        reward_sum += reward
        input_t = state.reshape(1, sequence_scope, 1)

        # store experience
        exp_replay.remember([input_tm1, action, reward, input_t], done)

        # adapt model
        inputs, targets = exp_replay.get_batch(model, batch_size=batch_size)

        # elephas calls for distributed gradient optimization
        train_rdd = to_simple_rdd(sc, inputs, targets)  # note that we provide the spark context sc (sc variable automatically set in databricks)
        spark_model = SparkModel(model, frequency='epoch', mode='asynchronous')  # 'asynchronous', 'hogwild' or 'synchronous'
        spark_model.fit(train_rdd, epochs=ele_epochs, batch_size=ele_batchsize, verbose=ele_verbose, validation_split=ele_valsplit)
        model = spark_model._master_network  # hacky!

        loss += model.train_on_batch(inputs, targets)

    print("Episode {:03d}/{:d} | Average Loss {:.4f} | Cumulative Reward {:.4f}".format(e, episodes, loss / counter, reward_sum))
    epsilon = max(min_epsilon, epsilon * 0.99)
    returns.append(reward_sum)

Notes on the Elephas training

The pipeline in this notebook serves as a proof of concept to demonstrate how RL training can be distributed on a Spark cluster. During testing, we observed that the experience replay step is a bottleneck in distributed model training compared to running Keras out of the box in parallel.

Error that sometimes appears when running on Databricks

We noticed occasional crashes during distributed training at line 180 of https://github.com/danielenricocahall/elephas/blob/master/elephas/spark_model.py, more precisely at rdd.mapPartitions(worker.train).collect(), with a Py4JJavaError. Restarting the cluster does not resolve the issue, but it was sometimes possible to re-run the training successfully after some time. We suspect the problem is connected to the JVM, but we lack precise insight into the nature of this bug.