r"""Fast vectorized infrastructure simulator with model dependencies and rich displays.
This module provides a high-performance simulator for infrastructure maintenance systems
with support for:
- Model dependency injection via ModelContext
- Rich terminal status displays
- Vectorized operations with Numba acceleration
- Comprehensive performance tracking and metrics
- Flexible observability modes (full, partial, noisy)
- Batch policy evaluation capabilities
Example
-------
Basic usage with model dependencies::
from infralib.models.dynamics import MarkovDynamics
from infralib.models.cost import SimpleCost
from infralib.models.budget import FixedBudget
from infralib.simulator import Simulator
import numpy as np
# Setup models
dynamics = MarkovDynamics(n_states=10)
cost = SimpleCost()
budget = FixedBudget(initial_budget=10000)
# Create simulator with rich displays
sim = Simulator(dynamics, cost, budget, rich_display=True)
sim.reset(n_components=5)
# Run simulation step
actions = np.array([0, 1, 2, 0, 1])
states, info = sim.step(actions)
print(f"Total cost: {info['total_cost']:.2f}")
print(f"Failures: {info['failures']}")
Classes
-------
Simulator : Main infrastructure simulation class with model dependencies
Functions
---------
_fast_budget_check : Numba-accelerated budget constraint enforcement
"""
import os
from typing import Any
import numba
import numpy as np
from rich.console import Console, Group
from rich.panel import Panel
from rich.progress import BarColumn, Progress, TextColumn
from rich.table import Table
from .models.base import ModelContext
from .models.budget import BudgetModel
from .models.cost import CostModel
from .models.dynamics import DynamicsModel
from .models.hierarchy import HierarchyModel
from .models.metadata import MetadataModel
@numba.jit(nopython=True)
def _fast_budget_check(
    costs: np.ndarray, available_budget: float
) -> tuple[np.ndarray, float]:
    """Greedily approve the cheapest actions that still fit in the budget.

    Components are visited in ascending order of cost. Each one is approved
    when its cost does not exceed what is left of the budget, and its cost is
    then deducted. Compiled with Numba in nopython mode.

    Parameters
    ----------
    costs : np.ndarray
        Cost of the proposed action for each component.
    available_budget : float
        Budget that may be spent in this time step.

    Returns
    -------
    tuple[np.ndarray, float]
        - Boolean mask, aligned with ``costs``, marking approved actions.
        - Budget left over once all approved costs have been deducted.

    Notes
    -----
    The allocation is greedy (cheapest first), so it is not guaranteed to be
    the best possible use of the budget.
    """
    count = len(costs)
    approved = np.zeros(count, dtype=numba.boolean)
    budget_left = available_budget
    # Cheapest components come first in this ordering.
    order = np.argsort(costs)
    for rank in range(count):
        component = order[rank]
        price = costs[component]
        if price <= budget_left:
            approved[component] = True
            budget_left -= price
    return approved, budget_left
[docs]
class Simulator:
"""Fast vectorized infrastructure simulator with hierarchy and metadata support."""
[docs]
def __init__(
    self,
    dynamics: DynamicsModel,
    cost: CostModel,
    budget: BudgetModel,
    hierarchy: HierarchyModel | None = None,
    metadata: MetadataModel | None = None,
    rich_display: bool = False,
    seed: int | None = None,
):
    """Store the model dependencies and create an empty simulation state.

    Parameters
    ----------
    dynamics : DynamicsModel
        Model used to compute the next component states.
    cost : CostModel
        Model used to compute per-component action costs.
    budget : BudgetModel
        Model tracking the available budget.
    hierarchy : HierarchyModel, optional
        Model providing extra aggregated metrics after each step.
    metadata : MetadataModel, optional
        Model providing extra metadata-based metrics after each step.
    rich_display : bool, default False
        When True a ``rich`` console is created for status output.
    seed : int, optional
        When given, seeds NumPy's global random state.
    """
    # Model dependencies
    self.dynamics, self.cost, self.budget = dynamics, cost, budget
    self.hierarchy, self.metadata = hierarchy, metadata

    # Optional attribute exposed by some dynamics models
    self.type_indices = getattr(dynamics, "type_indices", None)
    # Left unset here; expected to be assigned by the environment
    self.failure_conditions = None

    # A console only exists when rich output was requested
    self.rich_display = rich_display
    self.console = Console() if self.rich_display else None

    if seed is not None:
        np.random.seed(seed)

    # Nothing is simulated until reset() is called
    self.n_components = None
    self.states = None
    self.time_since_inspection = None
    self.time_step = 0

    # Per-step records used for performance tracking
    self.history = {
        key: []
        for key in (
            "states",
            "actions",
            "costs",
            "rewards",
            "budget_remaining",
            "failures",
        )
    }
