import uuid
import copy
import logging
import random
import time
from typing import Optional, List, TYPE_CHECKING, Dict, Tuple, Sequence
import pandas as pd
from sklearn.base import TransformerMixin
from sklearn.preprocessing import OneHotEncoder
from gama.genetic_programming.components import Individual
from gama.postprocessing.base_post_processing import BasePostProcessing
from gama.utilities.evaluation_library import EvaluationLibrary, Evaluation
from gama.utilities.export import (
imports_and_steps_for_individual,
format_import,
format_pipeline,
transformers_to_str,
)
from gama.utilities.metrics import Metric, MetricType
if TYPE_CHECKING:
    # Only needed for the `"Gama"` string annotation in `dynamic_defaults`;
    # presumably guarded to avoid a circular import at runtime.
    from gama.gama import Gama

# Module-level logger used by every class and function in this module.
log = logging.getLogger(__name__)
class EnsemblePostProcessing(BasePostProcessing):
    """Post-processing step that builds a weighted ensemble of evaluated pipelines.

    The ensemble is constructed and fit by `build_fit_ensemble` in `post_process`
    and can be exported as a scikit-learn script with `to_code`.
    """

    def __init__(
        self,
        time_fraction: float = 0.3,
        ensemble_size: Optional[int] = 25,
        hillclimb_size: Optional[int] = 10_000,
        max_models: Optional[int] = 200,
    ):
        """ Ensemble construction per Caruana et al.

        Parameters
        ----------
        time_fraction: float (default=0.3)
            Fraction of total time reserved for Ensemble building.
        ensemble_size: int, optional (default=25)
            Total number of models in the ensemble.
            When a single model is chosen more than once, it will increase its weight
            in the ensemble and *does* count towards this maximum.
        hillclimb_size: int, optional (default=10_000)
            Number of predictions that are used to determine the ensemble score
            during hillclimbing. If `None`, use all.
        max_models: int, optional (default=200)
            Only consider the best `max_models` number of models. If `None`, use all.
            Consequently also sets the max number of unique models in the ensemble.
        """
        super().__init__(time_fraction)
        # Presumably (user-provided value, default value) pairs consumed by
        # BasePostProcessing's `hyperparameters` / `_overwrite_hyperparameter_default`.
        self._hyperparameters = dict(
            ensemble_size=(ensemble_size, 25),
            metric=(None, None),
            evaluation_library=(None, None),
            hillclimb_size=(hillclimb_size, 10_000),
            max_models=(max_models, 200),
        )
        self._ensemble: Optional[Ensemble] = None

    def dynamic_defaults(self, gama: "Gama"):
        """Default `metric` to gama's first metric and `evaluation_library` to
        gama's evaluation library."""
        self._overwrite_hyperparameter_default("metric", gama._metrics[0])
        self._overwrite_hyperparameter_default(
            "evaluation_library", gama._evaluation_library
        )

    def post_process(
        self, x: pd.DataFrame, y: pd.Series, timeout: float, selection: List[Individual]
    ) -> object:
        """Build and fit an ensemble, store it on `self._ensemble` and return it.

        `selection` is not used; ensemble members are drawn from the
        `evaluation_library` hyperparameter instead.
        """
        # NOTE(review): the `hillclimb_size` and `max_models` hyperparameters are
        # not forwarded here, so they currently have no effect on the ensemble.
        self._ensemble = build_fit_ensemble(
            x,
            y,
            self.hyperparameters["ensemble_size"],
            timeout,
            self.hyperparameters["metric"],
            self.hyperparameters["evaluation_library"],
        )
        return self._ensemble

    def to_code(
        self, preprocessing: Optional[Sequence[Tuple[str, TransformerMixin]]] = None
    ) -> str:
        """Return Python source that recreates the ensemble with scikit-learn.

        Parameters
        ----------
        preprocessing: Sequence[Tuple[str, TransformerMixin]], optional
            Named preprocessing steps. If given, their imports are added and the
            script ends with a pipeline of these steps followed by the ensemble.

        Returns
        -------
        str
            The generated script.

        Raises
        ------
        RuntimeError
            If no EnsembleClassifier/EnsembleRegressor was built, or it has no models.
        """
        if isinstance(self._ensemble, EnsembleClassifier):
            voter = "VotingClassifier"
        elif isinstance(self._ensemble, EnsembleRegressor):
            voter = "VotingRegressor"
        else:
            raise RuntimeError(f"Can't export ensemble of type {type(self._ensemble)}.")
        if not self._ensemble._models:
            # `_models` is None after unpickling an ensemble shrunk on pickle.
            raise RuntimeError("Can't export an ensemble that has no models.")

        imports = {
            f"from sklearn.ensemble import {voter}",
            "from sklearn.pipeline import Pipeline",
        }
        pipelines = []
        weights = []
        for i, (model, weight) in enumerate(self._ensemble._models.values()):
            ind_imports, steps = imports_and_steps_for_individual(model.individual)
            imports = imports.union(ind_imports)
            pipelines.append(format_pipeline(steps, name=f"pipeline_{i}"))
            weights.append(weight)
        estimators = ",".join(f"('{i}', pipeline_{i})" for i in range(len(pipelines)))

        # All Voting* parameters after `estimators` are keyword-only in current
        # scikit-learn, so they must be passed by name in the generated code.
        voter_arguments = []
        if isinstance(self._ensemble, EnsembleClassifier):
            if self._ensemble._metric.requires_probabilities:
                voter_arguments.append("voting='soft'")
            else:
                voter_arguments.append("voting='hard'")
        # VotingRegressor has no `voting` parameter, only `weights`.
        voter_arguments.append(f"weights={weights}")

        if preprocessing is not None:
            imports = imports.union({format_import(t) for _, t in preprocessing})

        script = (
            "\n".join(sorted(imports))
            + "\n\n"
            + "\n\n".join(pipelines)
            + "\n"
            + f"ensemble = {voter}([{estimators}], {', '.join(voter_arguments)})\n"
        )

        if preprocessing is not None:
            trans_strs = transformers_to_str([t for _, t in preprocessing])
            names = [name for name, _ in preprocessing]
            steps = list(zip(names, trans_strs))
            script += format_pipeline(steps + [("ensemble", "ensemble")])
        return script
