ScaDaMaLe Course site and book

Distributed ensembles

Amanda Olmin, Amirhossein Ahmadian and Jakob Lindqvist

Video presentation

Python version: python 3.7

Library dependencies:
- PySpark
- PyTorch
- toolz
- matplotlib

Introduction

In this project, we create a distributed ensemble of neural networks that we can train and make predictions with in a distributed fashion, and we also apply this model to the out-of-distribution detection problem [2] (detecting inputs that are highly dissimilar from the training data).

Why ensembles?

Ensembles of neural networks
- often have better predictive performance than single ensemble members [1]
- have been shown to provide reliable uncertainty estimates

The latter quality is beneficial in itself but is especially useful when it comes to tasks such as out-of-distribution detection, where a model’s uncertainty estimates can be used to determine if a sample is in-distribution or not. We demonstrate this in the experiments below.

Distributed ensembles

In Spark, it is common to distribute data over several worker nodes: the same function is executed on several nodes, each working on a different part of the data, and the results from the nodes are then communicated and aggregated into a final output. Similarly, we can train a neural network (a single ensemble member) in a distributed way by distributing the data that we use to train it. This can for example be done using the built-in MLP and MLPC classes in PySpark [3]. However, this approach requires continuous communication between nodes to update the model weights (possibly at every iteration), since every node keeps its own version of the weights. The approach therefore scales badly
- as the number of model parameters grows (more information to communicate between nodes)
- when the complexity of the training algorithm increases, e.g. if we wish to use a stochastic training algorithm

In this regard, the communication becomes a bottleneck. Asynchronous updating can reduce the amount of communication, but might also hurt model performance [4].

Since the ensemble members are independent models, they never need to communicate during the training phase. Training them in a way that forces the otherwise independent training processes to integrate or synchronize would therefore introduce unnecessary costs, for example because all communication has to pass through the driver node. The same holds for prediction: no communication is needed between ensemble members except at the very end, when their predictions are aggregated.

To avoid unnecessary communication, we distribute the ensemble members and train them on separate worker nodes such that we
- are able to train several ensemble members in parallel (limited by the number of nodes in our cluster) and independently
- avoid communication between worker nodes

To achieve this, we implement our own training processes below. In addition, we implement our own MLP class with the help of PyTorch. MLP objects and their training data are then distributed to worker nodes using Spark. We do this not only to avoid distributing the training data of a single member over several nodes, but also to package the ensemble members in a way that lets us send them between the driver and the worker nodes before and after training.
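As a minimal sketch of this pattern (with illustrative stand-in names, not the final API defined below), each element of an RDD holds one ensemble member together with its own training data, and a single map call trains all members in parallel without any communication between workers:

def train_one_member(init_params, data):
  # placeholder "training" step; the real Train function is defined further down in this notebook
  return {"params": init_params, "n_points": len(data)}

member_data_pairs = [({"w": 0.0}, [1, 2, 3]), ({"w": 1.0}, [4, 5, 6])]  # built on the driver
ensemble_rdd = sc.parallelize(member_data_pairs)                        # one element per member
trained = ensemble_rdd.map(lambda pair: train_one_member(*pair))        # runs on the workers
print(trained.collect())                                                # communicate only at the end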

from random import randrange
import random
from pathlib import Path
# External libs added to cluster
from pyspark.mllib.random import RandomRDDs
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.rdd import PipelinedRDD
from toolz.itertoolz import partition_all
from toolz.itertoolz import cons
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from matplotlib import pyplot as plt

Data

We introduce the functions that we use to load the data for the experiments that we conduct. We split the available training data between ensemble members using sampling with or without replacement. The number of training data points that we can distribute to each ensemble member is only limited by the memory available to each worker node.
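As a sketch of the sampling idea (the pipeline below instead hands each member a fixed-size chunk via get_partitioned_rdd), per-member training sets could be drawn from a data RDD with Spark's built-in sample transformation; data_rdd and the fraction here are placeholders:

def member_samples(data_rdd, n_models, fraction=0.2, with_replacement=True, seed=0):
  """Draw one (bootstrap) sample RDD per ensemble member."""
  return [data_rdd.sample(with_replacement, fraction, seed + i) for i in range(n_models)]

# Example usage (once a data RDD exists):
# samples = member_samples(data_train, n_models=5)
# print([s.count() for s in samples])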

TOY DATA

We create a function for generating data consisting of Gaussian clusters. The function takes as input user-defined means and variances for each cluster, the total number of observations, and a vector of intended class proportions. It also has an option to split the final RDD into train and test sets.

We will use this data later on to demonstrate our distributed ensembles framework as well as to generate out-of-distribution data for OOD detection.

def create_gaussian_RDD(means, variances, num_observations, class_proportions, train_test_split=False):
  """Create toy Gaussian classification data
  Let C := number of clusters/classes and P := number of data features

  Args:
    means (np.array[float]): mean vector of shape (C, P)
    variances (np.array[float]): vector of variances, shape (C, P)
    num_observations (scalar[int]): the total number of observations in the final data set
    class_proportions (np.array[float]): vector of class proportions, length C
    train_test_split: whether to split the data into train/test sets or not

  Returns:
    Gaussian data, RDD of tuples (list(features), int(label))
  """

  assert means.shape[0] == variances.shape[0]
  assert means.shape[1] == variances.shape[1]
  assert class_proportions.sum() == 1

  num_classes = means.shape[0]
  num_features = means.shape[1]

  data_rdd = sc.emptyRDD()
  for k in range(num_classes):

    # Generate standard normal data
    class_size = int(num_observations * class_proportions[k])
    class_rdd = RandomRDDs.normalVectorRDD(sc, numRows=class_size, numCols=num_features, numPartitions=1) #, seed=123)

    # Map to true distribution
    class_rdd_transformed = class_rdd.map(lambda v: means[k, :] + (variances[k, :]**0.5) * v)

    # Add labels
    class_rdd_w_label = class_rdd_transformed.map(lambda v: (v, k))

    data_rdd = data_rdd.union(class_rdd_w_label)

  # We will shuffle and repartition the data
  num_partitions = 10
  shuffled_rdd =  data_rdd.sortBy(lambda v: randrange(num_observations)).repartition(num_partitions)
  final_rdd = shuffled_rdd.map(tuple).map(lambda v: (list(v[0]), int(v[1])))

  if train_test_split:
    train_rdd, test_rdd = final_rdd.randomSplit(weights=[0.8, 0.2], seed=12)
    final_rdd = (train_rdd, test_rdd)

  return final_rdd                                   
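A small sanity check of the generator (the numbers below are arbitrary): three well-separated 2D clusters, of which we peek at a few rows.

toy_means = np.array([[0.0, 0.0], [5.0, 5.0], [-5.0, 5.0]])
toy_vars = 0.5 * np.ones((3, 2))
toy_rdd = create_gaussian_RDD(toy_means, toy_vars, num_observations=300, class_proportions=np.array([1/3, 1/3, 1/3]))
print(toy_rdd.take(3))  # each element is (list_of_features, integer_label)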

FIRE WALL DATA

We will also consider some real data. The dataset that we use consists of traffic records from a firewall log. We have accessed it through the UCI Machine Learning Repository [5]: https://archive.ics.uci.edu/ml/datasets/Internet+Firewall+Data.

  • Number of data points: 65,532.

  • Number of features: 11 (all numerical).

  • Number of classes: 4 (allow/deny/drop/reset both).

def load_firewall_data(train_test_split=False,file_location="/FileStore/shared_uploads/amanda.olmin@liu.se/fire_wall_data.csv"):
  """Load and preprocess firewall data
  Args:
     file_location: file location from which to load the data
     train_test_split: whether to split the data into train/test sets or not

  Returns:
     Firewall data, RDD of tuples (list(features), int(label))
  """

  # File location and type
  file_type = "csv"

  # CSV options
  infer_schema = "true"
  first_row_is_header = "true"
  delimiter = ","

  # Load the data from file
  df = spark.read.format(file_type) \
      .option("inferSchema", infer_schema) \
      .option("header", first_row_is_header) \
      .option("sep", delimiter) \
      .load(file_location)

  # Preprocess data
  col_num = ["Source Port", "Destination Port", "NAT Source Port", "NAT Destination Port", "Bytes", "Bytes Sent", "Bytes Received", "Packets", "Elapsed Time (sec)", "pkts_sent", "pkts_received"]

  # Index qualitative variable
  indexer = StringIndexer(inputCol = "Action", outputCol = "label")

  # Scale numerical features
  va = VectorAssembler(inputCols = col_num, outputCol = "numerical_features")
  scaler = StandardScaler(inputCol = "numerical_features", outputCol = "features")

  # Apply pipeline
  pipeline = Pipeline(stages=[indexer, va, scaler])
  final_df = pipeline.fit(df).transform(df).select("features", "label")

  # Convert to RDD
  final_rdd = final_df.rdd.map(tuple).map(lambda v: (list(v[0]), int(v[1])))

  if train_test_split:
    train_rdd, test_rdd = final_rdd.randomSplit(weights=[0.8, 0.2], seed=12)
    final_rdd = (train_rdd, test_rdd)

  return final_rdd

RDD partition

Below, we provide a function that partitions an RDD. We will use it to distribute data between ensemble members.

def get_partitioned_rdd(input_rdd, partition_size=1000):
  """Partition RDD

  Args:
    input_rdd: RDD to be partitioned

  Returns:
    Partitioned RDD
  """

  return input_rdd.mapPartitions(lambda partition: partition_all(partition_size, partition))
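A quick illustration of what get_partitioned_rdd returns (a toy RDD of 25 integers, chunked into tuples of at most 10 elements, which is exactly the unit of data we later hand to one ensemble member):

small_rdd = sc.parallelize(range(25), numSlices=1)
chunked = get_partitioned_rdd(small_rdd, partition_size=10)
print([len(chunk) for chunk in chunked.collect()])  # e.g. [10, 10, 5]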

PyTorch Model

To implement the ensemble members, we first write an ordinary feedforward (MLP) neural network class using PyTorch, with a softmax output layer and tanh activation functions. The number of layers and the number of neurons in each layer are passed as an argument to the constructor of this class. Moreover, any instance of this network class (parameters and structure) can easily be stored in and loaded from a state dictionary (state_dict) object.

#Feedforward network for classification
class MLP(nn.Module):

  def __init__(self,shape):
    #shape: number of neurons in each layer (including the input and output layers)
    super(MLP,self).__init__()

    self.units=nn.ModuleList()
    for i in range(len(shape)-1):
      self.units.append(nn.Linear(shape[i],shape[i+1]))

    self._shape=shape
    self._nlayers=len(shape)

  def forward(self,x):

    y=x

    for i,layer in enumerate(self.units):
      if i<self._nlayers-2:
        y=nn.functional.tanh(layer(y))
      else:
        y=nn.functional.softmax(layer(y),dim=1)

    return y

  #constructing an instance of this class based on a state dictionary (network parameters)
  @staticmethod
  def from_state_dict(state_dict):
    net_shape = MLP.shape_from_state_dict(state_dict)
    net=MLP(net_shape)
    net.load_state_dict(state_dict)
    return net

  @staticmethod
  def shape_from_state_dict(state_dict):
    """Infer MLP layer shapes from state_dict"""
    iter_ = iter(state_dict.items())
    _, input_size = next(iter_)
    bias_tensors = filter(lambda key_val: key_val[0].find("bias") != -1, iter_)
    shapes = map(lambda key_val: key_val[1].size(0), bias_tensors)
    return list(cons(input_size.size(1), shapes))
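A quick check of the state_dict round trip: build a small MLP, read off its shape from the state dictionary alone, and reconstruct an identical network from it (the layer sizes are arbitrary).

net = MLP([2, 4, 3])                        # 2 inputs, one hidden layer of 4 neurons, 3 classes
params = net.state_dict()
print(MLP.shape_from_state_dict(params))    # -> [2, 4, 3]
net_copy = MLP.from_state_dict(params)
x = torch.randn(5, 2)
print(torch.allclose(net(x), net_copy(x)))  # -> True: same parameters give the same outputs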

Functions for training and testing networks

Here we have some functions that are used to train/test each individual network in the ensemble. The Train function takes the initial weights of a network, trains it on a set of input-target data using stochastic gradient optimization and the cross-entropy loss, and returns the state dictionary of the trained network. PyTorch's backpropagation and optimization tools are used to implement this function as usual. The Predict function simply takes the state dictionary corresponding to a network as well as a data point (or a batch of data), and returns the output (probabilities) of the network at that point.

We note that Spark can automatically distribute these functions to the worker nodes, so writing them for a distributed ensemble is essentially no different from writing them for a local setup.

#utility class for pytorch data loader
class DataSet(torch.utils.data.Dataset):
  def __init__(self, x, y):
    self.x = x
    self.y = y

  def __len__(self):
    return self.x.shape[0]

  def __getitem__(self, ind):
    x = self.x[ind]
    y = self.y[ind]

    return x, y

#The main training function (is run on worker nodes)
def Train(net_params,x,y):
  #net_params: initial parameters of the feedforward network (state dictionary)
  #x,y: training data (pytorch tensors)

  n_epochs=100
  batchsize=10

  net=MLP.from_state_dict(net_params)

  train_data = DataSet(x, y)
  dataloader = torch.utils.data.DataLoader(train_data, batch_size=batchsize)

  opt=optim.Adam(net.parameters())
  loss=nn.CrossEntropyLoss()

  for i in range(n_epochs):
    for batch in dataloader:

      opt.zero_grad()

      xb,yb=batch

      yhat=net(xb)
      err=loss(yhat,yb)
      err.backward()

      opt.step()

  err=loss(net(x),y)
  lossval=float(err.detach().numpy())

  #returns parameters of the trained network and loss
  return (net.state_dict(),lossval)

#Get the output of a feedforward network given an input tensor
def Predict(net_params, x):
  #net_params: parameters (state dictionary) of the network
  #x: input (pytorch tensor)
  net = MLP.from_state_dict(net_params)
  net.eval()
  return net(x)

#Reshaping and converting the tuples stored in a dataset RDD into input and target tensors
def Totensor(d):
  #d: the dataset (list of tuples)

  x=[v[0] for v in d]
  y=[v[1] for v in d]
  x=torch.tensor(x,dtype=torch.float)
  y=torch.tensor(y,dtype=torch.long)

  return (x,y)

def make_prediction(state_dict, x):
  return Predict(state_dict, x)
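Before wiring Train and Predict into the distributed pipeline, here is a small local (single-machine) sanity check on a tiny synthetic two-class problem; the data and network shape are arbitrary.

x_small = torch.randn(40, 2)
y_small = (x_small[:, 0] > 0).long()              # a trivially learnable 2-class problem
init_params = MLP([2, 4, 2]).state_dict()
trained_params, train_loss = Train(init_params, x_small, y_small)
probs = Predict(trained_params, x_small)
print(train_loss, (probs.argmax(1) == y_small).float().mean().item())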

Creating an ensemble of networks, and training them in parallel

We now use the class and functions defined above to create an ensemble of feedforward neural networks and train it in a distributed fashion, where each network is trained on a single worker independently of the others. First, several networks are initialized using the MLP class, with a random number of hidden layers and neurons and random initial weights. This randomness helps to increase the diversity of the ensemble (without it, the outputs of the ensemble members could become correlated with each other).

As mentioned before, the training data is partitioned into equal-sized parts, and each network in the ensemble is assigned one part. Since the dataset is assumed to be an RDD (so that it may be huge), an iterator object is needed that collects one part of the data RDD (transfers it from the cluster to the driver node) per call. Note that we implicitly assume that each part of the data (but not the whole dataset) fits into the memory of a single machine.

After constructing the network object and loading the data for each member of the ensemble, the state dictionary of the network and its corresponding training data are packed into a tuple and appended to a list. The list of state_dict/data tuples is then parallelized to obtain a Spark RDD. We found it difficult to put the PyTorch network objects directly into an RDD, apparently because Spark does not know by default how to serialize these objects and transfer them between nodes. Therefore, we use the state dictionary instead, which contains all the necessary information about a network.

Finally, the network training function (Train defined above) is applied to each element of the model/data RDD, in the form of a map operation. This tells Spark to run the function on each element in parallel (on worker machines) independently.

def train_ensemble(n_models, inputdims, nclasses, max_layers, min_neurons, max_neurons, data_iterator):
  """Constructing and training a distributed ensemble of feedforward networks

    Args:
      n_models: number of ensemble members
      inputdims: number of feature dimensions
      nclasses: number of classes
      max_layers: maximum allowed number of hidden layers for the networks
      min_neurons,max_neurons: the valid range for the number of neurons in each hidden layer
      data_iterator: a Python iterator over the parts of the training data (one part per ensemble member)

    Returns: a list of state dictionaries of the trained networks
  """

  # initialization
  model_data=[] # pairs of model parameters and their training data
  for i in range(n_models):
    # pick random number of hidden layers and neurons for each network
    nhidden=random.randint(1, max_layers)
    shape=[inputdims]
    for k in range(nhidden):
      shape.append(random.randint(min_neurons, max_neurons))
    shape.append(nclasses)

    net=MLP(shape)

    #fetch the next part of data
    d=next(data_iterator)
    x=d[0]
    y=d[1]

    model_data.append((net.state_dict(),x,y))

  # distribute the array
  model_data_par= sc.parallelize(model_data)
  # execute the train function on the worker nodes
  models_trained = model_data_par.map(lambda t: Train(*t))

  #transfer the trained models and loss values to the driver
  models_trained=models_trained.collect()

  #print the training loss values
  print("training losses:")
  print([v[1] for v in models_trained])

  # return the state dicts
  return [v[0] for v in models_trained]

Utility functions for saving and loading the ensemble model from the disk

def save_models_distr(models, dir_, model_names=None):
  dir_ = Path(dir_)
  dir_.mkdir(exist_ok=True, parents=True)

  if model_names is None:
    model_names = [f"m{idx}.pt" for idx in range(0, models.count())]
  assert len(model_names) == models.count()
  model_paths = [dir_ / model_name for model_name in model_names]
  model_paths = sc.parallelize(model_paths)
  models.zip(model_paths).foreach(lambda dict_and_path: torch.save(*dict_and_path))

def save_models(models, dir_, model_names=None):
  dir_ = Path(dir_)
  dir_.mkdir(exist_ok=True, parents=True)

  if model_names is None:
    model_names = [f"m{idx}.pt" for idx in range(0, len(models))]
  assert len(model_names) == len(models)
  model_paths = [dir_ / model_name for model_name in model_names]
  for state_dict, path in zip(models, model_paths):
    torch.save(state_dict, path)

def load_models(model_names, dir_):
  dir_ = Path(dir_)
  model_paths = [dir_ / model_name for model_name in model_names]
  state_dicts = [torch.load(path) for path in model_paths]
  return sc.parallelize(state_dicts)
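A small round-trip example (the directory name and the two freshly initialized members are arbitrary): save each member to its own .pt file and load them back as an RDD with one state_dict per member.

demo_models = [MLP([2, 3, 2]).state_dict(), MLP([2, 5, 2]).state_dict()]
demo_dir = Path("saved_models/demo")
save_models(demo_models, demo_dir)
demo_names = [f"m{idx}.pt" for idx in range(len(demo_models))]
reloaded = load_models(demo_names, demo_dir)   # RDD of state_dicts
print([MLP.shape_from_state_dict(sd) for sd in reloaded.collect()])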

Distributed ensembles prediction API

From the training process we get a distributed iterator models over the trained models. (NB: the train_ensemble function above actually collects the trained models for convenience.) Internally, this is an iterator over PyTorch state_dicts holding the parameters of each model.

There are different ways in which we can do predictions:

  • Distributed predictions with ens_preds(models, test_x), which maps the combined model and test data to predictions for each data point. This iterator can be collected to a list of the predictions for each ensemble member, or further processed in a distributed and functional manner. This is the most flexible variant since it preserves the prediction of every member on every datapoint. It is also the most expensive (if we do collect all the data).

  • Reduced/aggregated predictions with ens_preds_reduced(models, test_x, red_fn). Working with an ensemble, we are often interested in some aggregate of the members' predictions, e.g. the average prediction. For this we provide a reducing version of ens_preds where the user only needs to supply the reduce function red_fn, describing how to combine the predictions of two ensemble members (see the note after this list on how this pairwise reduction relates to the exact arithmetic mean). For instance, to get an averaged probability vector of a classifier ensemble for every data point you would use:

    avg_prob_vecs = ens_preds_reduced(models, x, lambda x, y: (x+y)/2)
    

    Internally, this simply calls .reduce(red_fn) on the iterator returned from ens_preds. This is merely a convenience function.

  • Metrics per ensemble member. If the number of test samples is large, we will collect a lot of predictions over the cluster. If we know that we only want an aggregate metric for each member across the whole test data, we use the ens_metrics method for aggregation on the worker nodes.

    avg_acc_per_member = ens_metrics(models, test_input, test_true_labels, <list of metric functions>)
    

    Note that each metric function must be of the form f: R^(N x C) x R^N --> T, where C is the number of classes and T is a generic output type.
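A note on the pairwise reduction used above: reducing with lambda x, y: (x + y) / 2 halves the running result at every step, so for ensembles of more than two members it generally yields an unevenly weighted combination rather than the exact arithmetic mean (unless the reduction tree happens to pair members evenly). A small illustration, plus a sketch of the exact mean under the assumption that models is a list of state_dicts and test_x a tensor as above:

from functools import reduce
# Pairwise (x+y)/2 over three equal-weight "members" does not return their mean:
print(reduce(lambda x, y: (x + y) / 2, [1.0, 1.0, 4.0]))  # 2.5; left-fold weights are 1/4, 1/4, 1/2
print(sum([1.0, 1.0, 4.0]) / 3)                           # 2.0; the exact arithmetic mean
# Exact mean with the distributed API: sum the predictions, then divide by the ensemble size on the driver.
# exact_avg_prob_vecs = ens_preds(models, test_x).reduce(lambda x, y: x + y) / len(models)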

def ens_preds(models, test_x):
  """Distributed ensemble predictions
  Takes a set of models and test data and makes distributed predictions
  Let N := number of data points and D_x := the dimension of a single datapoint x

  Args:
    models (list[state_dict]): set of models represented as a list (state_dict, shape)
    test_x (torch.Tensor): Tensor of size (N, D_x)

  Returns:
    Distributed iterator over the predictions. E.g. an iterator over probability vectors in the case of a classifier ens.
  """
  pred_iter = _pred_models_iter(models, test_x)
  return pred_iter.map(lambda t: Predict(*t))

def ens_preds_reduced(models, test_x, red_fn):
  """Reduced/aggregated ensemble predictions
  Takes a set of models and test data and makes distributed predictions and reduces them with a provided `red_fn`
  Let N := number of data points and D_x := the dimension of a single datapoint x

  Args:
    models (list[state_dict]): set of models represented as a list (state_dict, shape)
    test_x (torch.Tensor): Tensor of size (N, D_x)
    red_fn function: f: R^D_x x R^D_x --> R^D_x

  Returns:
    Single reduced/aggregated prediction of the whole ensemble
  """
  return ens_preds(models, test_x).reduce(red_fn)

def ens_metrics(models, test_x, test_y, metrics):
  """Distributed ensemble metrics
  Takes a set of models and test data, predicts probability vectors and calculates the provided metrics
  given true labels `test_y`
  Let N := number of data points and D_x := the dimension of a single datapoint x

  Args:
    models (list[state_dict]): set of models represented as a list (state_dict, shape)
    test_x (torch.Tensor): Tensor of size (N, D_x)
    test_y (torch.Tensor): Tensor of size (N). NB: hard labels
    metrics (list[functions]): List of functions, each of the form f: R^(N x C) x R^N --> T, where C is the number of classes and T is a generic output type.
  """
  return ens_preds(models, test_x).map(lambda prob_vecs: [metric(prob_vecs, test_y) for metric in metrics])


def _pred_models_iter(models, test_x):
  """Helper function to generate a distributed iterator over models and test data
  NB: the same `test_x` is given to all elements in the iterator

  Args:
    models (list[state_dict]): set of models represented as a list (state_dict, shape)
    test_x (torch.Tensor): Tensor of size (N, D_x)
  """
  if isinstance(models, PipelinedRDD):
    return models.map(lambda model: (model, test_x))
  elif isinstance(models, list):
    models_and_data = [(params, test_x) for params in models]
    return sc.parallelize(models_and_data)
  else:
    raise TypeError("'models' must be an RDD or a list")

def avg_accuracy(prob_vecs, labels):
  """Example metrics function: average accuracy
  Let N := number of data points and C := the number of classes

  Args:
    prob_vecs (torch.Tensor): Tensor of size (N, C)
    labels (torch.Tensor): Tensor of size (N), hard labels, with classes corresponding to indices 0, ..., C-1

  Returns:
    torch.Tensor: Tensor of size (N), average accuracy over all datapoints.
  """
  hard_preds = torch.argmax(prob_vecs, 1)
  return (hard_preds == labels).float().mean()

def entropy(prob_vecs):
  return - (prob_vecs * torch.log(prob_vecs)).sum(1)

def avg_entropy(prob_vec_1, prob_vec_2):
  e_1 = entropy(prob_vec_1)
  e_2 = entropy(prob_vec_2)
  return (e_1 + e_2) / 2
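A quick sanity check of the entropy measure on hand-picked probability vectors: a uniform prediction has maximal entropy, while a confident (peaked) prediction has entropy close to zero.

uniform_pred = torch.tensor([[0.25, 0.25, 0.25, 0.25]])
confident_pred = torch.tensor([[0.97, 0.01, 0.01, 0.01]])
print(entropy(uniform_pred))    # ~1.39 = ln(4), maximal for 4 classes
print(entropy(confident_pred))  # ~0.17, close to zero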

Application example: Distributed predictions

Let's first demonstrate our distributed ensembles with a simple toy example. We'll create Gaussian toy data with three slightly overlapping clusters:

means = np.array([(0, 0), (1,0), (1, 1)])
variances = 0.1 * np.ones((3, 2))
num_observations = 5000
class_proportions = np.array([1/3, 1/3, 1/3])
data_train, data_test = create_gaussian_RDD(means, variances, num_observations, class_proportions, train_test_split=True)

Now we'll create a classifier ensemble, train it in a distributed fashion and save it to file. Saving is not strictly necessary; we can in fact make predictions with the trained ensemble without ever collecting it from the worker nodes, but in most use cases it is convenient to keep the trained ensemble on disk.

data_iterator=get_partitioned_rdd(data_train).map(Totensor).toLocalIterator()
n_models=5 # ensemble size
inputdims=2 # features dimensions
nclasses=3 # number of classes
max_layers=2
min_neurons=2
max_neurons=5

models_trained = train_ensemble(n_models, inputdims, nclasses, max_layers, min_neurons, max_neurons, data_iterator)
saved_models_dir = Path("saved_models/gaussian")
save_models(models_trained, saved_models_dir)
training losses:
[0.6597320437431335, 0.6314507126808167, 0.6506103277206421, 0.6424266695976257, 0.657072901725769]

Making distributed predictions

With the trained ensemble we can make predictions and calculate metrics, all in a distributed manner.

test_xx, test_yy = Totensor(data_test.collect())
model_names = [f"m{idx}.pt" for idx in range(n_models)]
models = load_models(model_names, saved_models_dir).collect()
avg_prob_vecs = ens_preds_reduced(models, test_xx, lambda x, y: (x+y)/2) # (A single) Average prob. vec for all data points.
avg_acc = ens_metrics(models, test_xx, test_yy, [avg_accuracy]).collect() # Average acc. for each ens. over all data points

print(f"Average accuracy for each ensemble member: {[acc[0].item() for acc in avg_acc]}")
print(f"Average accuracy for the whole ensemble: {avg_accuracy(avg_prob_vecs, test_yy).item()}")
Average accuracy for each ensemble member: [0.9240759015083313, 0.9240759015083313, 0.9160839319229126, 0.9110888838768005, 0.9220778942108154]
Average accuracy for the whole ensemble: 0.9210789203643799

We can also make use of the uncertainty description provided by the ensemble. We'll plot the test data with each point coloured by the predicted class distribution, which illustrates certain predictions with distinct colours and uncertain ones with muddied colours.

preds = avg_prob_vecs.detach().numpy()
hard_preds = avg_prob_vecs.argmax(1).detach().numpy()
every_nth = 5
train_xx, train_yy = Totensor(data_train.collect())

(fig, (ax_1, ax_2)) = plt.subplots(1, 2)
# For the train data we use the true labels to simulate a completely certain prediction.
color_map = {0: [1, 0 ,0], 1: [0, 1, 0], 2: [0, 0, 1]}
ax_1.scatter(train_xx[:, 0], train_xx[:, 1], c=[color_map[class_.item()] for class_ in train_yy], label="Train")
ax_2.scatter(test_xx[::every_nth, 0], test_xx[::every_nth, 1], c=preds[::every_nth], label="Test")
ax_1.set_title("Train")
ax_2.set_title("Test")
plt.show()

Application example: Out of distribution detection

Our distributed ensemble can be used for out-of-distribution (OOD) detection. A simple approach is to measure the entropy of the combined ensemble prediction; high entropy signals data that is unlike anything seen in the training distribution.

"Real world" out of distribution data can be hard to come by, but a typical example would be images in different contexts. E.g. scenic vistas or pathology scans may share the same feature space but have very different distribution. For the data we have collected, no such OOD set exists, so we will showcase it with an OOD set of gaussian noise. Of course, noise that is very far from the in distribution (ID) data will saturate the classifiers softmax for one element, actually yielding very confident, low entropy, nonsense predictions.

Regardless, let's see how to do this with the distributed ensemble. First, we train the ensemble and, as before, save the trained parameters to file.

data_train, data_test = load_firewall_data(True)
data_iterator=get_partitioned_rdd(data_train).map(Totensor).toLocalIterator()

n_models=10
models_trained=train_ensemble(n_models,
                              inputdims=11,
                              nclasses=4,
                              max_layers=4,
                              min_neurons=5,
                              max_neurons=15,
                              data_iterator=data_iterator)
saved_models_dir = Path("saved_models/firewall")
save_models(models_trained, saved_models_dir)
training losses:
[0.7487057447433472, 0.750811755657196, 0.7578539252281189, 0.7506877183914185, 0.7623474597930908, 0.7521470785140991, 0.7517384886741638, 0.7481321692466736, 0.750828742980957, 0.7547193765640259]
def gen_ood_data(test_x, num_samples):
  # Generate an OOD set of Gaussian noise with a random mean and a (scaled-up) random variance.
  # Note: num_samples is currently unused; the OOD set is given as many points as test_x has.
  num_test_samples, dim_x = test_x.size()
  random_mean = np.random.rand(dim_x).reshape(1, dim_x)
  random_cov = np.random.rand(dim_x).reshape(1, dim_x) * 10
  ood_x, _ = Totensor(create_gaussian_RDD(random_mean, random_cov, num_test_samples, np.array([1.0]), train_test_split=False).collect())
  return ood_x


data = data_test.collect()
batch_size = -1  # use (all but the last of) the test points; set to a positive value for a smaller batch
batch = data[0:batch_size]
test_xx, test_yy = Totensor(batch)
ood_x = gen_ood_data(test_xx, batch_size)  # the OOD set gets as many points as the ID test batch

model_names = [f"m{idx}.pt" for idx in range(n_models)]  # rebuild the name list: n_models is now 10
models_p = load_models(model_names, saved_models_dir).collect()

# We can either calculate the average entropy of the ensemble members
avg_entropy_id = ens_preds(models_p, test_xx).map(entropy).reduce(lambda x, y: (x+y)/2).detach().numpy()
avg_entropy_ood = ens_preds(models_p, ood_x).map(entropy).reduce(lambda x, y: (x+y)/2).detach().numpy()

# ... or the entropy of the average ensemble prediction.
entropy_avg_id = entropy(ens_preds_reduced(models_p, test_xx, lambda x, y: (x+y)/2)).detach().numpy()
entropy_avg_ood = entropy(ens_preds_reduced(models_p, ood_x, lambda x, y: (x+y)/2)).detach().numpy()

# Set entropy measure
entropy_id = avg_entropy_id
entropy_ood = avg_entropy_ood

Comparison of the entropy of the ensemble classifier on in-distribution and OOD data

def entropy_hist(id_, ood, n_bins, upper_x_bound):
  (fig, (ax_1, ax_2)) = plt.subplots(2, 1)
  _plot_hist(ax_1, id_, n_bins, "ID", "b", upper_x_bound)
  _plot_hist(ax_2, ood, n_bins, "OOD", "r", upper_x_bound)
  fig.suptitle("Entropy histogram")
  ax_2.set_xlabel("entropy")
  plt.show()

def _plot_hist(ax, counts, n_bins, label, color, upper_x_bound):
    ax.hist(counts, bins=n_bins, label=label, color=color, density=True)
    ax.set_xbound(lower = 0.0, upper = upper_x_bound)
    ax.set_ylabel("rel freq")
    ax.legend()

n_bins = 100
entropy_bound = 0.15
entropy_hist(entropy_id, entropy_ood, n_bins, entropy_bound)

Evaluation of the OOD detection in terms of ROC curve and area under this curve (AUROC)

def is_ood(entropies, cut_off_entropy):
  return entropies > cut_off_entropy

def fpr_and_tpr(id_, ood, res):
  max_entropy = max(id_.max(), ood.max())
  thresholds = np.arange(0.0, max_entropy, max_entropy / res)
  roc = np.array([(fpr(id_, th), tpr(ood, th)) for th in thresholds])
  roc = roc[roc[:,0].argsort()]
  fprs, tprs = (roc[:, 0], roc[:, 1])
  return fprs, tprs

def fpr(id_, th):
  id_pred = is_ood(id_, th)
  fp = id_pred.sum()
  tn = id_pred.shape[0] - fp
  return fp / (tn + fp)

def tpr(ood, th):
  ood_pred = is_ood(ood, th)
  tp = ood_pred.sum()
  fn = ood_pred.shape[0] - tp
  return tp / (tp + fn)

fprs, tprs = fpr_and_tpr(avg_entropy_id, avg_entropy_ood, res = 100)
(fig, ax) = plt.subplots()

ax.plot(fprs, tprs)
ax.set_xlabel("FPR")
ax.set_ylabel("TPR")
ax.set_title("ROC")

print(f"AUROC: {np.trapz(tprs, fprs)}")
AUROC: 0.6480554090292237
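As an optional cross-check, a value close to the AUROC computed above (up to the threshold resolution used in fpr_and_tpr) can be obtained with scikit-learn's roc_auc_score, assuming scikit-learn happens to be available on the cluster (it is not among the dependencies listed at the top of this notebook); the entropy is used directly as the OOD score.

try:
  from sklearn.metrics import roc_auc_score
  scores = np.concatenate([entropy_id, entropy_ood])                               # entropy as the OOD score
  labels = np.concatenate([np.zeros_like(entropy_id), np.ones_like(entropy_ood)])  # 1 = OOD
  print("sklearn AUROC:", roc_auc_score(labels, scores))
except ImportError:
  print("scikit-learn not installed; skipping the cross-check")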

References

[1] Lakshminarayanan, B., Pritzel, A., & Blundell, C. (2017). Simple and scalable predictive uncertainty estimation using deep ensembles. In Advances in neural information processing systems (pp. 6402-6413).

[2] Ovadia, Y., Fertig, E., Ren, J., Nado, Z., Sculley, D., Nowozin, S., ... & Snoek, J. (2019). Can you trust your model's uncertainty? Evaluating predictive uncertainty under dataset shift. In Advances in Neural Information Processing Systems (pp. 13991-14002).

[3] Apache Spark. (2021, January 11). Classification and Regression. https://spark.apache.org/docs/latest/ml-classification-regression.html

[4] Chen, J., Pan, X., Monga, R., Bengio, S., & Jozefowicz, R. (2016). Revisiting distributed synchronous SGD. arXiv preprint arXiv:1604.00981.

[5] Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.