Our implementation
We packaged our implementation as a Python module to work around Spark's serialization issues. As a result, the code in the notebooks themselves is very short and not self-explanatory on its own, so we present the implementation from the package in markdown cells instead.
Optimization problem
We first define a helper function that projects points from the GP search space \([0,1]^D\) to the actual support of the function \(f\).
from botorch.utils.transforms import unnormalize
def eval_objective(x, fun):
"""This is a helper function we use to unnormalize a point from [0, 1]^D and evaluate it"""
return fun(unnormalize(x, fun.bounds))
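To make the projection concrete, here is a small illustrative sketch (not part of the package) showing how unnormalize maps the unit cube to the bounds of a botorch test function:
import torch
from botorch.test_functions import Ackley as _Ackley
from botorch.utils.transforms import unnormalize

fun = _Ackley(dim=2, negate=True)
print(fun.bounds)                                          # tensor([[-32.768, -32.768], [32.768, 32.768]])
print(unnormalize(torch.tensor([0.0, 1.0]), fun.bounds))   # tensor([-32.768, 32.768])
print(eval_objective(torch.tensor([0.5, 0.5]), fun))       # negated Ackley at the origin, i.e. roughly 0.0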
We define a generic class for optimization problems.
from abc import ABC, abstractmethod
import torch
import numpy as np
class OptimizationProblem(ABC):
"""
This is an abstract class for generic optimization problems we consider
"""
@abstractmethod
def __init__(self, dim: int):
"""
implemented by subclasses
"""
raise NotImplementedError()
@abstractmethod
def __call__(self, x: torch.Tensor):
"""
implemented by subclasses
"""
raise NotImplementedError()
@abstractmethod
def lb(self) -> np.ndarray:
"""
returns the lower bound of the problem
"""
raise NotImplementedError()
@abstractmethod
def ub(self) -> np.ndarray:
"""
returns the upper bound of the problem
"""
raise NotImplementedError()
@abstractmethod
def dim(self) -> int:
"""
returns the dimensionality of the problem
"""
raise NotImplementedError()
We define the Ackley and Griewank functions as subclasses of OptimizationProblem:
from botorch.test_functions import Ackley as _Ackley, Griewank as _Griewank
class Ackley(OptimizationProblem):
def __init__(self, dim: int):
self._dim = dim
self._ackley = _Ackley(dim=dim, negate=True)
self._name = f'Ackley-{dim}'
def __call__(self, x: torch.Tensor):
return eval_objective(x, self._ackley)
def lb(self) -> np.ndarray:
return self._ackley.bounds[0]
def ub(self) -> np.ndarray:
return self._ackley.bounds[1]
def dim(self) -> int:
return self._dim
def name(self) -> str:
return self._name
class Griewank(OptimizationProblem):
def __init__(self, dim: int):
self._dim = dim
self._griewank = _Griewank(dim=dim, negate=True)
self._name = f'Griewank-{dim}'
def __call__(self, x: torch.Tensor):
return eval_objective(x, self._griewank)
def lb(self) -> np.ndarray:
return self._griewank.bounds[0]
def ub(self) -> np.ndarray:
return self._griewank.bounds[1]
def dim(self) -> int:
return self._dim
def name(self) -> str:
return self._name
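As a quick usage sketch (for illustration only), the wrapped problems expose the bounds, the dimensionality, and the (negated) function value:
import torch

problem = Ackley(dim=10)
print(problem.dim())                 # 10
print(problem.lb(), problem.ub())    # per-dimension bounds of the underlying botorch function
x = torch.rand(problem.dim())        # a point in the GP search space [0, 1]^10
print(problem(x))                    # negated Ackley value at the corresponding point in the original domain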
TuRBO state
TuRBO (Trust Region Bayesian Optimization) is an algorithm for scalable, high-dimensional Bayesian optimization. We first define the state of a TuRBO instance:
import math
from dataclasses import dataclass
@dataclass
class TurboState:
dim: int # dimensionality of the search space
batch_size: int # number of parallel function evaluations
length: float = 0.8 # initial TR base length
length_min: float = 0.5 ** 7 # min TR base length
length_max: float = 1.6 # max TR base length
failure_counter: int = 0 # number of times we did not make progress in optimizing the function
failure_tolerance: int = float("nan") # Note: Post-initialized, after more than failure_tolerance failures, we shrink the TR
success_counter: int = 0 # number of times we made progress in optimizing the function
success_tolerance: int = 10 # after that many successes, we increase the TR size
best_value: float = -float("inf") # best value observed
restart_triggered: bool = False # whether we want to restart this TR
def __post_init__(self):
# the failure tolerance increases with the dimensionality of the problem
# and decreases with batch size as batches correspond to parallel
# function evaluations
self.failure_tolerance = math.ceil(
max([4.0 / self.batch_size, float(self.dim) / self.batch_size])
)
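For instance (an illustrative sketch), a 10-dimensional problem optimized with batches of 4 points starts from the following state:
state = TurboState(dim=10, batch_size=4)
print(state.length)             # 0.8, the initial trust-region base length
print(state.failure_tolerance)  # ceil(max(4 / 4, 10 / 4)) = 3, set in __post_init__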
Depending on the \(y\)-values of the next points evaluated, we update the state:
def update_state(state, y_next):
# if we made progress in optimizing the function
if max(y_next) > state.best_value + 1e-3 * math.fabs(state.best_value):
state.success_counter += 1
state.failure_counter = 0
else:
state.success_counter = 0
state.failure_counter += 1
# if we made state.success_tolerance many progresses
if state.success_counter == state.success_tolerance: # Expand trust region
state.length = min(2.0 * state.length, state.length_max)
state.success_counter = 0
elif state.failure_counter == state.failure_tolerance: # Shrink trust region
state.length /= 2.0
state.failure_counter = 0
state.best_value = max(state.best_value, max(y_next).item())
if state.length < state.length_min:
state.restart_triggered = True
return state
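A quick illustration with made-up values: a batch that improves on the incumbent by more than the relative tolerance counts as a success, any other batch as a failure:
import torch

state = TurboState(dim=10, batch_size=4, best_value=5.0)
state = update_state(state, torch.tensor([4.0, 5.5, 3.2, 4.9]))         # 5.5 > 5.0 -> success
print(state.success_counter, state.failure_counter, state.best_value)   # 1 0 5.5
state = update_state(state, torch.tensor([5.1, 5.4, 2.0, 4.0]))         # no improvement -> failure
print(state.success_counter, state.failure_counter)                     # 0 1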
Next, we define the optimize function of a TurboInstance:
def optimize(self):
# create a number of initial points by a sobol sequence
x_init = self.get_initial_points()
# evaluate function for initial points
y_init = torch.tensor([self.function(x) for x in x_init])
# maintain tensors of all observations
self.X = torch.cat((self.X, x_init), dim=0)
self.y = torch.cat((self.y, y_init))
# while the trust region has not become too small
while not self.state.restart_triggered:
# normalize function values
train_y = (self.y - self.y.mean()) / self.y.std()
# get the surrogate model: reuse the pre-trained deep kernel model if one was passed in, otherwise instantiate a fresh GP on the current training points
model = self.model if hasattr(self.model, 'feature_extractor') else self.model(self.X, train_y.unsqueeze(-1), **self.model_kwargs)
with gpytorch.settings.max_cholesky_size(float("inf")):
# Fit the model if not in deep kernel mode
if not hasattr(self.model, 'feature_extractor'):
mll = self.mll_opt(model.likelihood, model)
fit_gpytorch_torch(mll, options={'disp': False})
# Create a batch
x_next = generate_batch(
state=self.state,
model=model,
X=self.X,
Y=train_y,
batch_size=self.batch_size,
n_candidates=self.n_candidates,
num_restarts=self.num_restarts,
raw_samples=self.raw_samples,
acqf="ts",
)
# evaluate function for new points
y_next = torch.tensor([self.function(x) for x in x_next])
# update state based on new function values
self.state = update_state(self.state, y_next)
# append to local observations
self.X = torch.cat((self.X, x_next), dim=0)
self.y = torch.cat((self.y, y_next), dim=0)
print(
f"{self.identifier}: {len(self.X)}) Best value: {self.state.best_value:.3}, TR length: {self.state.length:.3f}"
)
self.has_run = True
return self.X, self.y
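The generate_batch function called above is not shown in this section. As a rough orientation, the following is a minimal sketch of what a Thompson-sampling batch generator in TuRBO typically looks like; it omits the per-dimension lengthscale scaling and the perturbation mask of the full method, and it is not our actual implementation:
import torch
from torch.quasirandom import SobolEngine
from botorch.generation import MaxPosteriorSampling

def generate_batch_sketch(state, model, X, Y, batch_size, n_candidates):
    dim = X.shape[-1]
    # centre the trust region at the best observed point
    x_center = X[Y.argmax(), :].clone()
    tr_lb = torch.clamp(x_center - state.length / 2.0, 0.0, 1.0)
    tr_ub = torch.clamp(x_center + state.length / 2.0, 0.0, 1.0)
    # draw Sobol candidates inside the trust region
    sobol = SobolEngine(dim, scramble=True)
    X_cand = tr_lb + (tr_ub - tr_lb) * sobol.draw(n_candidates)
    # Thompson sampling: pick batch_size points by sampling from the GP posterior on the candidates
    thompson_sampling = MaxPosteriorSampling(model=model, replacement=False)
    with torch.no_grad():
        return thompson_sampling(X_cand, num_samples=batch_size)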
The initial points are simply drawn from a scrambled Sobol sequence:
def get_initial_points(self):
sobol = SobolEngine(dimension=self.dim, scramble=True, seed=self.seed)
X_init = sobol.draw(n=self.n_init)
return X_init
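For example (illustrative), drawing five scrambled Sobol points in three dimensions yields a tensor of points in the unit cube:
from torch.quasirandom import SobolEngine

sobol = SobolEngine(dimension=3, scramble=True, seed=0)
print(sobol.draw(n=5).shape)   # torch.Size([5, 3]), all entries in [0, 1)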
Deep Kernel Model
Next, we define the deep kernel: a kernel that prepends a neural network to the GP kernel as a feature extractor. The network's weights are trained jointly with the GP hyperparameters by maximizing the marginal likelihood.
import gpytorch
import torch
from botorch.models.gpytorch import GPyTorchModel
from gpytorch.distributions import MultivariateNormal
from gpytorch.kernels import MaternKernel, ScaleKernel, GridInterpolationKernel
from gpytorch.means import ConstantMean
from gpytorch.models import ExactGP
from gpytorch.utils.grid import ScaleToBounds
from torch.nn import Linear, ReLU
class FeatureExtractor(torch.nn.Sequential):
'''
The feature extractor for the inputs
'''
def __init__(self, data_dim, layer_depths = None):
super(FeatureExtractor, self).__init__()
self.add_module('linear1', Linear(data_dim, 128))
self.add_module('relu1', ReLU())
self.add_module('linear2', Linear(128, 64))
self.add_module('relu2', ReLU())
self.add_module('linear3', Linear(64, 16))
self.add_module('relu3', ReLU())
self.add_module('linear4', Linear(16, 2))  # 2-d output, matching num_dims=2 of the GP kernel below
class DeepKernelGPRegressor(GPyTorchModel, ExactGP):
'''
Custom GP model that first calls the feature extractor and passes the outputs to the actual kernel.
'''
# Freeze everything but the last layer when training locally?
_num_outputs = 1  # informs the BoTorch GPyTorchModel API that this is a single-output model
def __init__(self, train_x, train_y, likelihood, architecture):
super(DeepKernelGPRegressor, self).__init__(train_x, train_y, likelihood)
self.mean_module = ConstantMean()
self.covar_module = GridInterpolationKernel(
ScaleKernel(MaternKernel(ard_num_dims=2)),
num_dims=2, grid_size=100
)
self.feature_extractor = architecture
# This module will scale the NN features so that they're nice values
self.scale_to_bounds = ScaleToBounds(-1., 1.)
def __call__(self, *args, **kwargs):
with gpytorch.settings.debug(False):
return super().__call__(*args, **kwargs)
def forward(self, x):
# We're first putting our data through a deep net (feature extractor)
projected_x = self.feature_extractor(x)
projected_x = self.scale_to_bounds(projected_x) # Make the NN values "nice"
mean_x = self.mean_module(projected_x)
covar_x = self.covar_module(projected_x)
return MultivariateNormal(mean_x, covar_x)
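To illustrate how the pieces fit together (a minimal sketch with made-up data, not part of the package): the model is constructed from training data, a Gaussian likelihood, and a feature extractor whose 2-dimensional output matches num_dims=2 of the grid-interpolation kernel.
import torch
from gpytorch.likelihoods import GaussianLikelihood

train_x = torch.rand(50, 10)    # 50 points in the unit cube [0, 1]^10
train_y = torch.randn(50)       # made-up (standardized) observations
likelihood = GaussianLikelihood()
model = DeepKernelGPRegressor(train_x, train_y, likelihood, FeatureExtractor(data_dim=10))

model.eval()
likelihood.eval()
with torch.no_grad():
    pred = likelihood(model(torch.rand(5, 10)))   # predictive distribution over 5 new points
    print(pred.mean.shape)                        # torch.Size([5])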
Scalable Optimizer
The ScalableOptimizer maintains multiple TurboInstances:
import os
from logging import info
from typing import Optional, Dict
import torch
import tqdm
from botorch.models import SingleTaskGP
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.mlls import ExactMarginalLogLikelihood
from pyspark.sql import SparkSession
from torch import Tensor
from scalable_gps.dkl_model import FeatureExtractor, DeepKernelGPRegressor
from scalable_gps.objective import OptimizationProblem
from scalable_gps.turbo_state import TurboInstance
from scalable_gps.utils import save
class ScalableOptimizer:
def __init__(self,
objective: OptimizationProblem,
outer_iterations: int = 10,
num_parallel: int = 2,
num_total_iterations: int = -1,
batch_size: int = 2,
turbo_kwargs: Optional[Dict] = None,
use_dkl: bool = True,
name: str = 'TurBO-DKL',
save_path: str = os.getcwd(),
seed: int = 0
):
# create a Spark session if one does not exist yet
self.spark = SparkSession.builder.getOrCreate()
self.sc = self.spark.sparkContext
self.batch_size = batch_size
self.num_total_iterations = num_total_iterations
self.num_parallel = num_parallel
self.turbo_kwargs = turbo_kwargs or {}
self.objective = objective
self.dim = objective.dim()
self.outer_iterations = outer_iterations
self.name = name
self.use_dkl = use_dkl
self.save_path = save_path
self.seed = seed
def optimize(self):
# maintain tensors of all observations from all TurboInstances
x_global = torch.empty((0, self.dim))
y_global = torch.empty(0)
# we start with no deep kernel
deep_kernel_model = None
for i_outer in range(self.outer_iterations):
# for each outer iteration, define a list of TurboInstances
info(f"Starting outer iteration {i_outer + 1}")
self.turbo_processes = [
TurboInstance(
batch_size=self.batch_size,
function=self.objective,
model=deep_kernel_model if deep_kernel_model is not None else SingleTaskGP,
identifier=f"TR-{i}",
seed=int(f"{self.seed}{i+1}"))
for i in range(self.num_parallel)
]
# parallelize the list
turbos = self.sc.parallelize(self.turbo_processes)
# define the optimization
res = turbos.map(lambda t: t.optimize())
# and collect the data
data = res.collect()
# append to global observations
x_aggregated = torch.cat([Tensor(proc_data[0])
for proc_data in data], axis=0)
y_aggregated = torch.cat([Tensor(proc_data[1])
for proc_data in data], axis=0)
x_global = torch.cat((x_global, x_aggregated), dim=0)
y_global = torch.cat((y_global, y_aggregated), dim=0)
y_global_normalized = (y_global - y_global.mean()) / y_global.std()
if self.use_dkl:
# train deep kernel
deep_kernel_model = self._train_deepkernel(
x_global, y_global_normalized)
# save optimization results
save(x_global, y_global, self.save_path, self.name, self.objective.name())
def _train_deepkernel(self, X: Tensor, y: Tensor, num_iters: int = 100):
'''
Train a feature extractor (neural net) for the inputs
'''
likelihood = GaussianLikelihood()
feature_extractor = FeatureExtractor(self.objective.dim())
dkl_model = DeepKernelGPRegressor(X, y, likelihood, feature_extractor)
optimizer = torch.optim.Adam([
{'params': dkl_model.feature_extractor.parameters()},
{'params': dkl_model.covar_module.parameters()},
{'params': dkl_model.mean_module.parameters()},
{'params': dkl_model.likelihood.parameters()},
], lr=0.01)
mll = ExactMarginalLogLikelihood(likelihood, dkl_model)
iterator = tqdm.tqdm(range(num_iters))
# simple full-batch training with Adam; a more careful training scheme may be needed to counteract the known overfitting issues of deep kernel learning
for i in iterator:
# Zero backprop gradients
optimizer.zero_grad()
# Get output from model
output = dkl_model(X)
# Calc loss and backprop derivatives
loss = -mll(output, y)
loss.backward()
iterator.set_postfix(loss=loss.item())
optimizer.step()
return dkl_model
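Putting everything together, a run on the 10-dimensional Ackley function could look like the sketch below. It assumes that a Spark session can be created (locally or on a cluster), that the scalable_gps package is available on the workers, and that the Ackley wrapper lives in scalable_gps.objective; adjust the import to your layout.
from scalable_gps.objective import Ackley   # assumed location of the Ackley wrapper defined above

problem = Ackley(dim=10)
optimizer = ScalableOptimizer(
    objective=problem,
    outer_iterations=5,    # number of rounds of parallel TuRBO runs followed by deep kernel (re-)training
    num_parallel=4,        # number of TuRBO instances run as parallel Spark tasks
    batch_size=4,
    use_dkl=True,
    name='TuRBO-DKL-Ackley',
    seed=0,
)
optimizer.optimize()       # aggregated observations are written to save_path via save(...)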