class Ensemble(object):
    """Weighted ensemble built by greedy forward selection with replacement.

    Candidate models come from an `EvaluationLibrary`. Selecting a model that is
    already in the ensemble increases its weight. Subclasses implement
    `_ensemble_validation_score` and the prediction methods.
    """

    def __init__(
        self,
        metric,
        y: pd.DataFrame,
        evaluation_library: Optional[EvaluationLibrary] = None,
        shrink_on_pickle=True,
        downsample_to: Optional[int] = 10_000,
        use_top_n_only: Optional[int] = 200,
    ):
        """
        Parameters
        ----------
        metric: string or Metric
            Metric to optimize the ensemble towards.
        y: pandas.DataFrame or pandas.Series
            True labels for the predictions made by the models in the library.
        evaluation_library: `gama.utilities.evaluation_library.EvaluationLibrary`
            Library of evaluations from which an ensemble can be built.
            Required, despite the `None` default.
        shrink_on_pickle: bool (default=True)
            If True, attributes that are not required for prediction are dropped
            from the pickled state. When unpickled, the model can be used to
            create predictions, but the ensemble can't be changed.
        downsample_to: int, optional (default=10_000)
            Maximum number of (stored) targets/predictions used to score
            candidate ensembles. If None, no additional downsampling is done.
        use_top_n_only: int, optional (default=200)
            Passed to `EvaluationLibrary.n_best` to select candidate models.

        Raises
        ------
        ValueError
            If `metric` is neither a string nor a Metric, or
            `evaluation_library` is None.
        TypeError
            If `evaluation_library` is not an EvaluationLibrary, or `y` is not a
            pandas DataFrame/Series.
        """
        if isinstance(metric, str):
            metric = Metric(metric)
        elif not isinstance(metric, Metric):
            raise ValueError(
                "metric must be specified as string or `gama.ea.metrics.Metric`."
            )

        if evaluation_library is None:
            raise ValueError(
                "`evaluation_library` is None but must be EvaluationLibrary."
            )
        elif not isinstance(evaluation_library, EvaluationLibrary):
            raise TypeError(
                "`evaluation_library` must be of type "
                "gama.utilities.evaluation_library.EvaluationLibrary."
            )

        if not isinstance(y, (pd.Series, pd.DataFrame)):
            raise TypeError(f"y_true must be pd.DataFrame or pd.Series, is {type(y)}.")

        self._metric = metric
        self.evaluation_library = evaluation_library
        self._model_library: List[Evaluation] = []
        self._use_top_n_only = use_top_n_only
        self._shrink_on_pickle = shrink_on_pickle
        self._prediction_transformation = None

        if self.evaluation_library._sample is not None:
            # If the library stores sampled predictions, we match that first.
            y = y.iloc[self.evaluation_library._sample]

        # Then apply even more sampling if requested.
        if downsample_to is None or downsample_to >= len(y):
            if downsample_to is not None:
                log.info(f"Not downsampling because only {len(y)} samples were stored.")
            self._y = y
            self._prediction_sample = None
        else:
            log.info(f"Downsampling as training data exceeds {downsample_to} samples.")
            self._prediction_sample = random.sample(range(len(y)), downsample_to)
            self._y = y.iloc[self._prediction_sample]

        self._internal_score = -float("inf")
        self._fit_models = None
        self._maximize = True
        self._models: Dict[uuid.UUID, Tuple[Evaluation, int]] = {}

    @property
    def model_library(self):
        """Candidate evaluations whose predictions line up with `self._y`.

        Built lazily from `evaluation_library.n_best(use_top_n_only)` and cached.
        Each entry is a shallow copy of an Evaluation whose predictions were
        passed through `_prediction_transformation` (if set) and restricted to
        `_prediction_sample` (if set).
        """
        if not self._model_library:
            self._model_library = []
            for evaluation in self.evaluation_library.n_best(self._use_top_n_only):
                predictions = evaluation.predictions
                if self._prediction_transformation:
                    predictions = self._prediction_transformation(predictions)
                if self._prediction_sample is not None:
                    predictions = predictions[self._prediction_sample]
                e = copy.copy(evaluation)
                e._predictions = predictions
                self._model_library.append(e)
        return self._model_library

    def _ensemble_validation_score(self, prediction_to_validate=None):
        """Maximizable score of the given (or current averaged) predictions."""
        raise NotImplementedError("Must be implemented by child class.")

    def _total_fit_weights(self):
        """Sum of the weights of all fitted estimators used for prediction."""
        return sum(weight for (_, weight) in self._fit_models)

    def _total_model_weights(self):
        """Sum of the weights of all models currently in the ensemble."""
        return sum(weight for (_, weight) in self._models.values())

    def _averaged_validation_predictions(self):
        """ Weighted average of predictions of current models on the hillclimb set. """
        weighted_sum_predictions = sum(
            [model.predictions * weight for (model, weight) in self._models.values()]
        )
        return weighted_sum_predictions / self._total_model_weights()

    def build_initial_ensemble(self, n: int):
        """ Replace the ensemble with the top n models of the model library.

        Parameters
        ----------
        n: int
            Number of models to include (fewer if the library is smaller).

        Raises
        ------
        ValueError
            If n is not positive.
        RuntimeError
            If the model library is empty, so no model could be added.
        """
        if not n > 0:
            raise ValueError("Ensemble must include at least one model.")
        if self._models:
            log.warning(
                "The ensemble already contained models. Overwriting the ensemble."
            )
        # Reset unconditionally: `_models` may also be None (shrunk on pickle).
        self._models = {}

        # Since the model library only features unique models,
        # we do not need to check for duplicates here.
        for model in self.model_library[:n]:
            self._add_model(model)

        if not self._models:
            # Scoring an empty ensemble would divide by a total weight of zero.
            raise RuntimeError("No models are available to build an ensemble from.")
        if log.isEnabledFor(logging.DEBUG):
            # Only compute the score (a full metric evaluation) if it is logged.
            log.debug(
                "Initial ensemble created with score {}".format(
                    self._ensemble_validation_score()
                )
            )

    def _add_model(self, model, add_weight=1):
        """ Add a specific model to the ensemble or increase its weight.

        Models are keyed by `model.individual._id`; an existing entry keeps its
        original Evaluation object and only has its weight increased.
        """
        model, weight = self._models.pop(model.individual._id, (model, 0))
        new_weight = weight + add_weight
        self._models[model.individual._id] = (model, new_weight)
        log.debug(f"Weight {model.individual.short_name('>')} set to {new_weight}.")

    def expand_ensemble(self, n: int):
        """ Greedily add n models to the ensemble (selection with replacement).

        Each step adds, with weight 1, the library model whose addition yields
        the highest `_ensemble_validation_score`. Expansion stops early if no
        candidate yields a score above -inf.

        Parameters
        ----------
        n: int
            Number of models to add to current ensemble.

        Raises
        ------
        ValueError
            If n is not positive.
        """
        if not n > 0:
            raise ValueError("n must be greater than 0.")

        for _ in range(n):
            best_addition = None
            best_addition_score = -float("inf")
            current_weighted_average = self._averaged_validation_predictions()
            current_total_weight = self._total_model_weights()
            for model in self.model_library:
                # NOTE(review): presumably skips failed evaluations; confirm that
                # `Evaluation.score` is a scalar (a tuple never equals 0).
                if model.score == 0:
                    continue
                # Weighted average after adding `model` with weight 1:
                # (W * avg + p) / (W + 1) == avg + (p - avg) / (W + 1).
                candidate_pred = current_weighted_average + (
                    model.predictions - current_weighted_average
                ) / (current_total_weight + 1)
                candidate_ensemble_score = self._ensemble_validation_score(
                    candidate_pred
                )
                if best_addition_score < candidate_ensemble_score:
                    best_addition, best_addition_score = model, candidate_ensemble_score

            if best_addition is None:
                # Every candidate was skipped or scored -inf/NaN.
                log.warning(
                    "No model could be added to the ensemble; stopping expansion early."
                )
                break
            self._add_model(best_addition)
            self._internal_score = best_addition_score
            log.info(
                "Ensemble size {} , best score: {}".format(
                    self._total_model_weights(), best_addition_score
                )
            )

    def fit(self, x, y, timeout=1e6):
        """ Prepare the selected models for prediction.

        Rather than refitting, every estimator stored on the selected
        evaluations (`Evaluation.estimators`) is reused with its model's weight.
        NOTE(review): presumably one estimator per validation fold; confirm.

        Parameters
        ----------
        x:
            Currently unused (kept for interface compatibility).
        y:
            Currently unused (kept for interface compatibility).
        timeout: int (default=1e6)
            Must be positive; otherwise currently unused.

        Raises
        ------
        RuntimeError
            If no models were selected yet.
        ValueError
            If timeout is not positive.
        """
        if not self._models:
            raise RuntimeError(
                "You need to call `build_initial_ensemble` to select models "
                "before fitting them."
            )
        if timeout <= 0:
            raise ValueError("timeout must be greater than 0.")

        self._fit_models = [
            (estimator, weight)
            for (model, weight) in self._models.values()
            for estimator in model.estimators
        ]

    def _get_weighted_mean_predictions(self, X, predict_method="predict"):
        """Weighted mean of `predict_method(X)` over all fitted estimators.

        Each prediction is passed through `_prediction_transformation` first,
        if one is set.
        """
        weighted_predictions = []
        for (model, weight) in self._fit_models:
            target_prediction = getattr(model, predict_method)(X)
            if self._prediction_transformation:
                target_prediction = self._prediction_transformation(target_prediction)
            weighted_predictions.append(target_prediction * weight)
        return sum(weighted_predictions) / self._total_fit_weights()

    def __str__(self):
        if not self._models:
            return "Ensemble with no models."
        # NOTE(review): `validation_score` and `name` are not used on
        # evaluations anywhere else in this module (which uses `score` and
        # `individual.short_name`); confirm these attributes exist.
        _str_ = f"Ensemble of {len(self._models)} pipelines.\nR\tW\tScore\tPipeline\n"
        models = sorted(self._models.values(), key=lambda x: x[0].validation_score)
        for i, (model, weight) in enumerate(models):
            _str_ += f"{i}\t{weight}\t{model.validation_score[0]:.4f}\t{model.name}\n"
        return _str_

    def __getstate__(self):
        """Return the state to pickle, shrunk if `shrink_on_pickle` is set.

        Only the returned copy is shrunk; the live ensemble stays intact and can
        still be expanded, exported and printed after being pickled.
        """
        state = self.__dict__.copy()
        if self._shrink_on_pickle:
            log.info(
                "Shrinking before pickle because shrink_on_pickle is True. "
                "Removing anything that is not needed for predict-functionality. "
                "Functionality to expand ensemble after unpickle is not available."
            )
            state["_models"] = None
            state["_model_library"] = None
            # `_y` can not be removed as it is needed to ensure proper
            # dimensionality of predictions.
            # Alternatively, one could just save the number of classes instead.
        return state
