Source code for infralib.models.cost

"""Cost models with unified interface."""

import numpy as np

from .base import BaseModel, ModelContext


[docs] class CostModel(BaseModel): """Base class for cost models with unified interface."""
[docs] def compute(self, context: ModelContext) -> np.ndarray: """Compute costs using context. Args: context: Must contain states and actions Returns: costs: Array of costs per component """ if context.actions is None: raise ValueError("Actions required in context for cost computation") self.validate_context(context) next_states = None if context.dynamics: next_states = context.dynamics.compute(context) return self._compute_costs(context, next_states)
def _compute_costs( self, context: ModelContext, next_states: np.ndarray | None ) -> np.ndarray: """Internal cost computation to be implemented by subclasses.""" raise NotImplementedError
[docs] def compute_legacy( self, states: np.ndarray, actions: np.ndarray, next_states: np.ndarray ) -> np.ndarray: """Legacy interface for backward compatibility.""" context = ModelContext(states=states, actions=actions) return self._compute_costs(context, next_states)
[docs] class SimpleCost(CostModel): """Fixed costs per action type with validation."""
[docs] @classmethod def get_parameter_spec(cls): return { "inspect_cost": (float, (0.0, 1000.0), "Cost of inspection action"), "repair_cost": (float, (0.0, 10000.0), "Cost of repair action"), "replace_cost": (float, (0.0, 50000.0), "Cost of replacement action"), "failure_penalty": ( float, (0.0, 100000.0), "Penalty for component failure", ), }
[docs] def __init__( self, inspect_cost: float = 10.0, repair_cost: float = 100.0, replace_cost: float = 1000.0, failure_penalty: float = 5000.0, ): super().__init__( inspect_cost=inspect_cost, repair_cost=repair_cost, replace_cost=replace_cost, failure_penalty=failure_penalty, )
def _setup(self): """Setup cost arrays after validation.""" self.action_costs = np.array( [ 0.0, self.params["inspect_cost"], self.params["repair_cost"], self.params["replace_cost"], ] ) self.failure_penalty = self.params["failure_penalty"] def _compute_costs( self, context: ModelContext, next_states: np.ndarray | None ) -> np.ndarray: """Compute costs based on actions and failures.""" states = context.states actions = context.actions if next_states is None: next_states = states costs = self.action_costs[actions] failure_mask = (next_states == 0) & (states > 0) costs[failure_mask] += self.failure_penalty return costs
[docs] def reset(self, context: ModelContext | None = None): """Reset cost model (nothing to reset for simple costs).""" pass
[docs] class LengthBasedCost(CostModel): """Costs scaled by component length (for road networks)."""
[docs] @classmethod def get_parameter_spec(cls): return { "inspect_cost_per_km": (float, (0.0, 500.0), "Inspection cost per km"), "repair_cost_per_km": (float, (0.0, 5000.0), "Repair cost per km"), "replace_cost_per_km": (float, (0.0, 25000.0), "Replacement cost per km"), "failure_penalty_per_km": (float, (0.0, 50000.0), "Failure penalty per km"), }
[docs] def __init__( self, inspect_cost_per_km: float = 50.0, repair_cost_per_km: float = 500.0, replace_cost_per_km: float = 5000.0, failure_penalty_per_km: float = 10000.0, component_lengths: np.ndarray = None, ): super().__init__( inspect_cost_per_km=inspect_cost_per_km, repair_cost_per_km=repair_cost_per_km, replace_cost_per_km=replace_cost_per_km, failure_penalty_per_km=failure_penalty_per_km, ) self.component_lengths = component_lengths
def _setup(self): """Setup cost arrays after validation.""" self.cost_per_km = np.array( [ 0.0, self.params["inspect_cost_per_km"], self.params["repair_cost_per_km"], self.params["replace_cost_per_km"], ] ) self.failure_penalty_per_km = self.params["failure_penalty_per_km"] if self.component_lengths is None: self.component_lengths = np.ones(1)
[docs] def set_component_lengths(self, lengths: np.ndarray): """Set component lengths for cost calculation.""" self.component_lengths = lengths
def _compute_costs( self, context: ModelContext, next_states: np.ndarray | None ) -> np.ndarray: """Compute length-scaled costs.""" states = context.states actions = context.actions if next_states is None: next_states = states if context.metadata and hasattr(context.metadata, "get_bulk_attribute"): try: lengths = context.metadata.get_bulk_attribute( range(len(states)), "length_km" ) except (KeyError, AttributeError): lengths = self._get_lengths(len(states)) else: lengths = self._get_lengths(len(states)) costs = self.cost_per_km[actions] * lengths failure_mask = (next_states == 0) & (states > 0) costs[failure_mask] += self.failure_penalty_per_km * lengths[failure_mask] return costs def _get_lengths(self, n_components: int) -> np.ndarray: """Get component lengths array.""" if len(self.component_lengths) == 1: return np.full(n_components, self.component_lengths[0]) return self.component_lengths
[docs] def reset(self, context: ModelContext | None = None): """Reset cost model.""" pass
[docs] class NonlinearCost(CostModel): """Nonlinear cost model reflecting deterioration-dependent repair costs."""
[docs] @classmethod def get_parameter_spec(cls): return { "inspect_cost": (float, (0.0, 1000.0), "Base inspection cost"), "replacement_cost": (float, (0.0, 50000.0), "Full replacement cost"), "cost_sensitivity": (float, (1.0, 5.0), "Cost sensitivity parameter"), "min_repair_fraction": (float, (0.1, 0.5), "Minimum repair cost fraction"), "failure_threshold": (int, (0, 10), "State below which component fails"), "failure_penalty": (float, (0.0, 100000.0), "Failure penalty"), }
[docs] def __init__( self, inspect_cost: float = 10.0, replacement_cost: float = 1000.0, cost_sensitivity: float = 2.0, min_repair_fraction: float = 0.2, failure_threshold: int = 1, failure_penalty: float = 5000.0, n_states: int = 10, ): super().__init__( inspect_cost=inspect_cost, replacement_cost=replacement_cost, cost_sensitivity=cost_sensitivity, min_repair_fraction=min_repair_fraction, failure_threshold=failure_threshold, failure_penalty=failure_penalty, ) self.n_states = n_states
def _setup(self): """Setup cost parameters after validation.""" self.inspect_cost = self.params["inspect_cost"] self.replacement_cost = self.params["replacement_cost"] self.cost_sensitivity = self.params["cost_sensitivity"] self.min_repair_fraction = self.params["min_repair_fraction"] self.failure_threshold = self.params["failure_threshold"] self.failure_penalty = self.params["failure_penalty"] def _compute_costs( self, context: ModelContext, next_states: np.ndarray | None ) -> np.ndarray: """Compute nonlinear costs based on component condition.""" states = context.states actions = context.actions if next_states is None: next_states = states costs = np.zeros_like(states, dtype=float) inspect_mask = actions == 1 costs[inspect_mask] = self.inspect_cost repair_mask = actions == 2 repair_costs = self._compute_repair_costs(states[repair_mask]) costs[repair_mask] = repair_costs replace_mask = actions == 3 costs[replace_mask] = self.replacement_cost failure_mask = (next_states <= self.failure_threshold) & ( states > self.failure_threshold ) costs[failure_mask] += self.failure_penalty return costs
[docs] def reset(self, context: ModelContext | None = None): """Reset cost model.""" pass
def _compute_repair_costs(self, states: np.ndarray) -> np.ndarray: """Compute state-dependent repair costs.""" if len(states) == 0: return np.array([]) normalized_condition = states / (self.n_states - 1) cost_multiplier = ( (1 - normalized_condition) ** self.cost_sensitivity ) + self.min_repair_fraction return cost_multiplier * self.replacement_cost