[docs]
def reset(
    self, n_components: int, initial_states: np.ndarray | None = None
) -> np.ndarray:
    """Reset the simulator to the start of a new episode.

    Parameters
    ----------
    n_components : int
        Number of components to simulate.
    initial_states : array-like, optional
        Starting state of every component. Any sequence accepted by
        ``np.array`` works; the data is always copied. When omitted, every
        component starts in state 9.

    Returns
    -------
    np.ndarray
        A copy of the initial component states.

    Raises
    ------
    ValueError
        If ``initial_states`` does not have ``n_components`` entries. The
        simulator is left unchanged in that case.
    """
    # Validate and build the new state before touching any attribute, so a
    # bad call does not leave the simulator half-reset.
    if initial_states is not None:
        if len(initial_states) != n_components:
            raise ValueError(
                f"Initial states length {len(initial_states)} != n_components {n_components}"
            )
        # np.array copies and also converts plain sequences; a list kept
        # as-is would make ``states == 0`` a scalar and miscount failures.
        states = np.array(initial_states)
    else:
        # Start in good condition (state 9 out of 10)
        states = np.full(n_components, 9, dtype=np.int32)

    self.n_components = n_components
    self.states = states

    # Reset other simulation variables
    self.time_since_inspection = np.zeros(n_components, dtype=np.int32)
    self.time_step = 0
    self.budget.reset()

    # History starts with the initial snapshot; state 0 counts as a failure
    self.history = {
        "states": [self.states.copy()],
        "actions": [],
        "costs": [],
        "rewards": [],
        "budget_remaining": [self.budget.available()],
        "failures": [np.sum(self.states == 0)],
    }
    return self.states.copy()
[docs]
def step(
    self, actions: np.ndarray, display_status: bool = False
) -> tuple[np.ndarray, dict[str, Any]]:
    """Advance the simulation by one time step under the budget constraint.

    The proposed actions are priced, the affordable ones are approved by
    ``_fast_budget_check`` and the rest are replaced by action 0
    ("do nothing"). The dynamics model then produces the new states, the
    costs of the actions actually taken are charged to the budget, and the
    step is appended to ``self.history``.

    Parameters
    ----------
    actions : np.ndarray
        Proposed action for every component.
    display_status : bool, default False
        When True, ``display_status`` is called with the step's info dict.

    Returns
    -------
    tuple[np.ndarray, dict[str, Any]]
        A copy of the new states and an info dict with the keys ``costs``,
        ``total_cost``, ``budget_remaining``, ``budget_success``,
        ``failures``, ``new_failures``, ``actions_taken``,
        ``actions_blocked``, ``time_step``, ``mean_condition`` and
        ``min_condition``, plus whatever the hierarchy and metadata models
        contribute.

    Raises
    ------
    ValueError
        If ``actions`` does not have one entry per component.
    """
    if len(actions) != self.n_components:
        raise ValueError(
            f"Actions length {len(actions)} != n_components {self.n_components}"
        )

    # Snapshot taken before the dynamics overwrite self.states
    states_before = self.states.copy()

    # Price the proposal and let the budget decide which actions go ahead
    proposed_costs = self.cost.compute(self._create_context(actions))
    approved, _ = _fast_budget_check(proposed_costs, self.budget.available())

    # Anything that was not approved becomes "do nothing"
    final_actions = np.where(approved, actions, 0)

    # Evolve the states using the budget-constrained actions
    final_context = self._create_context(final_actions)
    self.states = self.dynamics.compute(final_context)

    # Costs of the actions really taken, evaluated on the pre-step states
    actual_context = self._create_context(final_actions)
    actual_context.states = states_before
    actual_costs = self.cost.compute(actual_context)

    total_cost = np.sum(actual_costs)
    budget_success = self.budget.update(total_cost)

    # Every component ages by one step; action 1 (inspection) resets the clock
    self.time_since_inspection += 1
    self.time_since_inspection[final_actions == 1] = 0
    self.time_step += 1

    # Budget models exposing step_time() are advanced once per step
    if hasattr(self.budget, "step_time"):
        self.budget.step_time()

    # State 0 is a failure; "new" means it was above 0 before this step
    failed_now = self.states == 0
    current_failures = np.sum(failed_now)
    new_failures = np.sum(failed_now & (states_before > 0))

    info = {
        "costs": actual_costs,
        "total_cost": total_cost,
        "budget_remaining": self.budget.available(),
        "budget_success": budget_success,
        "failures": current_failures,
        "new_failures": new_failures,
        "actions_taken": final_actions,
        "actions_blocked": np.sum(~approved),
        "time_step": self.time_step,
        "mean_condition": np.mean(self.states),
        "min_condition": np.min(self.states),
    }

    # Optional extra metrics from the hierarchy and metadata models
    if self.hierarchy is not None:
        info.update(self._compute_hierarchy_metrics())
    if self.metadata is not None:
        info.update(self._compute_metadata_metrics())

    # Record the step. NOTE(review): history["rewards"] is not written here.
    history = self.history
    history["states"].append(self.states.copy())
    history["actions"].append(final_actions.copy())
    history["costs"].append(actual_costs.copy())
    history["budget_remaining"].append(self.budget.available())
    history["failures"].append(current_failures)

    if display_status:
        self.display_status(info)

    return self.states.copy(), info
