Source code for fairdo.optimize.single.ga

"""
This module implements the Genetic Algorithm for combinatorial optimization problems.
It reflects the process of natural selection where the fittest individuals are selected for
reproduction to produce the offspring for the next generation [1]_ [2]_.

The optimization algorithm is ``genetic_algorithm``, which takes the fitness ``f``, the dimensionality ``d``,
the population size ``pop_size``, and number of generation ``num_generations`` as inputs.
The algorithm iteratively applies selection, crossover, and mutation operations to
evolve the population over a number of generations.
The genetic operators can be customized by passing
the corresponding functions from ``fairdo.optimize.operators`` as arguments.

References
----------

.. [1] Goldberg, D. E. (1989).
    Genetic Algorithms in Search, Optimization, and Machine Learning.
    Addison-Wesley.

.. [2] Holland, J. H. (1975).
    Adaptation in Natural and Artificial Systems.
    University of Michigan Press.
"""

import pathos.multiprocessing as mp
import numpy as np

from fairdo.optimize.operators.initialization import random_initialization, variable_initialization
from fairdo.optimize.operators.selection import elitist_selection, tournament_selection
from fairdo.optimize.operators.crossover import onepoint_crossover, uniform_crossover
from fairdo.optimize.operators.mutation import fractional_flip_mutation, shuffle_mutation, bit_flip_mutation


[docs]def genetic_algorithm(f, d, pop_size=100, num_generations=500, initialization=random_initialization, selection=elitist_selection, crossover=uniform_crossover, mutation=fractional_flip_mutation, maximize=False, tol=1e-6, patience=50): """ Perform a genetic algorithm. It consists of the following steps which are repeated for a specified number of generations: 1. Generate an initial population of binary vectors. 2. Evaluate the fitness of each vector in the population. 3. Select the best individuals to be parents. (Selection) 4. Create offspring by performing crossover on the parents. (Crossover) 5. Mutate the offspring. (Mutation) Parameters ---------- f: callable The objective function (fitness) to minimize. d: int The number of dimensions. pop_size: int The size of the population. num_generations: int The number of generations. initialization: callable, optional (default=random_initialization) The function to initialize the population. selection: callable, optional (default=elitist_selection) The function to select the parents from the population. crossover: callable, optional (default=uniform_crossover) The function to perform the crossover operation. mutation: callable, optional (default=fractional_flip_mutation) The function to perform the mutation operation. maximize: bool, optional (default=False) Whether to maximize or minimize the fitness function. tol: float, optional (default=1e-6) The tolerance for early stopping. If the best solution found is within tol of the previous best solution, then the algorithm stops. patience: int, optional (default=50) The number of generations to wait before early stopping. Returns ------- best_solution: ndarray, shape (d,) The best solution found by the algorithm. best_fitness: float The fitness of the best solution found by the algorithm. Notes ----- The genetic algorithm is used to maximize the given fitness function. To avoid having to rewrite the selection, crossover, and mutation functions to work with minimization problems, the fitness function is negated if we are minimizing. The fitness function must map the binary vector to a positive value, i.e., :math:`f: \\{0, 1\\}^d \\rightarrow \\mathbb{R}^+`. """ # negate the fitness function if we are minimizing if not maximize: f_orig = f f = lambda x: -f_orig(x) if pop_size < 3: raise ValueError("The population size must be at least 3.") # Generate the initial population population = initialization(pop_size=pop_size, d=d) # Evaluate the function for each vector in the population fitness = evaluate_population(f, population) best_idx = np.argmax(fitness) best_fitness = fitness[best_idx] best_population = population[best_idx] no_improvement_streak = 0 # Perform the genetic algorithm for the specified number of generations for generation in range(num_generations): # Select the parents parents, fitness = selection(population=population, fitness=fitness) # Create the offspring num_offspring = pop_size - parents.shape[0] offspring = crossover(parents=parents, num_offspring=num_offspring) # Mutate the offspring offspring = mutation(offspring=offspring) # Evaluate the fitness of the offspring offspring_fitness = evaluate_population(f, offspring) # Create the new population (allow the parents to be part of the next generation) population = np.concatenate((parents, offspring)) fitness = np.concatenate((fitness, offspring_fitness)) # save the best solution found so far best_idx = np.argmax(offspring_fitness) if offspring_fitness[best_idx] > best_fitness + tol: best_fitness = offspring_fitness[best_idx] best_population = offspring[best_idx] no_improvement_streak = 0 else: # early stopping if the best solution is found no_improvement_streak += 1 if no_improvement_streak >= patience: print(f"Stopping after {generation + 1} generations after stagnating for " f"{no_improvement_streak} generations.") break if not maximize: # negate the fitness back to its original form best_fitness = -best_fitness return best_population, best_fitness
[docs]def evaluate_individual(args): """ Calculates the fitness of an individual. The fitness is the value of the fitness function plus a penalty for individuals that do not satisfy the size constraint. Parameters ---------- args: tuple The arguments to pass to the function. The arguments are (f, individual). Returns ------- fitness: float The fitness of the individual. """ f, individual = args fitness = f(individual) return fitness
[docs]def evaluate_population_single_cpu(f, population): """ Calculates the fitness of each individual in a population. The fitness is the value of the fitness function plus a penalty for individuals that do not satisfy the size constraint. Parameters ---------- f: callable The fitness function to evaluate. n: int The constraint value. population: ndarray, shape (pop_size, d) The population of vectors to evaluate. Returns ------- fitness: ndarray, shape (pop_size,) The fitness values of the population. """ # fallback to single process execution if multiprocessing fails fitness = np.apply_along_axis(f, axis=1, arr=population) return fitness
[docs]def evaluate_population(f, population): """ Calculates the fitness of each individual in a population. The fitness is the value of the fitness function plus a penalty for individuals that do not satisfy the size constraint. Parameters ---------- f: callable The fitness function to evaluate. population: ndarray, shape (pop_size, d) The population of vectors to evaluate. Returns ------- fitness: ndarray, shape (pop_size,) The fitness values of the population. """ try: # use multiprocessing to speed up the evaluation if the population is large enough if mp.cpu_count() > 1 and population.shape[0] >= 200: # use multiprocessing to speed up the evaluation with mp.Pool() as pool: fitness = pool.map(evaluate_individual, [(f, individual) for individual in population]) return np.array(fitness) else: return evaluate_population_single_cpu(f, population) except Exception as e: print(f"Multiprocessing pool failed with error: {e}") print("Falling back to single process execution") return evaluate_population_single_cpu(f, population)