Our implementation

We packaged our implementation to work around Spark's serialization issues. As a consequence, the code cells in the notebooks are very short and hard to follow in isolation, so we present the implementation here in markdown cells instead.

Optimization problem

We first define a helper function that projects points from the GP search space \([0,1]^D\) to the actual support of the function \(f\).

from botorch.utils.transforms import unnormalize

def eval_objective(x, fun):
    """This is a helper function we use to unnormalize a point from [0, 1]^D and evaluate it"""
    return fun(unnormalize(x, fun.bounds))
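For example (a hypothetical check using BoTorch's Ackley test function, whose default bounds are \([-32.768, 32.768]^D\)), unnormalize maps the unit cube onto the function's native domain:

import torch
from botorch.test_functions import Ackley

fun = Ackley(dim=3, negate=True)
print(unnormalize(torch.zeros(3), fun.bounds))  # lower bounds: tensor([-32.768, -32.768, -32.768])
print(unnormalize(torch.ones(3), fun.bounds))   # upper bounds: tensor([32.768, 32.768, 32.768])
print(eval_objective(torch.rand(3), fun))       # negated Ackley value at a random point in the domain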

We define a generic class for optimization problems.

from abc import ABC, abstractmethod
import torch
import numpy as np

class OptimizationProblem(ABC):
    """
    This is an abstract class for generic optimization problems we consider
    """

    @abstractmethod
    def __init__(self, dim: int):
        """
        implemented by subclasses
        """
        raise NotImplementedError()

    @abstractmethod
    def __call__(self, x: torch.Tensor):
        """
        implemented by subclasses
        """
        raise NotImplementedError()

    @abstractmethod
    def lb(self) -> np.ndarray:
        """
        returns the lower bound of the problem
        """
        raise NotImplementedError()

    @abstractmethod
    def ub(self) -> np.ndarray:
        """
        returns the upper bound of the problem
        """
        raise NotImplementedError()

    @abstractmethod
    def dim(self) -> int:
        """
        returns the dimensionality of the problem
        """
        raise NotImplementedError()

    @abstractmethod
    def name(self) -> str:
        """
        returns a human-readable name of the problem (used when saving results)
        """
        raise NotImplementedError()

We define the Ackley and Griewank functions as subclasses of OptimizationProblem:

from botorch.test_functions import Ackley as _Ackley, Griewank as _Griewank

class Ackley(OptimizationProblem):

    def __init__(self, dim: int):
        self._dim = dim
        self._ackley = _Ackley(dim=dim, negate=True)
        self._name = f'Ackley-{dim}'

    def __call__(self, x: torch.Tensor):
        return eval_objective(x, self._ackley)

    def lb(self) -> np.ndarray:
        return self._ackley.bounds[0]

    def ub(self) -> np.ndarray:
        return self._ackley.bounds[1]

    def dim(self) -> int:
        return self._dim

    def name(self) -> str:
        return self._name
      
      
class Griewank(OptimizationProblem):

    def __init__(self, dim: int):
        self._dim = dim
        self._griewank = _Griewank(dim=dim, negate=True)
        self._name = f'Griewank-{dim}'

    def __call__(self, x: torch.Tensor):
        return eval_objective(x, self._griewank)

    def lb(self) -> np.ndarray:
        return self._griewank.bounds[0]

    def ub(self) -> np.ndarray:
        return self._griewank.bounds[1]

    def dim(self) -> int:
        return self._dim

    def name(self) -> str:
        return self._name
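A quick, hypothetical sanity check of the wrapper interface:

problem = Ackley(dim=10)
print(problem.dim())               # 10
print(problem.lb(), problem.ub())  # the BoTorch Ackley bounds, -32.768 and 32.768 per dimension

x = torch.rand(problem.dim())      # a point in the GP search space [0, 1]^10
y = problem(x)                     # negated Ackley value at the corresponding point in the domain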

TuRBO state

TuRBO (trust-region Bayesian optimization) is an algorithm for scalable, high-dimensional BO that optimizes within local trust regions rather than globally. We first define the state of a TuRBO instance:

from dataclasses import dataclass
import math


@dataclass
class TurboState:
    dim: int # dimensionality of the search space
    batch_size: int # number of parallel function evaluations
    length: float = 0.8 # initial TR base length
    length_min: float = 0.5 ** 7 # min TR base length
    length_max: float = 1.6 # max TR base length
    failure_counter: int = 0 # number of times we did not make progress in optimizing the function
    failure_tolerance: int = float("nan")  # Note: Post-initialized, after more than failure_tolerance failures, we shrink the TR
    success_counter: int = 0 # number of times we made progress in optimizing the function
    success_tolerance: int = 10  # after that many successes, we increase the TR size
    best_value: float = -float("inf") # best value observed
    restart_triggered: bool = False # whether we want to restart this TR

    def __post_init__(self):
        # the failure tolerance increases with the dimensionality of the problem
        # and decreases with batch size as batches correspond to parallel
        # function evaluations
        self.failure_tolerance = math.ceil(
            max([4.0 / self.batch_size, float(self.dim) / self.batch_size])
        )
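For instance (hypothetical values), with dim=10 and batch_size=4 the post-init hook yields failure_tolerance = ceil(max(4/4, 10/4)) = 3:

state = TurboState(dim=10, batch_size=4)
print(state.failure_tolerance)  # 3: the trust region shrinks after 3 consecutive non-improving batches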

Depending on the \(y\)-values of the newly evaluated batch of points, we update the state:

def update_state(state, y_next):
    # if we made progress in optimizing the function
    if max(y_next) > state.best_value + 1e-3 * math.fabs(state.best_value):
        state.success_counter += 1
        state.failure_counter = 0
    else:
        state.success_counter = 0
        state.failure_counter += 1
  
    # after state.success_tolerance consecutive improvements
    if state.success_counter == state.success_tolerance:  # Expand trust region
        state.length = min(2.0 * state.length, state.length_max)
        state.success_counter = 0
    elif state.failure_counter == state.failure_tolerance:  # Shrink trust region
        state.length /= 2.0
        state.failure_counter = 0

    state.best_value = max(state.best_value, max(y_next).item())
    if state.length < state.length_min:
        state.restart_triggered = True
    return state
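To illustrate the update logic with hypothetical values: a batch that improves on the incumbent by more than 0.1% counts as a success, and failure_tolerance consecutive non-improving batches halve the trust region length:

state = TurboState(dim=10, batch_size=4, best_value=1.0)

# an improving batch: success_counter is incremented and best_value is updated to ~1.2
state = update_state(state, y_next=torch.tensor([0.5, 1.2, 0.9, 1.1]))

# after failure_tolerance (= 3) non-improving batches in a row, the base length is halved
for _ in range(state.failure_tolerance):
    state = update_state(state, y_next=torch.tensor([0.0, 0.1, 0.2, 0.3]))
print(state.length)  # 0.4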

Next, we define the optimize method of a TurboInstance, the class that runs a single trust region:

def optimize(self):
    # create a number of initial points from a scrambled Sobol sequence
    x_init = self.get_initial_points()
    # evaluate the function at the initial points
    y_init = torch.tensor([self.function(x) for x in x_init])

    # maintain tensors of all observations
    self.X = torch.cat((self.X, x_init), dim=0)
    self.y = torch.cat((self.y, y_init))

    # run until the trust region becomes too small
    while not self.state.restart_triggered:
        # standardize the function values
        train_y = (self.y - self.y.mean()) / self.y.std()

        # get the model for the training points (construct a fresh GP if no deep
        # kernel is used, otherwise reuse the globally trained deep kernel model)
        if not hasattr(self.model, 'feature_extractor'):
            model = self.model(self.X, train_y.unsqueeze(-1), **self.model_kwargs)
        else:
            model = self.model

        with gpytorch.settings.max_cholesky_size(float("inf")):
            # fit the model if we are not in deep kernel mode
            if not hasattr(self.model, 'feature_extractor'):
                mll = self.mll_opt(model.likelihood, model)
                fit_gpytorch_torch(mll, options={'disp': False})

            # generate a batch of candidate points within the trust region
            x_next = generate_batch(
                state=self.state,
                model=model,
                X=self.X,
                Y=train_y,
                batch_size=self.batch_size,
                n_candidates=self.n_candidates,
                num_restarts=self.num_restarts,
                raw_samples=self.raw_samples,
                acqf="ts",
            )

            # evaluate the function at the new points
            y_next = torch.tensor([self.function(x) for x in x_next])

            # update the state based on the new function values
            self.state = update_state(self.state, y_next)

            # append to the local observations
            self.X = torch.cat((self.X, x_next), dim=0)
            self.y = torch.cat((self.y, y_next), dim=0)
            print(
                f"{self.identifier}: {len(self.X)}) Best value: {self.state.best_value:.3}, TR length: {self.state.length:.3f}"
            )
    self.has_run = True
    return self.X, self.y

The initial points are simply drawn from a scrambled Sobol sequence:

from torch.quasirandom import SobolEngine

def get_initial_points(self):
    sobol = SobolEngine(dimension=self.dim, scramble=True, seed=self.seed)
    X_init = sobol.draw(n=self.n_init)
    return X_init
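The generate_batch helper called from optimize is not reproduced above. As a rough orientation, a minimal sketch of the Thompson-sampling variant (acqf="ts"), closely following the standard TuRBO recipe and assuming the surrogate exposes covar_module.base_kernel.lengthscale (true for a ScaleKernel(MaternKernel) GP; the grid-interpolation deep kernel below would need a different lookup), could look like this:

import torch
from torch.quasirandom import SobolEngine
from botorch.generation import MaxPosteriorSampling

def generate_batch(state, model, X, Y, batch_size, n_candidates=None,
                   num_restarts=10, raw_samples=512, acqf="ts"):
    # sketch: only the Thompson-sampling branch used above is shown;
    # num_restarts and raw_samples would only be needed by an EI-based variant
    assert acqf == "ts"
    dim = X.shape[-1]
    if n_candidates is None:
        n_candidates = min(5000, max(2000, 200 * dim))

    # center the trust region at the best observed point; scale its side lengths
    # by the (normalized) kernel lengthscales and clip to the unit cube
    x_center = X[Y.argmax(), :].clone()
    weights = model.covar_module.base_kernel.lengthscale.squeeze().detach()
    weights = weights / weights.mean()
    weights = weights / torch.prod(weights.pow(1.0 / len(weights)))
    tr_lb = torch.clamp(x_center - weights * state.length / 2.0, 0.0, 1.0)
    tr_ub = torch.clamp(x_center + weights * state.length / 2.0, 0.0, 1.0)

    # draw scrambled Sobol candidates inside the trust region
    sobol = SobolEngine(dim, scramble=True)
    pert = tr_lb + (tr_ub - tr_lb) * sobol.draw(n_candidates).to(X)

    # perturb only a random subset of dimensions of the center per candidate
    prob_perturb = min(20.0 / dim, 1.0)
    mask = torch.rand(n_candidates, dim) <= prob_perturb
    ind = torch.where(mask.sum(dim=1) == 0)[0]
    mask[ind, torch.randint(0, dim, size=(len(ind),))] = 1
    X_cand = x_center.expand(n_candidates, dim).clone()
    X_cand[mask] = pert[mask]

    # Thompson sampling: take the batch that maximizes a joint posterior sample
    thompson_sampling = MaxPosteriorSampling(model=model, replacement=False)
    with torch.no_grad():
        x_next = thompson_sampling(X_cand, num_samples=batch_size)
    return x_next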

Deep Kernel Model

Next, we define the deep kernel: a standard GP kernel preceded by a neural network that acts as a feature extractor. The network is trained jointly with the GP hyperparameters by maximizing the marginal likelihood.

import gpytorch
import torch
from botorch.models.gpytorch import GPyTorchModel
from gpytorch.distributions import MultivariateNormal
from gpytorch.kernels import MaternKernel, ScaleKernel, GridInterpolationKernel
from gpytorch.means import ConstantMean
from gpytorch.models import ExactGP
from gpytorch.utils.grid import ScaleToBounds
from torch.nn import Linear, ReLU


class FeatureExtractor(torch.nn.Sequential):
    '''
    The feature extractor for the inputs
    '''
    def __init__(self, data_dim, layer_depths=None):
        # layer_depths is currently unused; the architecture below is fixed
        super(FeatureExtractor, self).__init__()
        # a small MLP mapping D-dimensional inputs to a 2-dimensional feature space
        self.add_module('linear1', Linear(data_dim, 128))
        self.add_module('relu1', ReLU())
        self.add_module('linear2', Linear(128, 64))
        self.add_module('relu2', ReLU())
        self.add_module('linear3', Linear(64, 16))
        self.add_module('relu3', ReLU())
        self.add_module('linear4', Linear(16, 2))
 

class DeepKernelGPRegressor(GPyTorchModel, ExactGP):
    '''
    Custom GP model that first calls the feature extractor and passes the outputs to the actual kernel.
    '''

    # required by BoTorch's GPyTorchModel API (reports a single output dimension)
    _num_outputs = 1

    # Freeze everything but the last layer when training locally?
    def __init__(self, train_x, train_y, likelihood, architecture):
        super(DeepKernelGPRegressor, self).__init__(train_x, train_y, likelihood)
        
        self.mean_module = ConstantMean()
        self.covar_module = GridInterpolationKernel(
            ScaleKernel(MaternKernel(ard_num_dims=2)),
            num_dims=2, grid_size=100
        )
        self.feature_extractor = architecture

        # This module will scale the NN features so that they're nice values
        self.scale_to_bounds = ScaleToBounds(-1., 1.)

    def __call__(self, *args, **kwargs):
        # disable gpytorch's debug-mode input checks when the model is called
        with gpytorch.settings.debug(False):
            return super().__call__(*args, **kwargs)

    def forward(self, x):
        # We're first putting our data through a deep net (feature extractor)
        projected_x = self.feature_extractor(x)
        projected_x = self.scale_to_bounds(projected_x)  # Make the NN values "nice"

        mean_x = self.mean_module(projected_x)
        covar_x = self.covar_module(projected_x)
        return MultivariateNormal(mean_x, covar_x)
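As a small, purely illustrative example (random toy data, not from the project), the model can be constructed and evaluated as follows; the forward pass first maps the inputs to two features and then applies the SKI Matérn kernel:

from gpytorch.likelihoods import GaussianLikelihood

# hypothetical toy data in the unit cube
train_x = torch.rand(20, 10)
train_y = torch.randn(20)

likelihood = GaussianLikelihood()
feature_extractor = FeatureExtractor(data_dim=10)
model = DeepKernelGPRegressor(train_x, train_y, likelihood, feature_extractor)

model.train()
likelihood.train()
output = model(train_x)  # a MultivariateNormal over the 20 training inputs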

Scalable Optimizer

The ScalableOptimizer maintains multiple TurboInstances and runs them in parallel as a Spark job:

import os
from logging import info
from typing import Optional, Dict

import torch
import tqdm
from botorch.models import SingleTaskGP
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.mlls import ExactMarginalLogLikelihood
from pyspark.sql import SparkSession
from torch import Tensor

from scalable_gps.dkl_model import FeatureExtractor, DeepKernelGPRegressor
from scalable_gps.objective import OptimizationProblem
from scalable_gps.turbo_state import TurboInstance
from scalable_gps.utils import save


class ScalableOptimizer:

    def __init__(self,
                 objective: OptimizationProblem,
                 outer_iterations: int = 10,
                 num_parallel: int = 2,
                 num_total_iterations: int = -1,
                 batch_size: int = 2,
                 turbo_kwargs: Optional[Dict] = None,
                 use_dkl: bool = True,
                 name: str = 'TurBO-DKL',
                 save_path: str = os.getcwd(),
                 seed: int = 0
                 ):
        # create a Spark session if it doesn't exist yet
        self.spark = SparkSession.builder.getOrCreate()
        self.sc = self.spark.sparkContext
        self.batch_size = batch_size
        self.num_total_iterations = num_total_iterations
        self.num_parallel = num_parallel
        self.turbo_kwargs = turbo_kwargs if turbo_kwargs is not None else {}
        self.objective = objective
        self.dim = objective.dim()
        self.outer_iterations = outer_iterations
        self.name = name
        self.use_dkl = use_dkl
        self.save_path = save_path
        self.seed = seed

    def optimize(self):
        # maintain tensors of all observations from all TurboInstances
        x_global = torch.empty((0, self.dim))
        y_global = torch.empty(0)
        
        # we start with no deep kernel
        deep_kernel_model = None
        for i_outer in range(self.outer_iterations):
            # for each outer iteration, define a list of TurboInstances
            info(f"Starting outer iteration {i_outer + 1}")
            self.turbo_processes = [
                TurboInstance(
                    batch_size=self.batch_size,
                    function=self.objective,
                    model=deep_kernel_model if deep_kernel_model is not None else SingleTaskGP,
                    identifier=f"TR-{i}",
                    seed=int(f"{self.seed}{i+1}"))
                for i in range(self.num_parallel)
            ]
            # parallelize the list
            turbos = self.sc.parallelize(self.turbo_processes)
            # define the optimization
            res = turbos.map(lambda t: t.optimize())
            # and collect the data
            data = res.collect()
            # append to global observations
            x_aggregated = torch.cat([Tensor(proc_data[0])
                                      for proc_data in data], dim=0)
            y_aggregated = torch.cat([Tensor(proc_data[1])
                                      for proc_data in data], dim=0)
            x_global = torch.cat((x_global, x_aggregated), dim=0)
            y_global = torch.cat((y_global, y_aggregated), dim=0)
            y_global_normalized = (y_global - y_global.mean()) / y_global.std()

            if self.use_dkl:
                # train deep kernel
                deep_kernel_model = self._train_deepkernel(
                    x_global, y_global_normalized)
                    
        # save optimization results
        save(x_global, y_global, self.save_path, self.name, self.objective.name())

    def _train_deepkernel(self, X: Tensor, y: Tensor, num_iters: int = 100):
        '''
        Train a feature extractor (neural net) for the inputs
        '''
        likelihood = GaussianLikelihood()
        feature_extractor = FeatureExtractor(self.objective.dim())
        dkl_model = DeepKernelGPRegressor(X, y, likelihood, feature_extractor)

        optimizer = torch.optim.Adam([
            {'params': dkl_model.feature_extractor.parameters()},
            {'params': dkl_model.covar_module.parameters()},
            {'params': dkl_model.mean_module.parameters()},
            {'params': dkl_model.likelihood.parameters()},
        ], lr=0.01)
        mll = ExactMarginalLogLikelihood(likelihood, dkl_model)
        iterator = tqdm.tqdm(range(num_iters))
        # straightforward full-batch training with Adam; could be refined following work on overfitting in DKL
        for i in iterator:
            # Zero backprop gradients
            optimizer.zero_grad()
            # Get output from model
            output = dkl_model(X)
            # Calc loss and backprop derivatives
            loss = -mll(output, y)

            loss.backward()
            iterator.set_postfix(loss=loss.item())
            optimizer.step()
        return dkl_model
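Finally, a hypothetical driver (parameter values are illustrative, not the ones used in our experiments) ties everything together:

# optimize a 10-dimensional Ackley function with 4 parallel trust regions,
# refitting the deep kernel on the pooled data after every outer iteration
problem = Ackley(dim=10)
optimizer = ScalableOptimizer(
    objective=problem,
    outer_iterations=5,
    num_parallel=4,
    batch_size=4,
    use_dkl=True,
    name='TuRBO-DKL-Ackley10',
    seed=0,
)
optimizer.optimize()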