def _compute_hierarchy_metrics(self) -> dict[str, Any]:
    """Return the metrics produced by the hierarchy model.

    Returns
    -------
    dict[str, Any]
        The result of ``self.hierarchy.compute`` on a context built from
        the current simulator state, or an empty dict when no hierarchy
        model is configured.
    """
    if self.hierarchy is None:
        return {}
    # The hierarchy model derives its metrics from the current context
    return self.hierarchy.compute(self._create_context())
def _compute_metadata_metrics(self) -> dict[str, Any]:
    """Return the metrics produced by the metadata model.

    Returns
    -------
    dict[str, Any]
        The result of ``self.metadata.compute`` on a context built from the
        current simulator state, or an empty dict when no metadata model is
        configured.
    """
    if self.metadata is None:
        return {}
    # The metadata model derives its metrics from the current context
    return self.metadata.compute(self._create_context())
[docs]
def get_observation(self, observability: str = "full") -> np.ndarray:
    """Build the observation vector for the requested observability mode.

    The vector is the concatenation of the (possibly masked or noisy)
    states divided by 10, the time since inspection divided by 100, and a
    single entry with the available budget as a fraction of the initial
    budget.

    Parameters
    ----------
    observability : str, default "full"
        - ``"full"``: true states.
        - ``"partial"``: components whose time since inspection exceeds 1
          are reported as -1 before scaling, i.e. -0.1 in the output.
        - ``"noisy"``: Gaussian noise (standard deviation 0.1) is added to
          the states and the result is clipped to [0, 10].

    Returns
    -------
    np.ndarray
        Observation of length ``2 * n_components + 1``.

    Raises
    ------
    ValueError
        If ``observability`` is not one of the modes above.
    """
    # Only the state part differs between modes. It is resolved first so an
    # unknown mode fails before the budget model is queried.
    if observability == "full":
        state_view = self.states
    elif observability == "partial":
        recently_inspected = self.time_since_inspection <= 1
        state_view = np.where(recently_inspected, self.states, -1)
    elif observability == "noisy":
        noise = np.random.normal(0, 0.1, size=self.states.shape)
        state_view = np.clip(self.states + noise, 0, 10)
    else:
        raise ValueError(f"Unknown observability type: {observability}")

    # Budget models without an initial_budget attribute are scaled by 100000
    initial_budget = getattr(self.budget, "initial_budget", 100000.0)
    available = self.budget.available()
    # A zero initial budget would divide by zero; report 0.0 instead, as
    # create_status_display does for its usage percentage.
    budget_fraction = available / initial_budget if initial_budget != 0 else 0.0

    return np.concatenate(
        [
            state_view / 10.0,  # Normalized states
            self.time_since_inspection / 100.0,  # Normalized time
            [budget_fraction],
        ]
    )