def fit_and_weight(args):
    """Fit a pipeline on the supplied data, zeroing its weight if fitting fails.

    Parameters
    ----------
    args: Tuple
        A 4-tuple ``(pipeline, X, y, weight)``.

    Returns
    -------
    Tuple
        ``(pipeline, weight)`` where ``pipeline`` is the very object passed in
        and ``weight`` is the given weight on success, or 0 if ``fit`` raised
        any exception (the exception is logged with its traceback).
    """
    pipeline, features, targets, weight = args
    try:
        pipeline.fit(features, targets)
    except Exception:
        log.warning(f"Exception fitting {pipeline}. Set weight to 0.", exc_info=True)
        resulting_weight = 0
    else:
        resulting_weight = weight
    return pipeline, resulting_weight
class EnsembleClassifier(Ensemble):
    """Ensemble for classification.

    Members' outputs are averaged either as class probabilities (metrics that
    require probabilities) or as one-hot encoded class labels (other metrics).
    """

    def __init__(self, metric, y_true, label_encoder=None, *args, **kwargs):
        """
        Parameters
        ----------
        metric, y_true, *args, **kwargs
            Forwarded to `Ensemble.__init__` (`y_true` as `y`).
        label_encoder: optional
            If set, its `inverse_transform` is applied to the class predictions
            returned by `predict`.
        """
        super().__init__(metric, y_true, *args, **kwargs)
        self._label_encoder = label_encoder

        # Fit on all of y_true (not the possibly downsampled `self._y`), so the
        # encoder knows every class present in y_true.
        y_as_squeezed_array = y_true.to_numpy().reshape(-1, 1)
        self._one_hot_encoder = OneHotEncoder(categories="auto")
        self._one_hot_encoder.fit(y_as_squeezed_array)

        if self._metric.requires_probabilities:
            # `Ensemble.__init__` already restricted `self._y` to
            # `_prediction_sample`; indexing it with that sample again would use
            # indices into the full target on the downsampled one.
            self._y = self._one_hot_encoder.transform(
                self._y.to_numpy().reshape(-1, 1)
            ).toarray()
        else:
            # For metrics that only require class labels,
            # we still want to apply one-hot-encoding to average predictions.
            self._prediction_transformation = self._one_hot_encode_predictions

    def _one_hot_encode_predictions(self, predictions):
        """One-hot encode 1-D class-label predictions (sparse result).

        A method rather than a local closure so the ensemble remains picklable.
        """
        return self._one_hot_encoder.transform(predictions.reshape(-1, 1))

    def _ensemble_validation_score(self, prediction_to_validate=None):
        """Maximizable metric score of the given (or current averaged) predictions."""
        if prediction_to_validate is None:
            prediction_to_validate = self._averaged_validation_predictions()

        if self._metric.requires_probabilities:
            return self._metric.maximizable_score(self._y, prediction_to_validate)
        else:
            # Averaged one-hot predictions are sparse; convert them back to an
            # (N, 1) array of class labels before scoring.
            class_predictions = self._one_hot_encoder.inverse_transform(
                prediction_to_validate.toarray()
            )
            return self._metric.maximizable_score(self._y, class_predictions)

    def predict(self, X):
        """Predict a 1-D array of class labels for X.

        Labels are decoded with the label encoder, if one was given.
        """
        if self._metric.requires_probabilities:
            log.warning(
                "Ensemble was tuned with a class-probabilities metric. "
                "Using argmax of probabilities, which may not give optimal predictions."
            )
            class_probabilities = self._get_weighted_mean_predictions(
                X, "predict_proba"
            )
        else:
            class_probabilities = self._get_weighted_mean_predictions(
                X, "predict"
            ).toarray()

        class_predictions = self._one_hot_encoder.inverse_transform(class_probabilities)
        if self._label_encoder is not None:
            class_predictions = self._label_encoder.inverse_transform(class_predictions)

        return class_predictions.ravel()

    def predict_proba(self, X):
        """Weighted mean class probabilities (or one-hot label frequencies) for X."""
        if self._metric.requires_probabilities:
            return self._get_weighted_mean_predictions(X, "predict_proba")
        else:
            log.warning(
                "Ensemble was tuned with a class label predictions metric, "
                "not probabilities. Using weighted mean of class predictions."
            )
            return self._get_weighted_mean_predictions(X, "predict").toarray()
