Source code for fairdo.optimize.multi.nsga

"""
Non-dominated Sorting Genetic Algorithm II (NSGA-II) for multi-objective optimization [1]_.
It maintains a population of solutions and uses non-dominated sorting and crowding distance to select the best solutions.
Fitness functions are minimized by default.
Returns the Pareto front of the population.

References
----------

.. [1] Deb, K., Pratap, A., Agarwal, S., & Meyarivan, T. (2002).
    A fast and elitist multiobjective genetic algorithm: NSGA-II.
    IEEE Transactions on Evolutionary Computation.
"""

import numpy as np
import pathos.multiprocessing as mp

from fairdo.optimize.operators.initialization import random_initialization, variable_initialization
from fairdo.optimize.operators.selection import elitist_selection_multi, tournament_selection_multi
from fairdo.optimize.operators.crossover import onepoint_crossover,\
    uniform_crossover, simulated_binary_crossover,\
    no_crossover, onepoint_crossover
from fairdo.optimize.operators.mutation import fractional_flip_mutation,\
    bit_flip_mutation, shuffle_mutation


[docs]def nsga2(fitness_functions, d, pop_size=100, num_generations=500, initialization=variable_initialization, selection=elitist_selection_multi, crossover=onepoint_crossover, mutation=bit_flip_mutation, return_all_fronts=False): """ Perform NSGA-II (Non-dominated Sorting Genetic Algorithm II) for multi-objective optimization. NSGA-II maintains a population of solutions and uses non-dominated sorting and crowding distance to select the best solutions. Fitness functions are minimized by default. Parameters ---------- fitness_functions : list of callables The list of objective functions to minimize. Each function should take a binary vector and return a scalar value. d : int The number of dimensions. pop_size : int The size of the population. num_generations : int The number of generations. initialization : callable, optional (default=random_initialization) The function to initialize the population. selection : callable, optional (default=tournament_selection_multi) The function to perform selection. crossover : callable, optional (default=uniform_crossover) The function to perform crossover. mutation : callable, optional (default=shuffle_mutation) The function to perform mutation. return_all_fronts : bool, optional (default=False) Whether to return all fronts. If False, only the first front is returned. If True, the ``combined_population`` and ``fitness_values`` are returned along with the ``fronts``. Returns ------- population: ndarray, shape (pop_size, d) The best solution found by NSGA-II. fitness_values: ndarray, shape (pop_size, num_fitness_functions) The fitness values of the best solution found by NSGA-II. fronts: list of ndarrays List of fronts, where each front contains the indices of individuals in that front. Only returned if ``return_all_fronts`` is ``True```.) Notes ----- The fitness functions must map the binary vector to a scalar value, i.e., $f: \\{0, 1\\}^d \\rightarrow \\mathbb{R}$. We used the same operators as the authors of NSGA-II in the original paper [1]. We only changed the selection operator. The original paper uses binary tournament selection, but we use elitist selection with multiple objectives. References ---------- [1] Deb, K., Pratap, A., Agarwal, S., & Meyarivan, T. (2002). A fast and elitist multiobjective genetic algorithm: NSGA-II. https://ieeexplore.ieee.org/document/996017 """ # Generate the initial population population = initialization(pop_size=pop_size, d=d) # Evaluate the fitness of each individual in the population fitness_values = evaluate_population(fitness_functions=fitness_functions, population=population) # The fronts of the population. Initially, the fronts are not known fronts = [np.arange(pop_size)] fronts_lengths = [pop_size] # Perform NSGA-II for the specified number of generations for _ in range(num_generations): # Select parents parents = selection(population=population, fitness_values=fitness_values, fronts_lengths=fronts_lengths, num_parents=2) # Perform crossover offspring = crossover(parents=parents, num_offspring=pop_size) # Perform mutation offspring = mutation(offspring=offspring) # Evaluate the fitness of the offspring offspring_fitness_values = evaluate_population_parallel(fitness_functions, offspring) # Combine the parents and the offspring combined_population = np.concatenate((population, offspring)) combined_fitness_values = np.concatenate((fitness_values, offspring_fitness_values)) # Select the best individuals using non-dominated sorting and crowding distance fronts = non_dominated_sort_fast(combined_fitness_values) # Fit the first fronts to the population size. # The front that doesn't fit will be selected based on crowding distance selected_indices = selection_indices(combined_fitness_values, fronts, pop_size) # Update the population and fitness values population = combined_population[selected_indices] fitness_values = combined_fitness_values[selected_indices] # fronts are indices from combined population, so we need to update them as well fronts_lengths = [len(front) for front in fronts] if return_all_fronts is False: return combined_population[fronts[0]], combined_fitness_values[fronts[0]] else: return combined_population, combined_fitness_values, fronts
[docs]def evaluate_population(fitness_functions, population): """ Evaluate the fitness of each individual in the population using the given fitness functions. Parameters ---------- fitness_functions : list of callables The list of fitness functions to evaluate. population : ndarray, shape (pop_size, d) The population of binary vectors. Returns ------- fitness_values : ndarray, shape (pop_size, num_fitness_functions) The fitness values of each individual in the population for each fitness function. """ num_fitness_functions = len(fitness_functions) fitness_values = np.zeros((population.shape[0], num_fitness_functions)) for i, fitness_function in enumerate(fitness_functions): fitness_values[:, i] = np.apply_along_axis(fitness_function, axis=1, arr=population).flatten() return fitness_values
[docs]def evaluate_population_parallel(fitness_functions, population): """ Evaluate the fitness of each individual in the population using the given fitness functions. Parameters ---------- fitness_functions : list of callables The list of fitness functions to evaluate. population : ndarray, shape (pop_size, d) The population of binary vectors. Returns ------- fitness_values : ndarray, shape (pop_size, num_fitness_functions) The fitness values of each individual in the population for each fitness function. """ try: # use multiprocessing to speed up the evaluation if the population is large enough if mp.cpu_count() > 1 and population.shape[0] >= 200: with mp.Pool(mp.cpu_count()) as pool: num_fitness_functions = len(fitness_functions) fitness_values = np.zeros((population.shape[0], num_fitness_functions)) for i, fitness_function in enumerate(fitness_functions): fitness_values[:, i] = np.array(pool.map(fitness_function, population)) return fitness_values else: return evaluate_population(fitness_functions, population) except Exception as e: print(f"Multiprocessing pool failed with error: {e}") print("Falling back to single process execution") return evaluate_population(fitness_functions, population)
[docs]def non_dominated_sort(fitness_values): """ Perform non-dominated sorting on the given fitness values. Parameters ---------- fitness_values : ndarray, shape (pop_size, num_fitness_functions) The fitness values of each individual in the population for each fitness function. Returns ------- fronts : list of ndarrays List of fronts, where each front contains the indices of individuals in that front. """ dominating_counts, dominated_indices = dom_counts_indices_fast(fitness_values) fronts = [] # Find the first front current_front = np.where(dominating_counts == 0)[0] # Iterate over the fronts while current_front.size > 0: fronts.append(current_front) next_front = [] for i in current_front: for j in dominated_indices[i]: dominating_counts[j] -= 1 if dominating_counts[j] == 0: next_front.append(j) current_front = np.array(next_front) return fronts
[docs]def non_dominated_sort_fast(fitness_values): """ Perform non-dominated sorting on the given fitness values. Faster implementation using broadcasting. Parameters ---------- fitness_values : ndarray, shape (pop_size, num_fitness_functions) The fitness values of each individual in the population for each fitness function. Returns ------- fronts : list of ndarrays List of fronts, where each front contains the indices of individuals in that front. Notes ----- This function uses broadcasting to compare all pairs of individuals in the population. The result is a significant speedup compared to the non-broadcasting implementation. """ dominating_counts, dominated_indices = dom_counts_indices_fast(fitness_values) fronts = [] # Find the first front current_front = np.where(dominating_counts == 0)[0] while current_front.size > 0: fronts.append(current_front) # Next front is a subset of all indices dominated by the current front next_front = np.concatenate([dominated_indices[i] for i in current_front]) # Count the number of dominating solutions for each solution unique_next_front, counts = np.unique(next_front, return_counts=True) # Decrement the dominating counts dominating_counts[unique_next_front] -= counts # Next front is the set of all solutions with no dominating solutions current_front = np.where(dominating_counts == 0)[0] current_front = np.setdiff1d(current_front, np.concatenate(fronts)) return fronts
[docs]def dom_counts_indices(fitness_values): """ Calculates the number of individuals that dominate each individual and the indices of individuals that are dominated by each individual. Parameters ---------- fitness_values : ndarray, shape (pop_size, num_fitness_functions) The fitness values of each individual in the population for each fitness function. Returns ------- dominating_counts : ndarray, shape (pop_size,) The number of individuals that dominate each individual. i-th element of the array is the number of individuals that dominate the i-th individual. dominated_indices : list of ndarrays The indices of individuals that are dominated by each individual. i-th element of the list is an array containing the indices of individuals that are dominated by the i-th individual. """ pop_size = fitness_values.shape[0] dominating_counts = np.zeros(pop_size, dtype=int) dominated_indices = [[] for _ in range(pop_size)] for i in range(pop_size): for j in range(i + 1, pop_size): if all(fitness_values[j] <= fitness_values[i]): dominating_counts[i] += 1 dominated_indices[j].append(i) elif all(fitness_values[i] <= fitness_values[j]): dominating_counts[j] += 1 dominated_indices[i].append(j) return dominating_counts, dominated_indices
[docs]def dom_counts_indices_fast(fitness_values): """ Calculates the number of individuals that dominate each individual and the indices of individuals that are dominated by each individual. Faster implementation using broadcasting. Parameters ---------- fitness_values : ndarray, shape (pop_size, num_fitness_functions) The fitness values of each individual in the population for each fitness function. Returns ------- dominating_counts : ndarray, shape (pop_size,) The number of individuals that dominate each individual. dominated_indices : list of ndarrays The indices of individuals that are dominated by each individual. Notes ----- This function uses broadcasting to compare all pairs of individuals in the population. The result is a significant speedup compared to the non-broadcasting implementation. `dominating_counts` differs from the original implementation. The original implementation counts equal fitness values as dominating, while this implementation does not. """ pop_size = fitness_values.shape[0] dominating_counts = np.zeros(pop_size, dtype=int) dominated_indices = [[] for _ in range(pop_size)] for i in range(pop_size): dominating_counts[i] = np.sum(np.all(fitness_values[i] >= fitness_values, axis=1) & ~np.all(fitness_values[i] == fitness_values, axis=1)) dominated_indices[i] = np.where(np.all(fitness_values[i] <= fitness_values, axis=1) & ~np.all(fitness_values[i] == fitness_values, axis=1))[0] return dominating_counts, dominated_indices
[docs]def selection_indices(combined_fitness_values, fronts, pop_size): """ Select the best individuals from the combined population based on the non-dominated sorting results and crowding distance to maintain diversity. Parameters ---------- combined_fitness_values : ndarray, shape (N, d) Combined population containing both original population and offspring. fronts : list of ndarrays List of fronts, where each front contains the indices of individuals in that front. pop_size : int The size of the population. Returns ------- selected_indices : list Selected indices from the combined population. """ selected_indices = [] remaining_space = pop_size front_idx = 0 # Iterate over fronts until the selected population size reaches pop_size while remaining_space > 0 and front_idx < len(fronts): current_front = fronts[front_idx] if len(current_front) <= remaining_space: # If the current front can fit entirely into the selected population, add it selected_indices.extend(current_front) remaining_space -= len(current_front) else: # If the current front cannot fit entirely, select individuals based on crowding distance crowding_distances = crowding_distance(combined_fitness_values[current_front]) # Select individuals with larger crowding distances first indices = np.argpartition(crowding_distances, -remaining_space)[-remaining_space:] selected_indices.extend(current_front[indices]) remaining_space = 0 front_idx += 1 return selected_indices
[docs]def crowding_distance(fitness_values): """ Calculate crowding distance for each individual in the population. Parameters ---------- fitness_values : ndarray, shape (N, num_fitness_functions) Fitness values of the population. Returns ------- crowding_distances : ndarray, shape (N,) Crowding distances for each individual. """ pop_size, num_objectives = fitness_values.shape crowding_distances = np.zeros(pop_size) for obj_index in range(num_objectives): # Sort the fitness values based on the current objective in ascending order. Best values first. sorted_indices = np.argsort(fitness_values[:, obj_index]) crowding_distances[sorted_indices[0]] = np.inf crowding_distances[sorted_indices[-1]] = np.inf f_max = fitness_values[sorted_indices[-1], obj_index] f_min = fitness_values[sorted_indices[0], obj_index] if f_max == f_min: continue # Crowding distance is the sum of the distances to the previous and next individuals. # It geometrically describes the sum of the lengths of a cuboid. for i in range(1, pop_size - 1): crowding_distances[sorted_indices[i]] += (fitness_values[sorted_indices[i + 1], obj_index] - fitness_values[sorted_indices[i - 1], obj_index])/ (f_max - f_min) return crowding_distances