[docs]
def batch_rollout(
    self, policy_fn, horizon: int, n_rollouts: int = 1
) -> dict[str, np.ndarray]:
    """Run repeated rollouts of a policy for Monte Carlo evaluation.

    Rollouts are executed one after another; each starts from the states
    the simulator had when this method was called. A rollout ends early
    when the remaining budget is not positive or when more than half of
    the components have failed.

    Parameters
    ----------
    policy_fn : callable
        Maps the observation from ``get_observation()`` to an action array
        accepted by ``step``.
    horizon : int
        Maximum number of steps per rollout.
    n_rollouts : int, default 1
        Number of rollouts to run.

    Returns
    -------
    dict[str, np.ndarray]
        - ``returns``: per-rollout sum of ``-total_cost - 100 * failures``.
        - ``costs``: per-rollout total cost.
        - ``failures``: failure count after each step. A 2-D array when
          all rollouts have the same length; otherwise (early termination)
          a 1-D object array holding one list per rollout.
        - ``mean_return``, ``std_return``, ``mean_cost``: summary values.

    Notes
    -----
    The starting states are restored afterwards through ``reset``, so the
    history is cleared and the budget model is reset; only ``time_step``
    is put back to its previous value.
    """
    all_returns = []
    all_costs = []
    all_failures = []

    original_state = self.states.copy() if self.states is not None else None
    original_time_step = self.time_step

    for _rollout in range(n_rollouts):
        # Every rollout starts from the same initial states
        if original_state is not None:
            self.reset(len(original_state), original_state)

        total_return = 0
        total_cost = 0
        rollout_failures = []

        for _step in range(horizon):
            actions = policy_fn(self.get_observation())
            _states, info = self.step(actions)

            # Simple reward (negative cost and failure penalty)
            total_return += -info["total_cost"] - info["failures"] * 100
            total_cost += info["total_cost"]
            rollout_failures.append(info["failures"])

            # Early termination: budget exhausted or more than half failed
            if info["budget_remaining"] <= 0:
                break
            if info["failures"] > self.n_components * 0.5:
                break

        all_returns.append(total_return)
        all_costs.append(total_cost)
        all_failures.append(rollout_failures)

    # Restore original state
    if original_state is not None:
        self.reset(len(original_state), original_state)
        self.time_step = original_time_step

    # Early termination can make rollouts differ in length, and np.array
    # rejects ragged nested lists on recent NumPy. Keep the rectangular
    # 2-D result when possible and fall back to an object array otherwise.
    if len({len(trace) for trace in all_failures}) <= 1:
        failures = np.array(all_failures)
    else:
        failures = np.empty(len(all_failures), dtype=object)
        for position, trace in enumerate(all_failures):
            failures[position] = trace

    return {
        "returns": np.array(all_returns),
        "costs": np.array(all_costs),
        "failures": failures,
        "mean_return": np.mean(all_returns),
        "std_return": np.std(all_returns),
        "mean_cost": np.mean(all_costs),
    }
def _create_context(self, actions: np.ndarray | None = None) -> ModelContext:
    """Bundle the current simulator state into a ``ModelContext``.

    Parameters
    ----------
    actions : np.ndarray, optional
        Actions to attach to the context; they are copied.

    Returns
    -------
    ModelContext
        Context holding a copy of the states, the given actions, the
        current time step, references to all configured models and a
        shallow copy of the history dict.
    """
    # Copies keep models from mutating the simulator's own arrays
    states_snapshot = None if self.states is None else self.states.copy()
    actions_snapshot = None if actions is None else actions.copy()
    # Only the dict is copied; the lists inside are shared with self.history
    history_snapshot = dict(self.history) if self.history else None

    return ModelContext(
        states=states_snapshot,
        actions=actions_snapshot,
        time_step=self.time_step,
        dynamics=self.dynamics,
        cost=self.cost,
        budget=self.budget,
        hierarchy=self.hierarchy,
        metadata=self.metadata,
        history=history_snapshot,
    )
