Source code for rework_pysatl_mpest.estimators.iterative.steps.maximization_step

"""Provides the Maximization-step for an iterative estimation pipeline.

This module defines the `MaximizationStep` class, a concrete implementation of
:class:`~rework_pysatl_mpest.estimators.iterative.pipeline_step.PipelineStep`.
This step is responsible for performing the Maximization (M-step) in an
Expectation-Maximization (EM) like algorithm. It updates the parameters of the
mixture model components and their weights to maximize the expected
log-likelihood, using the responsibilities computed in a preceding
Expectation-step.
"""

__author__ = "Danil Totmyanin"
__copyright__ = "Copyright (c) 2025 PySATL project"
__license__ = "SPDX-License-Identifier: MIT"


from collections.abc import Mapping, Sequence
from types import MappingProxyType
from typing import Callable, ClassVar

import numpy as np

from ....distributions import ContinuousDistribution
from ....optimizers import Optimizer
from .._strategies import q_function_strategy
from ..pipeline_state import PipelineState
from ..pipeline_step import PipelineStep
from .block import MaximizationStrategy, OptimizationBlock


[docs] class MaximizationStep(PipelineStep): """A pipeline step that performs the Maximization (M-step). This step updates the parameters of each component in the mixture model, as well as the mixture weights, based on the responsibility matrix :attr:`H` calculated in the Expectation-step. The update process is configured through a sequence of :class:`OptimizationBlock` objects, each defining a specific optimization task. Parameters ---------- blocks : Sequence[OptimizationBlock] A sequence of configuration blocks that define the optimization tasks. Each block specifies a component, its parameters to optimize, and the maximization strategy to use. optimizer : Optimizer A numerical optimizer instance used to find the optimal parameters when an analytical solution is not available for a given strategy. Attributes ---------- blocks : list[OptimizationBlock] The list of optimization tasks to be performed. optimizer : Optimizer The numerical optimizer used for parameter estimation. Methods ------- .. autosummary:: :toctree: generated/ run """ _strategies: ClassVar[Mapping[MaximizationStrategy, Callable]] = MappingProxyType( {MaximizationStrategy.QFUNCTION: q_function_strategy} ) def __init__(self, blocks: Sequence[OptimizationBlock], optimizer: Optimizer): self.blocks = list(blocks) self.optimizer = optimizer @property def available_next_steps(self) -> list[type[PipelineStep]]: """list[type[PipelineStep]]: Defines the valid subsequent steps. Specifies that a :class:`MaximizationStep` should typically be followed by an :class:`ExpectationStep` to complete one iteration of the EM algorithm. """ from rework_pysatl_mpest.estimators.iterative.steps.expectation_step import ExpectationStep return [ExpectationStep] def _update_components_params(self, component: ContinuousDistribution, params: dict[str, float]): """Helper method to update parameters for a single component. Parameters ---------- component : ContinuousDistribution The component whose parameters are to be updated. params : dict[str, float] A dictionary mapping parameter names to their new optimized values. """ param_names = list(params.keys()) param_values = list(params.values()) component.set_params_from_vector(param_names, param_values)
[docs] def run(self, state: PipelineState) -> PipelineState: """Executes the M-step. This method iterates through the configured optimization blocks, updates the parameters for each specified component using the appropriate strategy, and then recalculates the mixture weights based on the sum of responsibilities. Parameters ---------- state : PipelineState The current state of the pipeline. Must contain the responsibility matrix :attr:`H` and the mixture model :attr:`curr_mixture`. Returns ------- PipelineState The updated pipeline state with the modified :attr:`curr_mixture`. If the :attr:`H` matrix is not available in the input state, the state is returned with an error set, and no modifications are made. """ if state.H is None: error = ValueError("Responsibility matrix H is not computed.") state.error = error return state results = [] curr_mixture = state.curr_mixture for block in self.blocks: strategy = self._strategies[block.maximization_strategy] component_id, new_params = strategy(curr_mixture[block.component_id], state, block, self.optimizer) results.append((component_id, new_params)) for result in results: component_id, params = result self._update_components_params(curr_mixture[component_id], params) responsibilities_sum = np.sum(state.H, axis=0) new_weights = responsibilities_sum / state.X.shape[0] curr_mixture.log_weights = np.log(new_weights + 1e-30) return state