ScaDaMaLe Course site and book

This notebook extends the TF v2.x code from the single-machine notebook 23_ds_single_machine. The modifications enable multi-machine training using the Horovod framework.

We highlight the changes compared to the single-machine implementation.

First, check whether the data is available locally. If not, go to notebook 1_data_and_preprocssing and download the data from DBFS to local storage.

ls 06_LHC/
LICENSE README.md data h5 models scripts utils

Get the imports.

import argparse
from argparse import Namespace
from datetime import datetime
import numpy as np
import tensorflow as tf
import socket
import os
import sys
from sklearn.cluster import KMeans
from tqdm import tqdm
import h5py
# Added matplotlib for accuracy plots
import matplotlib.pyplot as plt

np.set_printoptions(edgeitems=1000)
from scipy.optimize import linear_sum_assignment

BASE_DIR = os.path.join(os.getcwd(), '06_LHC', 'scripts')  # os.path.dirname(os.path.abspath(__file__))
sys.path.append(BASE_DIR)
sys.path.append(os.path.join(BASE_DIR, '..', 'models'))
sys.path.append(os.path.join(BASE_DIR, '..', 'utils'))

import provider
import gapnet_classify as MODEL

Get the input parameters.

parserdict = {'max_dim': 3,                # Dimension of the encoding layer [Default: 3]
              'n_clusters': 3,             # Number of clusters [Default: 3]
              'gpu': 0,                    # GPU to use [default: GPU 0]
              'model': 'gapnet_clasify',   # Model name [default: gapnet_classify]
              'log_dir': 'log',            # Log dir [default: log]
              'num_point': 100,            # Point Number [default: 100]
              'max_epoch': 100,            # Epochs to run [default: 200]
              'epochs_pretrain': 20,       # Epochs for pretraining [default: 10]
              'batch_size': 1024,          # Batch Size during training [default: 512]
              'learning_rate': 0.001,      # Initial learning rate [default: 0.01]
              'momentum': 0.9,             # Initial momentum [default: 0.9]
              'optimizer': 'adam',         # adam or momentum [default: adam]
              'decay_step': 500000,        # Decay step for lr decay [default: 500000]
              'wd': 0.0,                   # Weight Decay [Default: 0.0]
              'decay_rate': 0.5,           # Decay rate for lr decay [default: 0.5]
              'output_dir': 'train_results',                          # Directory that stores all training logs and trained models
              'data_dir': os.path.join(os.getcwd(), '06_LHC', 'h5'),  # directory with data [default: hdf5_data]
              'nfeat': 8,                  # Number of features [default: 8]
              'ncat': 20,                  # Number of categories [default: 20]
              }
FLAGS = Namespace(**parserdict)

H5_DIR = FLAGS.data_dir
EPOCH_CNT = 0
MAX_PRETRAIN = FLAGS.epochs_pretrain
BATCH_SIZE = FLAGS.batch_size
NUM_POINT = FLAGS.num_point
NUM_FEAT = FLAGS.nfeat
NUM_CLASSES = FLAGS.ncat
MAX_EPOCH = FLAGS.max_epoch
BASE_LEARNING_RATE = FLAGS.learning_rate
GPU_INDEX = FLAGS.gpu
MOMENTUM = FLAGS.momentum
OPTIMIZER = FLAGS.optimizer
DECAY_STEP = FLAGS.decay_step
DECAY_RATE = FLAGS.decay_rate

# MODEL = importlib.import_module(FLAGS.model)  # import network module
MODEL_FILE = os.path.join(BASE_DIR, 'models', FLAGS.model + '.py')
LOG_DIR = os.path.join(os.getcwd(), '06_LHC', 'logs', FLAGS.log_dir)
if not os.path.exists(LOG_DIR):
    os.makedirs(LOG_DIR)
os.system('cp %s %s' % (MODEL_FILE, LOG_DIR))       # bkp of model def (MODEL_FILE already ends in .py)
os.system('cp train_kmeans.py %s' % (LOG_DIR))      # bkp of train procedure

BN_INIT_DECAY = 0.5
BN_DECAY_DECAY_RATE = 0.5
BN_DECAY_DECAY_STEP = float(DECAY_STEP)
BN_DECAY_CLIP = 0.99
LEARNING_RATE_CLIP = 1e-5

HOSTNAME = socket.gethostname()

TRAIN_FILES = provider.getDataFiles(os.path.join(H5_DIR, 'train_files_wztop.txt'))
TEST_FILES = provider.getDataFiles(os.path.join(H5_DIR, 'test_files_wztop.txt'))

Define the utility functions.

def get_learning_rate(batch):
    learning_rate = tf.compat.v1.train.exponential_decay(
        BASE_LEARNING_RATE,  # Base learning rate.
        batch * BATCH_SIZE,  # Current index into the dataset.
        DECAY_STEP,          # Decay step.
        DECAY_RATE,          # Decay rate.
        staircase=True)
    # CLIP THE LEARNING RATE! Do not let the decayed rate drop below the clip value.
    learning_rate = tf.maximum(learning_rate, LEARNING_RATE_CLIP)
    return learning_rate


def get_bn_decay(batch):
    bn_momentum = tf.compat.v1.train.exponential_decay(
        BN_INIT_DECAY,
        batch * BATCH_SIZE,
        BN_DECAY_DECAY_STEP,
        BN_DECAY_DECAY_RATE,
        staircase=True)
    bn_decay = tf.minimum(BN_DECAY_CLIP, 1 - bn_momentum)
    return bn_decay
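As a rough illustration of what this schedule does, here is a plain-Python sketch of the staircase exponential decay with the parameter values above (an illustration only, not the TF op itself):

import math

def lr_at(global_step,
          base_lr=0.001, batch_size=1024, decay_step=500000,
          decay_rate=0.5, lr_clip=1e-5):
    # staircase exponential decay: halve the rate every decay_step samples seen
    samples_seen = global_step * batch_size
    lr = base_lr * decay_rate ** math.floor(samples_seen / decay_step)
    return max(lr, lr_clip)  # never decay below the clip value

for step in [0, 500, 1000, 5000]:
    print(step, lr_at(step))
# -> 0.001 for the first ~488 steps, then 0.0005, 0.00025, ..., floored at 1e-5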

Modifications:
- create a checkpoint directory for Horovod
- the directory location is user chosen

import os
import time

checkpoint_dir = '/dbfs/databricks/driver/06_LHC/logs/train/{}/'.format(time.time())
os.makedirs(checkpoint_dir)
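Every worker sees this same DBFS path. A common Horovod convention (hinted at by the commented-out checkpoint_dir_mod line in the training function below, but not enabled here) is to let only rank 0 write checkpoints so the workers do not race on the same files. A minimal sketch, assuming hvd.init() has already been called inside the worker function:

import horovod.tensorflow as hvd

# Only rank 0 gets a checkpoint directory; a MonitoredTrainingSession created
# with checkpoint_dir=None simply skips checkpoint saving on the other ranks.
checkpoint_dir_mod = checkpoint_dir if hvd.rank() == 0 else None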

Create the Horovod h5 loading function:
- note that the rank and size are passed in as inputs.
- rank is the current device id.
- size is the total number of available GPUs.
- we split the data in the h5 file across the devices.

def load_h5_hvd(h5_filename, rank=0, size=1):
    f = h5py.File(h5_filename, 'r')
    data = f['data'][rank::size]
    label = f['pid'][rank::size]
    seg = f['label'][rank::size]
    print("loaded {0} events".format(len(data)))
    return (data, label, seg)
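The rank::size slicing gives each worker an interleaved, non-overlapping shard of the events. A minimal sketch of the idea on a toy array (illustration only, not part of the training code):

import numpy as np

events = np.arange(10)        # pretend these are 10 events in the h5 file
size = 2                      # two workers, as used later with HorovodRunner(np=2)

shard_rank0 = events[0::size]  # -> [0, 2, 4, 6, 8]
shard_rank1 = events[1::size]  # -> [1, 3, 5, 7, 9]

# Every event ends up on exactly one worker, and the shards are balanced
# up to one event when the total is not divisible by the number of workers.
print(shard_rank0, shard_rank1)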

Main training function. The modifications are:
- import the packages again inside the function; otherwise individual devices may run into problems.
- initialise Horovod.
- copy the files from DBFS to each GPU worker's local directory so that they are available to Horovod.
- scale the learning rate by the number of available devices.
- wrap the optimizer in a Horovod DistributedOptimizer.
- use hooks to save a checkpoint every 1000 steps.
- switch from a normal TF training session to a monitored training session.

A condensed sketch of this wiring is shown below, followed by the full function.
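For orientation, here is a condensed sketch of the Horovod-specific pieces only (build_model_and_loss is a placeholder name, not a function from this repository; the real graph construction is in the full function below):

import horovod.tensorflow as hvd
import tensorflow as tf

def train_hvd_sketch():
    hvd.init()                                     # one Horovod process per GPU

    loss = build_model_and_loss()                  # placeholder for the GAPNet graph below
    global_step = tf.compat.v1.train.get_or_create_global_step()

    # scale the learning rate by the number of workers and wrap the optimizer
    opt = tf.compat.v1.train.AdamOptimizer(0.001 * hvd.size())
    opt = hvd.DistributedOptimizer(opt)            # all-reduces gradients across workers
    train_op = opt.minimize(loss, global_step=global_step)

    hooks = [hvd.BroadcastGlobalVariablesHook(0)]  # sync initial weights from rank 0

    config = tf.compat.v1.ConfigProto()
    config.gpu_options.visible_device_list = str(hvd.local_rank())  # pin one GPU per process

    with tf.compat.v1.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                                     hooks=hooks,
                                                     config=config) as sess:
        while not sess.should_stop():
            sess.run(train_op)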

def train_hvd():
    import horovod.tensorflow as hvd
    import tensorflow as tf
    import shutil

    # do all the imports here again in order for hvd to work nicely
    import horovod.tensorflow as hvd
    import argparse, shlex
    from datetime import datetime
    import numpy as np
    import tensorflow as tf
    import socket
    import os
    import sys
    from sklearn.cluster import KMeans
    from tqdm import tqdm

    np.set_printoptions(edgeitems=1000)
    from scipy.optimize import linear_sum_assignment

    BASE_DIR = os.path.join(os.getcwd(), '06_LHC', 'scripts')
    sys.path.append(BASE_DIR)
    sys.path.append(os.path.join(BASE_DIR, '..', 'models'))
    sys.path.append(os.path.join(BASE_DIR, '..', 'utils'))

    # HOROVOD: initialize Horovod.
    hvd.init()

    # HOROVOD: copy files from DBFS so that they are available locally on each GPU worker
    src = "/dbfs/FileStore/06_LHC"
    dst = os.path.join(os.getcwd(), '06_LHC')
    print("Copying data/files to local horovod folder...")
    shutil.copytree(src, dst)
    print("Done with copying!")

    import provider
    import gapnet_classify as MODEL

    with tf.Graph().as_default():
        with tf.device('/gpu:' + str(GPU_INDEX)):
            # ADDED THIS TO RECORD ACCURACY
            epochs_acc = []

            pointclouds_pl, labels_pl = MODEL.placeholder_inputs(BATCH_SIZE, NUM_POINT, NUM_FEAT)
            is_training_pl = tf.compat.v1.placeholder(tf.bool, shape=())

            # Note the global_step=batch parameter to minimize.
            # That tells the optimizer to helpfully increment the 'batch' parameter
            # for you every time it trains.
            batch = tf.Variable(0)
            alpha = tf.compat.v1.placeholder(dtype=tf.float32, shape=())

            bn_decay = get_bn_decay(batch)
            tf.compat.v1.summary.scalar('bn_decay', bn_decay)

            print("--- Get model and loss")
            pred, max_pool = MODEL.get_model(pointclouds_pl,
                                             is_training=is_training_pl,
                                             bn_decay=bn_decay,
                                             num_class=NUM_CLASSES,
                                             weight_decay=FLAGS.wd,
                                             )
            class_loss = MODEL.get_focal_loss(pred, labels_pl, NUM_CLASSES)
            mu = tf.Variable(tf.zeros(shape=(FLAGS.n_clusters, FLAGS.max_dim)),
                             name="mu", trainable=True)  # k centroids
            kmeans_loss, stack_dist = MODEL.get_loss_kmeans(max_pool, mu, FLAGS.max_dim,
                                                            FLAGS.n_clusters, alpha)
            full_loss = kmeans_loss + class_loss

            print("--- Get training operator")
            # Get training operator
            learning_rate = get_learning_rate(batch)
            # HOROVOD: scale the learning rate by the number of processes (hvd.size())
            tf.compat.v1.summary.scalar('learning_rate', learning_rate * hvd.size())
            if OPTIMIZER == 'momentum':
                optimizer = tf.compat.v1.train.MomentumOptimizer(learning_rate * hvd.size(),
                                                                 momentum=MOMENTUM)
            elif OPTIMIZER == 'adam':
                optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate * hvd.size())

            # HOROVOD: add Horovod Distributed Optimizer
            optimizer = hvd.DistributedOptimizer(optimizer)

            global_step = tf.compat.v1.train.get_or_create_global_step()
            train_op_full = optimizer.minimize(full_loss, global_step=global_step)  # batch)
            train_op = optimizer.minimize(class_loss, global_step=global_step)  # batch)

            # Add ops to save and restore all the variables.
            saver = tf.compat.v1.train.Saver()

            # HOROVOD
            hooks = [
                # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
                # from rank 0 to all other processes. This is necessary to ensure consistent
                # initialization of all workers when training is started with random weights
                # or restored from a checkpoint.
                hvd.BroadcastGlobalVariablesHook(0),
                # checkpoint_dir_mod = checkpoint_dir if hvd.rank() == 0 else None
                tf.compat.v1.train.CheckpointSaverHook(checkpoint_dir=checkpoint_dir,
                                                       checkpoint_basename='cluster.ckpt',
                                                       save_steps=1_000),
                # this one basically prints the "step" and the "loss" every n steps;
                # the output is cleaner without it.
                # tf.compat.v1.train.LoggingTensorHook(tensors={'step': global_step, 'loss': full_loss},
                #                                      every_n_iter=75),
            ]

            # Create a session
            config = tf.compat.v1.ConfigProto()
            config.gpu_options.allow_growth = True
            config.allow_soft_placement = True
            config.log_device_placement = False
            config.gpu_options.visible_device_list = str(hvd.local_rank())

            # global variable initializer must be defined before session definition
            init_global_step = tf.compat.v1.global_variables_initializer()

            # MonitoredTrainingSession takes care of session initialization,
            # restoring from a checkpoint, saving to a checkpoint, and closing
            # when done or an error occurs.
            # checkpoint_dir_mod = checkpoint_dir if hvd.rank() == 0 else None
            sess = tf.compat.v1.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                                               hooks=hooks,
                                                               config=config)

            # get one batch of data from the training files in order to initialise the session
            train_idxs = np.arange(0, len(TRAIN_FILES))
            current_file = os.path.join(os.getcwd(), '06_LHC', 'h5', TRAIN_FILES[train_idxs[0]])
            current_data, current_label, current_cluster = load_h5_hvd(current_file, hvd.rank(), hvd.size())
            batch_data, batch_label = get_batch(current_data, current_label, 0, BATCH_SIZE)

            feed_dict = {pointclouds_pl: batch_data,
                         labels_pl: batch_label,
                         is_training_pl: False,
                         alpha: 2 * (EPOCH_CNT - MAX_PRETRAIN + 1),}
            # NOT SO CLEAR THAT init_global_step IS NECESSARY.
            sess.run(init_global_step, feed_dict=feed_dict)

            # helps with merging: CHANGE THIS IF POSSIBLE
            sess.graph._unsafe_unfinalize()

            # Add summary writers
            merged = tf.compat.v1.summary.merge_all()
            train_writer = tf.compat.v1.summary.FileWriter(os.path.join(LOG_DIR, 'train'), sess.graph)
            test_writer = tf.compat.v1.summary.FileWriter(os.path.join(LOG_DIR, 'test'), sess.graph)

            # Init variables
            print("Total number of weights for the model: ",
                  np.sum([np.prod(v.get_shape().as_list()) for v in tf.compat.v1.trainable_variables()]))

            ops = {'pointclouds_pl': pointclouds_pl,
                   'labels_pl': labels_pl,
                   'is_training_pl': is_training_pl,
                   'max_pool': max_pool,
                   'pred': pred,
                   'alpha': alpha,
                   'mu': mu,
                   'stack_dist': stack_dist,
                   'class_loss': class_loss,
                   'kmeans_loss': kmeans_loss,
                   'train_op': train_op,
                   'train_op_full': train_op_full,
                   'merged': merged,
                   'step': batch,
                   'learning_rate': learning_rate}

            for epoch in range(MAX_EPOCH):
                print('\n**** EPOCH %03d ****' % (epoch))
                sys.stdout.flush()

                is_full_training = epoch > MAX_PRETRAIN
                max_pool = train_one_epoch(sess, ops, train_writer, hvd.rank(), hvd.size(), is_full_training)

                if epoch == MAX_PRETRAIN:
                    centers = KMeans(n_clusters=FLAGS.n_clusters).fit(np.squeeze(max_pool))
                    centers = centers.cluster_centers_
                    sess.run(tf.compat.v1.assign(mu, centers))

                # eval_one_epoch(sess, ops, test_writer, hvd.rank(), hvd.size(), is_full_training)
                # Added these lines to record accuracy
                epoch_acc = eval_one_epoch(sess, ops, test_writer, hvd.rank(), hvd.size(), is_full_training)
                epochs_acc.append(epoch_acc)

                """if is_full_training:
                    save_path = saver.save(sess, os.path.join(LOG_DIR, 'cluster.ckpt'))
                else:
                    save_path = saver.save(sess, os.path.join(LOG_DIR, 'model.ckpt'))"""
                # print("Model saved in file: %s" % save_path)

    return epochs_acc

Training utils.

def get_batch(data, label, start_idx, end_idx):
    batch_label = label[start_idx:end_idx]
    batch_data = data[start_idx:end_idx, :, :]
    return batch_data, batch_label


def cluster_acc(y_true, y_pred):
    """ Calculate clustering accuracy. Requires scikit-learn installed """
    y_true = y_true.astype(np.int64)
    D = max(y_pred.max(), y_true.max()) + 1
    w = np.zeros((D, D), dtype=np.int64)
    for i in range(y_pred.size):
        w[y_pred[i], y_true[i]] += 1
    ind = linear_sum_assignment(w.max() - w)
    ind = np.asarray(ind)
    ind = np.transpose(ind)
    return sum([w[i, j] for i, j in ind]) * 1.0 / y_pred.size
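To make the Hungarian matching in cluster_acc concrete, here is a small worked example using the function defined above (illustration only): three samples with true labels [0, 0, 1] and predicted cluster ids [1, 1, 0] describe the same partition up to a relabelling, so the optimal assignment maps predicted cluster 1 to true label 0 and predicted cluster 0 to true label 1, giving accuracy 1.0.

import numpy as np

y_true = np.array([0, 0, 1])
y_pred = np.array([1, 1, 0])        # same partition as y_true, but with permuted cluster ids

print(cluster_acc(y_true, y_pred))  # -> 1.0 after relabelling via linear_sum_assignment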

One-epoch training and evaluation functions:
- the applicable Horovod rank and size are passed to both functions.
- the rank and size are used to load the correct slice of the h5 data.
- the progress bars are removed, since bars from the different devices would overlap.

def train_one_epoch(sess, ops, train_writer, hvd_rank, hvd_size, is_full_training):
    """ ops: dict mapping from string to tf ops """
    is_training = True
    train_idxs = np.arange(0, len(TRAIN_FILES))

    acc = loss_sum = 0
    y_pool = []
    for fn in range(len(TRAIN_FILES)):
        # print('----' + str(fn) + '-----')
        current_file = os.path.join(os.getcwd(), '06_LHC', 'h5', TRAIN_FILES[train_idxs[fn]])
        current_data, current_label, current_cluster = load_h5_hvd(current_file, hvd_rank, hvd_size)
        current_label = np.squeeze(current_label)

        file_size = current_data.shape[0]
        num_batches = file_size // BATCH_SIZE
        # num_batches = 5

        print(str(datetime.now()))

        # initialise progress bar
        # process_desc = "TRAINING: Loss {:2.3e}"
        # progress_bar = tqdm(initial=0, leave=True, total=num_batches,
        #                     desc=process_desc.format(0), position=0)

        for batch_idx in range(num_batches):
            start_idx = batch_idx * BATCH_SIZE
            end_idx = (batch_idx + 1) * BATCH_SIZE
            batch_data, batch_label = get_batch(current_data, current_label, start_idx, end_idx)
            cur_batch_size = end_idx - start_idx
            # print(batch_weight)

            feed_dict = {ops['pointclouds_pl']: batch_data,
                         ops['labels_pl']: batch_label,
                         ops['is_training_pl']: is_training,
                         ops['alpha']: 2 * (EPOCH_CNT - MAX_PRETRAIN + 1),}

            if is_full_training:
                summary, step, _, loss_val, dist, lr = sess.run([ops['merged'], ops['step'],
                                                                 ops['train_op_full'], ops['kmeans_loss'],
                                                                 ops['stack_dist'], ops['learning_rate']],
                                                                feed_dict=feed_dict)

                batch_cluster = np.array([np.where(r == 1)[0][0] for r in current_cluster[start_idx:end_idx]])

                cluster_assign = np.zeros((cur_batch_size), dtype=int)
                for i in range(cur_batch_size):
                    index_closest_cluster = np.argmin(dist[:, i])
                    cluster_assign[i] = index_closest_cluster

                acc += cluster_acc(batch_cluster, cluster_assign)
            else:
                summary, step, _, loss_val, max_pool, lr = sess.run([ops['merged'], ops['step'],
                                                                     ops['train_op'], ops['class_loss'],
                                                                     ops['max_pool'], ops['learning_rate']],
                                                                    feed_dict=feed_dict)
                if len(y_pool) == 0:
                    y_pool = np.squeeze(max_pool)
                else:
                    y_pool = np.concatenate((y_pool, np.squeeze(max_pool)), axis=0)

            loss_sum += np.mean(loss_val)
            # train_writer.add_summary(summary, step)
            if hvd_rank == 0:
                train_writer.add_summary(summary, step)

            # Update train bar
            # process_desc.format(loss_val)
            # progress_bar.update(1)
        # progress_bar.close()

    print('learning rate: %f' % (lr))
    print('train mean loss: %f' % (loss_sum / float(num_batches)))
    # if is_full_training:
    print('train clustering accuracy: %f' % (acc / float(num_batches)))

    return y_pool


def eval_one_epoch(sess, ops, test_writer, hvd_rank, hvd_size, is_full_training):
    """ ops: dict mapping from string to tf ops """
    global EPOCH_CNT

    is_training = False
    test_idxs = np.arange(0, len(TEST_FILES))
    # Test on all data: last batch might be smaller than BATCH_SIZE
    loss_sum = acc = 0
    acc_kmeans = 0

    for fn in range(len(TEST_FILES)):
        # print('----' + str(fn) + '-----')
        current_file = os.path.join(os.getcwd(), '06_LHC', 'h5', TEST_FILES[test_idxs[fn]])
        current_data, current_label, current_cluster = load_h5_hvd(current_file, hvd_rank, hvd_size)
        current_label = np.squeeze(current_label)

        file_size = current_data.shape[0]
        num_batches = file_size // BATCH_SIZE

        # process_desc = "VALIDATION: Loss {:2.3e}"
        # progress_bar = tqdm(initial=0, leave=True, total=num_batches,
        #                     desc=process_desc.format(0), position=0)

        for batch_idx in range(num_batches):
            start_idx = batch_idx * BATCH_SIZE
            end_idx = (batch_idx + 1) * BATCH_SIZE
            batch_data, batch_label = get_batch(current_data, current_label, start_idx, end_idx)
            cur_batch_size = end_idx - start_idx

            feed_dict = {ops['pointclouds_pl']: batch_data,
                         ops['is_training_pl']: is_training,
                         ops['labels_pl']: batch_label,
                         ops['alpha']: 2 * (EPOCH_CNT - MAX_PRETRAIN + 1),}

            if is_full_training:
                summary, step, loss_val, max_pool, dist, mu = sess.run([ops['merged'], ops['step'],
                                                                        ops['kmeans_loss'], ops['max_pool'],
                                                                        ops['stack_dist'], ops['mu']],
                                                                       feed_dict=feed_dict)

                batch_cluster = np.array([np.where(r == 1)[0][0] for r in current_cluster[start_idx:end_idx]])

                cluster_assign = np.zeros((cur_batch_size), dtype=int)
                for i in range(cur_batch_size):
                    index_closest_cluster = np.argmin(dist[:, i])
                    cluster_assign[i] = index_closest_cluster

                acc += cluster_acc(batch_cluster, cluster_assign)
            else:
                summary, step, loss_val = sess.run([ops['merged'], ops['step'], ops['class_loss']],
                                                   feed_dict=feed_dict)

            # test_writer.add_summary(summary, step)
            if hvd_rank == 0:
                test_writer.add_summary(summary, step)
            loss_sum += np.mean(loss_val)

            # Update validation bar
            # process_desc.format(loss_val)
            # progress_bar.update(1)
        # progress_bar.close()

    total_loss = loss_sum * 1.0 / float(num_batches)
    print('test mean loss: %f' % (total_loss))
    # if is_full_training:
    print('testing clustering accuracy: %f' % (acc / float(num_batches)))

    # increment the epoch counter before returning, so the alpha schedule advances
    EPOCH_CNT += 1
    return acc / float(num_batches)
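One detail worth noting is the alpha value fed in the feed_dict above: it grows linearly with the epoch counter, by 2 per epoch, once the pretraining phase is over. During pretraining the value is negative or zero, which is harmless because only the classification loss is optimised then. A quick sketch of the schedule with the settings from the parameter cell (MAX_PRETRAIN = 20):

MAX_PRETRAIN = 20  # FLAGS.epochs_pretrain from the parameter cell above

for epoch_cnt in [0, 19, 20, 21, 30]:
    alpha = 2 * (epoch_cnt - MAX_PRETRAIN + 1)
    print(epoch_cnt, alpha)
# -> -38, 0, 2, 4, 22: alpha increases by 2 per epoch after pretraining ends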

Run the training:
- initialise the HorovodRunner with np=2 GPUs (the cluster does not provide more GPUs).
- run the HorovodRunner with the training function defined above.

from sparkdl import HorovodRunner

hr = HorovodRunner(np=2)
epochs_acc = hr.run(train_hvd)

plt.plot(epochs_acc)
plt.ylabel('Validation accuracy')
plt.xlabel('epochs')
plt.savefig('distributed.png')  # save before show, so the written file is not blank
plt.show()
print(epochs_acc)

Results:
- Executing the command with np=2 GPUs takes 3.39 hours.
- The plot below shows the validation accuracy versus epoch.
- Note that we switch to the full loss after n=10 epochs.
- We observe an improvement in the validation clustering accuracy after around 50 epochs.
- The highest validation clustering accuracy is about 68%.
- The output of the algorithm is the stored model.

[Figure: The Standard Model]

checkpoint_dir
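Since the stored model is the main output, here is a minimal sketch of how one could locate and restore the latest checkpoint written during training (assumptions: the checkpoint files are still under checkpoint_dir on DBFS, and the same graph as in train_hvd is rebuilt before restoring):

import tensorflow as tf

# Find the most recent checkpoint written by the CheckpointSaverHook /
# MonitoredTrainingSession during training.
latest = tf.compat.v1.train.latest_checkpoint(checkpoint_dir)
print(latest)

# After rebuilding the same graph (placeholders, GAPNet model, mu, ...) as in
# train_hvd, the variables can be restored into a fresh session:
# saver = tf.compat.v1.train.Saver()
# with tf.compat.v1.Session() as sess:
#     saver.restore(sess, latest)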