[docs]
def create_status_display(
    self, info: dict[str, Any] | None = None
) -> tuple[Panel, Progress]:
    """Create rich status display components.

    Parameters
    ----------
    info : dict, optional
        Information from the last simulation step. When it contains
        ``actions_blocked`` an extra table row is shown.

    Returns
    -------
    tuple[Panel, Progress]
        Rich panel with the status table and a progress bar for budget
        usage.

    Raises
    ------
    ValueError
        If the simulator was created with ``rich_display=False``.
    """
    if not self.rich_display:
        raise ValueError(
            "Rich display not enabled. Set rich_display=True in constructor."
        )

    table = Table(show_header=False, expand=True)

    if len(self.history["actions"]) == 0:
        # No actions taken yet
        table.add_row("Current Step:", "0")
        table.add_row("Status:", "Initialized, no actions taken")
        progress = Progress(
            "[progress.description]{task.description}",
            BarColumn(),
            TextColumn("{task.percentage:>3.0f}%"),
            TextColumn(" | Budget: Not started"),
            transient=True,
        )
        progress.add_task("Budget Usage:", total=100, completed=0)
    else:
        last_actions = self.history["actions"][-1]
        # minlength=4 guarantees a count for each of the four action codes
        action_counts = np.bincount(last_actions, minlength=4)
        action_percentages = (action_counts * 100) / len(last_actions)
        n_failed = np.sum(self.states == 0)
        failure_percentage = (n_failed / len(self.states)) * 100

        table.add_row("Current Step:", f"{self.time_step}")
        action_labels = ("Do Nothing:", "Inspect:", "Repair:", "Replace:")
        for code, label in enumerate(action_labels):
            table.add_row(
                label,
                f"{action_counts[code]} ({action_percentages[code]:.0f}%)",
            )
        table.add_row("Min Condition:", f"{np.min(self.states):.1f}")
        table.add_row("Mean Condition:", f"{np.mean(self.states):.1f}")
        table.add_row("Max Condition:", f"{np.max(self.states):.1f}")
        table.add_row(
            "Total Failures:",
            f"{n_failed} ({failure_percentage:.1f}%)",
        )

        # Budget information
        current_budget = (
            self.budget.available()
            if hasattr(self.budget, "available")
            else self.budget._available_internal()
        )
        if hasattr(self.budget, "initial_budget"):
            initial_budget = self.budget.initial_budget
        else:
            # Only reconstruct the starting budget when the model does not
            # expose it: what is left plus everything recorded as spent.
            spent = sum(np.sum(step_costs) for step_costs in self.history["costs"])
            initial_budget = current_budget + spent

        if info and "actions_blocked" in info:
            table.add_row("Actions Blocked:", f"{info['actions_blocked']}")

        if initial_budget > 0:
            percent_budget_used = (
                (initial_budget - current_budget) / initial_budget
            ) * 100
        else:
            percent_budget_used = 0

        progress = Progress(
            "[progress.description]{task.description}",
            BarColumn(),
            TextColumn("{task.percentage:>3.0f}%"),
            TextColumn(" | Remaining: {task.fields[remaining_budget]:,.0f}"),
            transient=True,
        )
        progress.add_task(
            "Budget Usage:",
            total=100,
            completed=percent_budget_used,
            remaining_budget=current_budget,
        )

    # The border colour follows the mean condition. Before reset() there
    # are no states to average, so a neutral colour is used instead.
    if self.states is None:
        border_style = "white"
    else:
        mean_condition = np.mean(self.states)
        if mean_condition > 5:
            border_style = "green"
        elif mean_condition > 2:
            border_style = "yellow"
        else:
            border_style = "red"

    panel = Panel.fit(
        table,
        title="Infrastructure Simulation Status",
        border_style=border_style,
        padding=(1, 2),
    )
    return panel, progress
[docs]
def clear_terminal(self):
    """Clear the terminal screen on both Windows and other platforms."""
    # Windows shells use "cls"; everything else is assumed to have "clear"
    command = "cls" if os.name == "nt" else "clear"
    _ = os.system(command)
[docs]
def display_status(self, info: dict[str, Any] | None = None):
    """Print the current simulation status with rich formatting.

    Does nothing when rich display is disabled or no console exists.

    Parameters
    ----------
    info : dict, optional
        Information from the last simulation step, forwarded to
        ``create_status_display``.
    """
    if self.rich_display and self.console:
        panel, progress = self.create_status_display(info)
        # Group renders the panel and the progress bar as one unit
        self.console.print(Group(panel, progress))