Source code for gama.search_methods.async_ea

import logging
from functools import partial
from typing import Optional, Any, Tuple, Dict, List, Callable

import pandas as pd

from gama.genetic_programming.components import Individual
from gama.genetic_programming.operator_set import OperatorSet
from gama.logging.evaluation_logger import EvaluationLogger
from gama.search_methods.base_search import BaseSearch
from gama.utilities.generic.async_evaluator import AsyncEvaluator

log = logging.getLogger(__name__)


class AsyncEA(BaseSearch):
    """Perform asynchronous evolutionary optimization.

    Parameters
    ----------
    population_size: int, optional (default=50)
        Maximum number of individuals in the population at any time.

    max_n_evaluations: int, optional (default=None)
        If specified, only a maximum of `max_n_evaluations` individuals are evaluated.
        If None, the algorithm will be run until interrupted by the user or a timeout.

    restart_callback: Callable[[], bool], optional (default=None)
        Function which takes no arguments and returns True if the search
        should be restarted.
    """

    def __init__(
        self,
        population_size: Optional[int] = None,
        max_n_evaluations: Optional[int] = None,
        restart_callback: Optional[Callable[[], bool]] = None,
    ):
        super().__init__()
        # maps hyperparameter -> (set value, default)
        self._hyperparameters: Dict[str, Tuple[Any, Any]] = dict(
            population_size=(population_size, 50),
            restart_callback=(restart_callback, None),
            max_n_evaluations=(max_n_evaluations, None),
        )
        # Handed to `async_ea` on every `search` call, which empties and
        # refills it in place and returns it.
        self.output: List[Individual] = []

        def get_parent(evaluation, n) -> str:
            """Retrieve the nth parent if it exists, '' otherwise."""
            parents = evaluation.individual.meta.get("parents", [])
            if len(parents) > n:
                return parents[n]
            return ""

        # Logger factory which adds three fields derived from the evaluated
        # individual's `meta` dict: its first two parents and its origin.
        self.logger = partial(
            EvaluationLogger,
            extra_fields=dict(
                parent0=partial(get_parent, n=0),
                parent1=partial(get_parent, n=1),
                origin=lambda e: e.individual.meta.get("origin", "unknown"),
            ),
        )

    def dynamic_defaults(
        self, x: pd.DataFrame, y: pd.DataFrame, time_limit: float
    ) -> None:
        """Do nothing: no hyperparameter default here depends on the data."""
        pass

    def search(
        self, operations: OperatorSet, start_candidates: List[Individual]
    ) -> None:
        """Run `async_ea` and store the resulting population in `self.output`.

        Parameters
        ----------
        operations: OperatorSet
            Operator set which is passed on to `async_ea`.
        start_candidates: List[Individual]
            Individuals with which to start the search.
        """
        # NOTE(review): `self.hyperparameters` is not defined in this class;
        # presumably BaseSearch resolves each (set value, default) pair of
        # `_hyperparameters` into a single value - confirm.
        self.output = async_ea(
            operations, self.output, start_candidates, **self.hyperparameters
        )


def async_ea(
    ops: OperatorSet,
    output: List[Individual],
    start_candidates: List[Individual],
    restart_callback: Optional[Callable[[], bool]] = None,
    max_n_evaluations: Optional[int] = None,
    population_size: int = 50,
) -> List[Individual]:
    """Perform asynchronous evolutionary optimization with given operators.

    Parameters
    ----------
    ops: OperatorSet
        Operator set with `evaluate`, `wait_next`, `create`, `individual`
        and `eliminate` functions.
    output: List[Individual]
        A list which contains the set of best found individuals during search.
        It is emptied and then filled in place, and is also the returned list.
    start_candidates: List[Individual]
        A list with candidate individuals which should be used to start search from.
    restart_callback: Callable[[], bool], optional (default=None)
        Function which takes no arguments and returns True if the search
        should be restarted. It is called once after every processed evaluation.
    max_n_evaluations: int, optional (default=None)
        If specified, only a maximum of `max_n_evaluations` individuals are evaluated.
        If None, the algorithm will be run indefinitely.
    population_size: int (default=50)
        Maximum number of individuals in the population at any time.

    Returns
    -------
    List[Individual]
        The individuals currently in the population.

    Raises
    ------
    ValueError
        If `max_n_evaluations` is specified but not positive.
    """
    if max_n_evaluations is not None and max_n_evaluations <= 0:
        raise ValueError(
            f"max_n_evaluations must be positive or None, is {max_n_evaluations}."
        )

    # Deliberately the caller's list object: results are visible through
    # `output` while the search is still running.
    current_population = output
    n_evaluated_individuals = 0

    def budget_left() -> bool:
        """Tell whether another evaluation may still be processed."""
        return (
            max_n_evaluations is None
            or n_evaluated_individuals < max_n_evaluations
        )

    with AsyncEvaluator() as async_:
        should_restart = True
        while should_restart:
            should_restart = False
            current_population.clear()
            log.info("Starting EA with new population.")
            for individual in start_candidates:
                async_.submit(ops.evaluate, individual)

            while budget_left():
                future = ops.wait_next(async_)
                if future.exception is None:
                    individual = future.result.individual
                    current_population.append(individual)
                    if len(current_population) > population_size:
                        to_remove = ops.eliminate(current_population, 1)
                        current_population.remove(to_remove[0])

                if len(current_population) > 2:
                    new_individual = ops.create(current_population, 1)[0]
                    async_.submit(ops.evaluate, new_individual)

                # Failed evaluations count towards the budget as well.
                n_evaluated_individuals += 1

                restart_requested = (
                    restart_callback is not None and restart_callback()
                )
                # Only restart while budget remains. Restarting on the final
                # evaluation would empty the population without evaluating
                # anything new, so the search would return an empty list.
                if restart_requested and budget_left():
                    should_restart = True
                    log.info("Restart criterion met. Creating new random population.")
                    start_candidates = [
                        ops.individual() for _ in range(population_size)
                    ]
                    break

    return current_population