Source code for optimagic.optimization.history

import warnings
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Iterable, Literal

import numpy as np
import pandas as pd
from numpy.typing import NDArray
from pybaum import leaf_names, tree_just_flatten

from optimagic.parameters.tree_registry import get_registry
from optimagic.timing import CostModel
from optimagic.typing import Direction, EvalTask, PyTree


@dataclass(frozen=True)
class HistoryEntry:
    params: PyTree
    fun: float | None
    start_time: float
    stop_time: float
    task: EvalTask


class History:
    # TODO: add counters for the relevant evaluations
    def __init__(
        self,
        direction: Direction,
        params: list[PyTree] | None = None,
        fun: list[float | None] | None = None,
        start_time: list[float] | None = None,
        stop_time: list[float] | None = None,
        batches: list[int] | None = None,
        task: list[EvalTask] | None = None,
    ) -> None:
        """Initialize a history.

        The history must know the direction of the optimization problem in order to
        correctly return monotone sequences. The history can be initialized empty
        (for example, for use during an optimization process) or with data (for
        example, to recover a history from a log).

        """
        _validate_args_are_all_none_or_lists_of_same_length(
            params, fun, start_time, stop_time, batches, task
        )

        self.direction = direction
        self._params = params if params is not None else []
        self._fun = fun if fun is not None else []
        self._start_time = start_time if start_time is not None else []
        self._stop_time = stop_time if stop_time is not None else []
        self._batches = batches if batches is not None else []
        self._task = task if task is not None else []

    # ==================================================================================
    # Methods to add entries to the history
    # ==================================================================================

    def add_entry(self, entry: HistoryEntry, batch_id: int | None = None) -> None:
        if batch_id is None:
            batch_id = self._get_next_batch_id()
        self._params.append(entry.params)
        self._fun.append(entry.fun)
        self._start_time.append(entry.start_time)
        self._stop_time.append(entry.stop_time)
        self._batches.append(batch_id)
        self._task.append(entry.task)

    def add_batch(
        self, batch: list[HistoryEntry], batch_size: int | None = None
    ) -> None:
        # The naming is complicated here:
        # - batch refers to the entries that are added to the history in one go,
        # - batch_size is a property of a parallelizing algorithm that influences how
        #   the batch ids are assigned. It is not the same as the length of the batch.
        if batch_size is None:
            batch_size = len(batch)

        start = self._get_next_batch_id()
        n_batches = int(np.ceil(len(batch) / batch_size))
        ids = np.repeat(np.arange(start, start + n_batches), batch_size)[: len(batch)]

        for entry, id in zip(batch, ids, strict=False):
            self.add_entry(entry, id)

    def _get_next_batch_id(self) -> int:
        if not self._batches:
            batch = 0
        else:
            batch = self._batches[-1] + 1
        return batch
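    # Worked example (illustration, not part of the original source): calling
    # `add_batch` with 5 entries and batch_size=2 on an empty history assigns the
    # batch ids np.repeat(np.arange(0, 3), 2)[:5], i.e. [0, 0, 1, 1, 2].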
    # ==================================================================================
    # Properties and methods to access the history
    # ==================================================================================

    # Function data, function value, and monotone function value
    # ----------------------------------------------------------------------------------

    def fun_data(self, cost_model: CostModel, monotone: bool = False) -> pd.DataFrame:
        """Return the function value data.

        Args:
            cost_model: The cost model that is used to calculate the time measure.
            monotone: Whether to return the monotone function values. Defaults to
                False.

        Returns:
            pd.DataFrame: The function value data with columns 'fun', 'time' and
                'task'. If monotone is False, 'fun' contains the raw function values,
                otherwise the monotone function values.

        """
        if monotone:
            fun = self.monotone_fun
        else:
            fun = np.array(self.fun, dtype=np.float64)  # converts None to nan

        timings = self._get_total_timings(cost_model)
        task = _task_to_categorical(self.task)

        if not self._is_serial():
            # In the non-serial case, we take the batching into account and reduce
            # timings and fun to one value per batch.
            timings = _apply_reduction_to_batches(
                data=timings,
                batch_ids=self.batches,
                reduction_function=cost_model.aggregate_batch_time,
            )
            min_or_max = (
                np.nanmin if self.direction == Direction.MINIMIZE else np.nanmax
            )
            fun = _apply_reduction_to_batches(
                data=fun,
                batch_ids=self.batches,
                reduction_function=min_or_max,  # type: ignore[arg-type]
            )

            # Verify that tasks are homogeneous in each batch, and select the first
            # task of each batch if they are.
            tasks_and_batches = pd.DataFrame({"task": task, "batches": self.batches})
            grouped_tasks = tasks_and_batches.groupby("batches")["task"]
            if not grouped_tasks.nunique().eq(1).all():
                raise ValueError("Tasks are not homogeneous in each batch.")
            task = grouped_tasks.first().reset_index(drop=True)

        time = np.cumsum(timings)
        return pd.DataFrame({"fun": fun, "time": time, "task": task})
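    # Illustrative sketch (not part of the original source): for a serial
    # minimization history with fun values [3.0, None, 2.0] and per-evaluation
    # timings [1.0, 1.0, 1.0] implied by the cost model, fun_data(cost_model,
    # monotone=True) contains fun [3.0, nan, 2.0] and cumulative times
    # [1.0, 2.0, 3.0].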
    @property
    def fun(self) -> list[float | None]:
        return self._fun

    @property
    def monotone_fun(self) -> NDArray[np.float64]:
        """The monotone function values of the history.

        If a value is None, the output at that position is nan.

        """
        return _calculate_monotone_sequence(self.fun, direction=self.direction)

    # Acceptance
    # ----------------------------------------------------------------------------------

    @property
    def is_accepted(self) -> NDArray[np.bool_]:
        """Boolean indicator whether a function value is accepted.

        A function value is accepted if it is smaller than (or equal to) the monotone
        function value at the same position in the case of minimization, or larger
        than (or equal to) it in the case of maximization. If the value is None, the
        output at that position is False.

        """
        fun_arr = np.array(self.fun, dtype=np.float64)
        if self.direction == Direction.MINIMIZE:
            return fun_arr <= self.monotone_fun
        elif self.direction == Direction.MAXIMIZE:
            return fun_arr >= self.monotone_fun
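    # Illustration (not part of the original source): for a minimization with
    # fun = [3.0, None, 2.0, 5.0], monotone_fun is [3.0, nan, 2.0, 2.0] and
    # is_accepted is [True, False, True, False], since comparisons with nan are
    # False.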
    # Parameter data, params, flat params, and flat params names
    # ----------------------------------------------------------------------------------

    def params_data(
        self, dropna: bool = False, collapse_batches: bool = False
    ) -> pd.DataFrame:
        """Return the parameter data.

        Args:
            dropna: Whether to drop rows with missing function values. These
                correspond to parameters that were used to calculate pure jacobians.
                Defaults to False.
            collapse_batches: Whether to collapse the batches and only keep the
                parameters that led to the minimal (or maximal) function value in
                each batch. Defaults to False.

        Returns:
            pd.DataFrame: The parameter data. The columns are: 'name' (the parameter
                names), 'value' (the parameter values), 'task' (the task for which
                the parameter was used), and 'counter' (a counter that is unique for
                each row).

        """
        wide = pd.DataFrame(self.flat_params, columns=self.flat_param_names)
        wide["task"] = _task_to_categorical(self.task)
        wide["fun"] = self.fun

        # If requested, we collapse the batches and only keep the parameters that led
        # to the minimal (or maximal) function value in each batch.
        if collapse_batches and not self._is_serial():
            wide["batches"] = self.batches

            # Verify that tasks are homogeneous in each batch
            if not wide.groupby("batches")["task"].nunique().eq(1).all():
                raise ValueError("Tasks are not homogeneous in each batch.")

            # We fill nans with inf or -inf to make sure that the idxmin/idxmax is
            # well-defined, since there is the possibility that all fun values are
            # nans in a batch.
            if self.direction == Direction.MINIMIZE:
                loc = (
                    wide.assign(fun_without_nan=wide["fun"].fillna(np.inf))
                    .groupby("batches")["fun_without_nan"]
                    .idxmin()
                )
            elif self.direction == Direction.MAXIMIZE:
                loc = (
                    wide.assign(fun_without_nan=wide["fun"].fillna(-np.inf))
                    .groupby("batches")["fun_without_nan"]
                    .idxmax()
                )

            wide = wide.loc[loc].drop(columns="batches")

        # We drop rows with missing values if requested. These correspond to
        # parameters that were used to calculate pure jacobians. This step must be
        # done before dropping the fun column and before setting the counter.
        if dropna:
            wide = wide.dropna(subset="fun")

        wide["counter"] = np.arange(len(wide))

        long = pd.melt(
            wide,
            var_name="name",
            value_name="value",
            id_vars=["task", "counter", "fun"],
        )
        data = long.reindex(columns=["counter", "name", "value", "task", "fun"])
        return data.set_index(["counter", "name"]).sort_index()
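    # Illustration (not part of the original source): with flat parameter names
    # ["a", "b"] and two evaluations, params_data() returns a long DataFrame indexed
    # by (counter, name) with rows (0, "a"), (0, "b"), (1, "a"), (1, "b") and
    # columns 'value', 'task' and 'fun'.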
    @property
    def params(self) -> list[PyTree]:
        return self._params

    @property
    def flat_params(self) -> list[list[float]]:
        return _get_flat_params(self._params)

    @property
    def flat_param_names(self) -> list[str]:
        return _get_flat_param_names(param=self._params[0])

    # Time
    # ----------------------------------------------------------------------------------

    def _get_total_timings(
        self, cost_model: CostModel | Literal["wall_time"]
    ) -> NDArray[np.float64]:
        """Return the total timings across all tasks.

        Args:
            cost_model: The cost model that is used to calculate the time measure.
                If "wall_time", the wall time is returned.

        Returns:
            np.ndarray: The sum of the timings across all tasks.

        """
        if not isinstance(cost_model, CostModel) and cost_model != "wall_time":
            raise TypeError("cost_model must be a CostModel or 'wall_time'.")

        if cost_model == "wall_time":
            return np.array(self.stop_time, dtype=np.float64) - self.start_time[0]

        fun_time = self._get_timings_per_task(
            task=EvalTask.FUN, cost_factor=cost_model.fun
        )
        jac_time = self._get_timings_per_task(
            task=EvalTask.JAC, cost_factor=cost_model.jac
        )
        fun_and_jac_time = self._get_timings_per_task(
            task=EvalTask.FUN_AND_JAC, cost_factor=cost_model.fun_and_jac
        )

        return fun_time + jac_time + fun_and_jac_time

    def _get_timings_per_task(
        self, task: EvalTask, cost_factor: float | None
    ) -> NDArray[np.float64]:
        """Return the time measure per task.

        Args:
            task: The task for which the time is calculated.
            cost_factor: The cost factor used to calculate the time. If None, the
                time is the difference between the start and stop time, otherwise
                the time is given by the cost factor.

        Returns:
            np.ndarray: The time per task. For entries where the task is not the
                requested task, the time is 0.

        """
        task_mask = np.array([1 if t == task else 0 for t in self.task])
        factor: float | NDArray[np.float64]
        if cost_factor is None:
            factor = np.array(self.stop_time, dtype=np.float64) - np.array(
                self.start_time, dtype=np.float64
            )
        else:
            factor = cost_factor

        return factor * task_mask
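    # Illustration (not part of the original source): with tasks [FUN, JAC, FUN] and
    # a cost model where fun=1.0 and jac=5.0, the summed per-task timings are
    # [1.0, 0.0, 1.0] + [0.0, 5.0, 0.0] = [1.0, 5.0, 1.0]. A cost factor of None
    # falls back to the measured stop_time - start_time differences.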
    @property
    def start_time(self) -> list[float]:
        return self._start_time

    @property
    def stop_time(self) -> list[float]:
        return self._stop_time

    # Batches and fast_path
    # ----------------------------------------------------------------------------------

    @property
    def batches(self) -> list[int]:
        return self._batches

    def _is_serial(self) -> bool:
        return np.array_equal(self.batches, np.arange(len(self.batches)))

    # Tasks
    # ----------------------------------------------------------------------------------

    @property
    def task(self) -> list[EvalTask]:
        return self._task

    # ==================================================================================
    # Add deprecated dict access
    # ==================================================================================

    @property
    def time(self) -> list[float]:
        msg = (
            "The attribute `time` of History will be deprecated soon. Use the "
            "`start_time` property instead."
        )
        warnings.warn(msg, FutureWarning)
        arr = np.array(self._start_time)
        return (arr - arr[0]).tolist()

    @property
    def criterion(self) -> list[float | None]:
        msg = "The attribute `criterion` of History is deprecated. Use `fun` instead."
        warnings.warn(msg, FutureWarning)
        return self.fun

    @property
    def runtime(self) -> list[float]:
        msg = (
            "The attribute `runtime` of History will be deprecated soon. Use the "
            "`start_time` property instead."
        )
        warnings.warn(msg, FutureWarning)
        return self.time

    def __getitem__(self, key: str) -> Any:
        msg = (
            "dict-like access to History is deprecated. Use attribute access instead."
        )
        warnings.warn(msg, FutureWarning)
        return getattr(self, key)
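# Usage sketch (illustration, not part of the original source): recording two serial
# evaluations and reading them back.
#
#     history = History(direction=Direction.MINIMIZE)
#     history.add_entry(
#         HistoryEntry(
#             params={"a": 1.0}, fun=3.0, start_time=0.0, stop_time=1.0,
#             task=EvalTask.FUN,
#         )
#     )
#     history.add_entry(
#         HistoryEntry(
#             params={"a": 0.5}, fun=2.0, start_time=1.0, stop_time=2.0,
#             task=EvalTask.FUN,
#         )
#     )
#     history.fun           # [3.0, 2.0]
#     history.batches       # [0, 1]
#     history._is_serial()  # True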
# ======================================================================================
# Functions directly used in History methods
# ======================================================================================


def _get_flat_params(params: list[PyTree]) -> list[list[float]]:
    fast_path = len(params) > 0 and _is_1d_array(params[0])
    if fast_path:
        flatten = lambda x: x.tolist()
    else:
        registry = get_registry(extended=True)
        flatten = partial(tree_just_flatten, registry=registry)

    return [flatten(p) for p in params]


def _get_flat_param_names(param: PyTree) -> list[str]:
    fast_path = _is_1d_array(param)
    if fast_path:
        # Mypy raises an error here because .tolist() returns a str for
        # zero-dimensional arrays, but the fast path is only taken for 1d arrays, so
        # it can be ignored.
        return np.arange(param.size).astype(str).tolist()  # type: ignore[return-value]

    registry = get_registry(extended=True)
    return leaf_names(param, registry=registry)


def _is_1d_array(param: PyTree) -> bool:
    return isinstance(param, np.ndarray) and param.ndim == 1


def _calculate_monotone_sequence(
    sequence: list[float | None], direction: Direction
) -> NDArray[np.float64]:
    sequence_arr = np.array(sequence, dtype=np.float64)  # converts None to nan
    nan_mask = np.isnan(sequence_arr)

    if direction == Direction.MINIMIZE:
        sequence_arr[nan_mask] = np.inf
        out = np.minimum.accumulate(sequence_arr)
    elif direction == Direction.MAXIMIZE:
        sequence_arr[nan_mask] = -np.inf
        out = np.maximum.accumulate(sequence_arr)

    out[nan_mask] = np.nan
    return out


# ======================================================================================
# Misc
# ======================================================================================


def _validate_args_are_all_none_or_lists_of_same_length(
    *args: list[Any] | None,
) -> None:
    all_none = all(arg is None for arg in args)
    all_list = all(isinstance(arg, list) for arg in args)

    if not all_none:
        if all_list:
            unique_list_lengths = set(map(len, args))  # type: ignore[arg-type]
            if len(unique_list_lengths) != 1:
                raise ValueError("All list arguments must have the same length.")
        else:
            raise ValueError("All arguments must be lists of the same length or None.")


def _task_to_categorical(task: list[EvalTask]) -> "pd.Series[str]":
    EvalTaskDtype = pd.CategoricalDtype(categories=[t.value for t in EvalTask])
    return pd.Series([t.value for t in task], dtype=EvalTaskDtype)
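# Illustration (not part of the original source): _get_flat_params takes the fast
# path for 1d arrays, so [np.array([1.0, 2.0])] flattens to [[1.0, 2.0]] with the
# names ["0", "1"]; general pytrees are flattened via the pybaum registry instead.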
""" batch_starts, batch_stops = _get_batch_starts_and_stops(batch_ids) batch_results: list[float] = [] for start, stop in zip(batch_starts, batch_stops, strict=True): batch_data = data[start:stop] batch_id = batch_ids[start] try: if np.isnan(batch_data).all(): reduced = np.nan else: reduced = reduction_function(batch_data) except Exception as e: msg = ( f"Calling function {reduction_function.__name__} on batch {batch_id} " "of the History raised an Exception. Please verify that " f"{reduction_function.__name__} is well-defined, takes an iterable of " "floats as input and returns a scalar. The function must be able to " "handle NaN's." ) raise ValueError(msg) from e if not np.isscalar(reduced): msg = ( f"Function {reduction_function.__name__} did not return a scalar for " f"batch {batch_id}. Please verify that {reduction_function.__name__} " "returns a scalar when called on an iterable of floats. The function " "must be able to handle NaN's." ) raise ValueError(msg) batch_results.append(reduced) # type: ignore[arg-type] return np.array(batch_results, dtype=np.float64) def _get_batch_starts_and_stops(batch_ids: list[int]) -> tuple[list[int], list[int]]: """Get start and stop indices of batches. This function assumes that batch_ids are non-empty and sorted. """ ids_arr = np.array(batch_ids, dtype=np.int64) indices = np.where(ids_arr[:-1] != ids_arr[1:])[0] + 1 list_indices: list[int] = indices.tolist() # type: ignore[assignment] starts = [0, *list_indices] stops = [*starts[1:], len(batch_ids)] return starts, stops