Source code for lightwin.optimisation.algorithms.nsga

"""Define :class:`NSGA`, a genetic algorithm for optimisation.

.. warning::
    Implementation not modified since v0.0.0.0.0.1 or so

"""

import logging
from typing import Callable

import numpy as np
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.core.algorithm import Algorithm
from pymoo.core.evaluator import Evaluator
from pymoo.core.population import Population
from pymoo.core.problem import ElementwiseProblem, Problem
from pymoo.core.result import Result
from pymoo.mcdm.pseudo_weights import PseudoWeights
from pymoo.operators.sampling.lhs import LHS
from pymoo.operators.selection.tournament import TournamentSelection
from pymoo.optimize import minimize
from pymoo.termination.default import DefaultMultiObjectiveTermination

from lightwin.failures.set_of_cavity_settings import SetOfCavitySettings
from lightwin.optimisation.algorithms.algorithm import (
    OptimisationAlgorithm,
    OptiSol,
)


[docs] class NSGA(OptimisationAlgorithm): """Non-dominated Sorted Genetic Algorithm.""" supports_constraints = True
[docs] def __init__( self, *args, history_kwargs: dict | None = None, **kwargs ) -> None: """Set additional information.""" if history_kwargs is not None: logging.warning( "History recording not implemented for DownhillSimplexPenalty." ) super().__init__(*args, history_kwargs=history_kwargs, **kwargs)
[docs] def optimize(self) -> OptiSol: """Set up the optimization and solve the problem. Returns ------- success : Tells if the optimisation algorithm managed to converge. optimized_cavity_settings : Best solution found by the optimization algorithm. info : Gives list of solutions, corresponding objective, convergence violation if applicable, etc. """ problem: Problem = MyElementwiseProblem( _wrapper_residuals=self._wrapper_residuals, **self._problem_arguments, ) bias_init = True initial_population = None n_pop = 200 if bias_init: initial_population = self._set_population(problem, n_pop) algorithm = self._set_algorithm(sampling=initial_population) termination = self._set_termination() result: Result = minimize( problem=problem, algorithm=algorithm, termination=termination, selection=TournamentSelection, # seed=None, verbose=True, # display=None, # callback=None, # return_least_infeasible=False, # save_history=False, ) success = True # add least squares solution # result.X = np.vstack((result.X, self.x_0)) # f, g = self._wrapper_residuals(self.x_0) # result.F = np.vstack((result.F, f)) # result.G.append(g) set_of_cavity_settings, info = self._best_solution(result) self._finalize() return success, set_of_cavity_settings, info
@property def _problem_arguments(self) -> dict[str, int | np.ndarray]: """Gather arguments required for :class:`.ElementwiseProblem`.""" _xl, _xu = self._format_variables() kwargs = { "n_var": self.n_var, "n_obj": self.n_obj, "n_ieq_constr": self.n_constr, "xl": _xl, "xu": _xu, } return kwargs
[docs] def _format_variables(self) -> tuple[np.ndarray, np.ndarray]: """Format :class:`.Variable` for this algorithm.""" _xl = [var.limits[0] for var in self.variables] _xu = [var.limits[1] for var in self.variables] return _xl, _xu
@property def x_0(self) -> np.ndarray: """Return initial value used in :class:`.LeastSquares`.""" return np.array([var.x_0 for var in self.variables]) @property def x_max_k_e(self) -> np.ndarray: """Return a solution with maximum electric fields.""" x_max = [] for var in self.variables: if var.name == "k_e": x_max.append(var.limits[1]) continue x_max.append(var.x_0) return x_max
[docs] def _wrapper_residuals( self, var: np.ndarray ) -> tuple[np.ndarray, np.ndarray]: """Compute residuals from an array of variable values.""" cav_settings = self._create_set_of_cavity_settings(var) simulation_output = self.compute_beam_propagation(cav_settings) objective = self.compute_residuals(simulation_output) constraints = self.compute_constraints(simulation_output) return np.abs(objective), constraints
[docs] def _set_algorithm(self, *args, **kwargs) -> Algorithm: """Set `pymoo`s `Algorithm` object.""" algorithm = NSGA3(*args, **kwargs) return algorithm
[docs] def _set_termination(self) -> DefaultMultiObjectiveTermination: """Set the termination condition.""" termination = DefaultMultiObjectiveTermination( n_max_gen=1000, n_max_evals=10000, xtol=1e-8, ftol=1e-8 ) return termination
[docs] def _set_population(self, problem: Problem, n_pop: int) -> Population: """Set population, with some predefined individuals.""" sampling = LHS() initial_variables = sampling(problem, n_pop).get("X") initial_variables[0, :] = self.x_0 initial_variables[1, :] = self.x_max_k_e initial_population = Population.new("X", initial_variables) Evaluator().eval(problem, initial_population) return initial_population
[docs] def _best_solution( self, result: Result ) -> tuple[SetOfCavitySettings, dict[str, np.ndarray]]: """Take the "best" solution.""" approx = _characteristic_points(result) n_f = (result.F - approx["ideal"]) / ( approx["nadir"] - approx["ideal"] ) n_obj = len(self.objectives) weights = np.ones(n_obj) / n_obj idx_best = PseudoWeights(weights).do(n_f) set_of_cavity_settings = self._create_set_of_cavity_settings( result.X[idx_best] ) info = {"X": result.X[idx_best], "F": result.F[idx_best]} logging.info(f"I choose {info['F']} (idx {idx_best})") return set_of_cavity_settings, info
[docs] class MyElementwiseProblem(ElementwiseProblem): """A first test implementation, eval single solution at a time."""
[docs] def __init__( self, _wrapper_residuals: Callable[np.ndarray, np.ndarray], **kwargs: int | np.ndarray, ) -> None: """Create object.""" self._wrapper_residuals = _wrapper_residuals super().__init__(**kwargs)
[docs] def _evaluate( self, x: np.ndarray, out: dict[str, np.ndarray], *args, **kwargs ) -> dict[str, np.ndarray]: """Calculate and return the objectives.""" out["F"], out["G"] = self._wrapper_residuals(x) return out
[docs] def _characteristic_points(result: Result) -> dict[str, np.ndarray]: """Give the ideal and Nadir points as a dict.""" ideal_idx = result.F.argmin(axis=0) ideal = result.F.min(axis=0) # ideal_idx_bis = result.F[:-1].argmin(axis=0) # ideal_bis = result.F[:-1].min(axis=0) nadir_idx = result.F.argmax(axis=0) nadir = result.F.max(axis=0) logging.info( f"Manually added: idx {result.F.shape[0] - 1}\n" f"Ideal points are {ideal} (idx {ideal_idx})\n" # f"(without manually added lsq: {ideal_bis} @ {ideal_idx_bis}\n" f"Nadir points are {nadir} (idx {nadir_idx})" ) approx = {"ideal": ideal, "nadir": nadir} return approx