class EnsembleRegressor(Ensemble):
    """Ensemble for regression: predicts the weighted mean of member predictions."""

    def _ensemble_validation_score(self, prediction_to_validate=None):
        """Maximizable metric score against the stored targets.

        Falls back to the current ensemble's averaged validation predictions
        when `prediction_to_validate` is None.
        """
        candidate = (
            self._averaged_validation_predictions()
            if prediction_to_validate is None
            else prediction_to_validate
        )
        return self._metric.maximizable_score(self._y, candidate)

    def predict(self, X):
        """Weighted mean of the fitted estimators' `predict` output for X."""
        return self._get_weighted_mean_predictions(X, predict_method="predict")
def build_fit_ensemble(
    x,
    y,
    ensemble_size: int,
    timeout: float,
    metric: Metric,
    evaluation_library: EvaluationLibrary,
    encoder: Optional[object] = None,
    hillclimb_size: Optional[int] = 10_000,
    max_models: Optional[int] = 200,
) -> Ensemble:
    """ Construct an Ensemble of models, optimizing for metric.

    Parameters
    ----------
    x, y:
        Data passed to `Ensemble.fit`; `y` also provides the hillclimb targets.
    ensemble_size: int
        Total ensemble weight to reach; a model may be selected several times.
    timeout: float
        Time budget in seconds; time spent building is subtracted before fitting.
    metric: Metric
        Metric to optimize; its task type selects regressor or classifier.
    evaluation_library: EvaluationLibrary
        Evaluations from which ensemble members are chosen.
    encoder: optional
        Label encoder used to decode class predictions (classification only).
    hillclimb_size: int, optional (default=10_000)
        Forwarded as `downsample_to`: max number of predictions used to score
        candidate ensembles. If None, use all.
    max_models: int, optional (default=200)
        Forwarded as `use_top_n_only`: number of best models considered.

    Returns
    -------
    Ensemble
        The ensemble. Errors during building/fitting are logged rather than
        raised, so the returned ensemble may be incomplete.

    Raises
    ------
    ValueError
        If the metric's task type is neither regression nor classification.
    """
    start_build = time.time()
    log.debug("Building ensemble.")
    ensemble_kwargs = dict(
        evaluation_library=evaluation_library,
        downsample_to=hillclimb_size,
        use_top_n_only=max_models,
    )
    if metric.task_type == MetricType.REGRESSION:
        ensemble = EnsembleRegressor(metric, y, **ensemble_kwargs)  # type: Ensemble
    elif metric.task_type == MetricType.CLASSIFICATION:
        ensemble = EnsembleClassifier(
            metric, y, label_encoder=encoder, **ensemble_kwargs
        )
    else:
        raise ValueError(f"Unknown metric task type {metric.task_type}")

    try:
        # Starting with more models in the ensemble should help against overfitting,
        # but depending on the total ensemble size, it might leave too little room
        # to calibrate the weights or add new models. So we have some adaptive defaults.
        if ensemble_size <= 10:
            ensemble.build_initial_ensemble(1)
        else:
            ensemble.build_initial_ensemble(10)

        remainder = ensemble_size - ensemble._total_model_weights()
        if remainder > 0:
            ensemble.expand_ensemble(remainder)

        build_time = time.time() - start_build
        fit_timeout = timeout - build_time
        if fit_timeout <= 0:
            # `Ensemble.fit` rejects non-positive timeouts; letting it raise here
            # would leave the ensemble without fitted models, unusable for predict.
            log.warning(
                f"Ensemble build took {build_time}s, exceeding the timeout of "
                f"{timeout}s. Fitting with a minimal timeout instead."
            )
            fit_timeout = 1e-3
        log.info(f"Ensemble build took {build_time}s. Fit with timeout {fit_timeout}s.")
        ensemble.fit(x, y, fit_timeout)
    except Exception as e:
        # Deliberate best-effort: return whatever ensemble was built so far.
        log.warning(f"Error during auto ensemble: {e}", exc_info=True)

    return ensemble