r"""Simple infrastructure management environments using config files.

This module provides simple implementations of infrastructure maintenance environments
that can be configured via YAML config files and CSV component data files. These
environments use standard models and are designed for ease of use and educational
purposes.

The environments support:

- Configuration-based setup from YAML and CSV files
- Both POMDP (partial observability) and MDP (full observability) variants
- Stable-baselines3 compatibility
- Multiple reward schemes and termination conditions
- Rich terminal displays and rendering

Example
-------
Using configuration files to create environments::

    # Create POMDP environment
    env = SimpleInfraEnv.from_config(
        config_path='config.yaml',
        components_path='components.csv'
    )

    # Create MDP environment
    env = SimpleInfraMDPEnv.from_config(
        config_path='config.yaml',
        components_path='components.csv'
    )

Classes
-------
SimpleInfraEnv : POMDP-style infrastructure environment
SimpleInfraMDPEnv : MDP-style infrastructure environment with component margins

Functions
---------
load_config_data : Load parameters from config and component files
"""
import csv
import warnings
from typing import Any

import numpy as np
import yaml

from ..models.budget import FixedBudget
from ..models.cost import SimpleCost
from ..models.dynamics import MarkovDynamics, WeibullDynamics
from .base import BaseInfraEnv
[docs]
def load_config_data(config_path: str, components_path: str) -> dict[str, Any]:
    """Load configuration from YAML and CSV files.

    The YAML file supplies the scalar settings and the three model parameter
    sections; the CSV file supplies one row per component type.

    Parameters
    ----------
    config_path : str
        Path to YAML configuration file. It must define the keys
        ``simulation_seed``, ``initial_budget``, ``dynamics_model``,
        ``cost_model`` and ``budget_model``.
    components_path : str
        Path to CSV components data file. Its header must contain
        ``component_type``, ``num_instances``, ``failure_condition``,
        ``inspect_cost``, ``replace_cost``, ``repair_cost_param``,
        ``importance_score``, ``dynamics_scale_mean``, ``dynamics_scale_sd``,
        ``dynamics_shape_mean`` and ``dynamics_shape_sd``.

    Returns
    -------
    dict
        Dictionary containing all loaded parameters. Per-type values are
        lists with one entry per CSV row; ``component_ids`` has one entry
        per component instance (``"<type><index>"``).

    Raises
    ------
    KeyError
        If a required key is missing from the YAML mapping or a required
        column is missing from a CSV row.
    ValueError
        If a numeric CSV cell cannot be parsed.

    Examples
    --------
    >>> params = load_config_data('config.yaml', 'components.csv')
    >>> print(f"Budget: {params['initial_budget']}")
    >>> print(f"Components: {len(params['component_types'])}")
    """
    # Explicit encoding: do not depend on the platform's locale default.
    with open(config_path, encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # newline="" lets the csv module handle line endings itself (including
    # newlines inside quoted fields). "utf-8-sig" drops a leading BOM, which
    # would otherwise become part of the first column name.
    with open(components_path, encoding="utf-8-sig", newline="") as f:
        rows = list(csv.DictReader(f))

    # Output key -> CSV column, for every per-type float parameter. The
    # insertion order defines the key order of the returned dictionary.
    float_columns = {
        "failure_conditions": "failure_condition",
        "inspect_costs": "inspect_cost",
        "replace_costs": "replace_cost",
        "repair_cost_params": "repair_cost_param",
        "importance_scores": "importance_score",
        "dynamics_scale_means": "dynamics_scale_mean",
        "dynamics_scale_sds": "dynamics_scale_sd",
        "dynamics_shape_means": "dynamics_shape_mean",
        "dynamics_shape_sds": "dynamics_shape_sd",
    }

    component_types = [row["component_type"] for row in rows]
    num_instances = [int(row["num_instances"]) for row in rows]
    per_type_floats = {
        key: [float(row[column]) for row in rows]
        for key, column in float_columns.items()
    }

    # One id per component instance: type name followed by a 0-based index.
    # Both lists come from the same rows, so their lengths always match.
    component_ids = [
        f"{type_name}{index}"
        for type_name, count in zip(component_types, num_instances, strict=True)
        for index in range(count)
    ]

    return {
        "simulation_seed": config["simulation_seed"],
        "initial_budget": config["initial_budget"],
        "component_types": component_types,
        "num_components_per_type": num_instances,
        "component_ids": component_ids,
        **per_type_floats,
        "dynamics_model_params": config["dynamics_model"],
        "cost_model_params": config["cost_model"],
        "budget_model_params": config["budget_model"],
    }
[docs]
class SimpleInfraEnv(BaseInfraEnv):
    """Simple POMDP infrastructure environment with configuration support.

    This environment simulates infrastructure maintenance under partial observability
    where components can only be observed through inspections. The environment uses
    configuration files to define component properties and model parameters.

    Features:

    - POMDP formulation with inspection-based observations
    - Configuration-based setup from YAML/CSV files
    - Multiple reward schemes and termination conditions
    - Support for component types with different characteristics
    - Rich terminal displays and basic rendering

    Parameters
    ----------
    config_path : str, optional
        Path to YAML configuration file
    components_path : str, optional
        Path to CSV components data file. Configuration is loaded only when
        both paths are given; otherwise built-in default parameters are used
        (with a warning if exactly one path was supplied).
    reward_scheme : {'cost_penalty', 'survival', 'condition'}, default 'cost_penalty'
        Reward function to use
    max_steps : int, default 100
        Maximum episode length
    observability : {'full', 'partial', 'noisy'}, default 'partial'
        Observation mode (partial recommended for POMDP)
    action_type : {'multi_discrete', 'discrete'}, default 'discrete'
        Action space format
    render_mode : str, optional
        Rendering mode
    rich_display : bool, default False
        Enable rich terminal status displays
    seed : int, optional
        Seed forwarded to the base environment. When omitted and configuration
        files are loaded, the configuration's ``simulation_seed`` is used.

    Attributes
    ----------
    params : dict
        Loaded configuration parameters
    failure_thresholds : np.ndarray
        Failure thresholds, one per component type
    component_types : list
        Component type names

    Notes
    -----
    This environment is designed for training RL agents on infrastructure
    maintenance problems with realistic component degradation and cost models.
    The POMDP formulation requires agents to balance exploration (inspections)
    with exploitation (maintenance actions).

    Actions are:

    - 0: Do nothing
    - 1: Inspect component
    - 2: Repair component
    - 3: Replace component

    Observations include last inspection results, time since inspections,
    and remaining budget information.

    Examples
    --------
    >>> env = SimpleInfraEnv.from_config('config.yaml', 'components.csv')
    >>> obs, info = env.reset()
    >>> action = env.action_space.sample()
    >>> obs, reward, terminated, truncated, info = env.step(action)
    >>> # Check environment with stable-baselines3
    >>> from stable_baselines3.common.env_checker import check_env
    >>> check_env(env, warn=True)
    """

    def __init__(
        self,
        config_path: str | None = None,
        components_path: str | None = None,
        reward_scheme: str = "cost_penalty",
        max_steps: int = 100,
        observability: str = "partial",
        action_type: str = "discrete",
        render_mode: str | None = None,
        rich_display: bool = False,
        seed: int | None = None,
    ):
        if config_path and components_path:
            self.params = load_config_data(config_path, components_path)
            if seed is None:
                seed = self.params.get("simulation_seed", None)
        else:
            if config_path or components_path:
                # Exactly one path was given: the defaults below would
                # otherwise replace the caller's configuration silently.
                warnings.warn(
                    "Both config_path and components_path are required to "
                    "load a configuration; only one was given, so built-in "
                    "default parameters are used instead.",
                    stacklevel=2,
                )
            # Use defaults for testing/minimal setup
            self.params = self._create_default_params()

        # Derived from the parameters in both branches so the component count
        # can never disagree with the per-type instance counts.
        n_components = sum(self.params["num_components_per_type"])

        # These attributes are set before the base initializer runs.
        self.reward_scheme = reward_scheme
        self.failure_thresholds = np.array(self.params["failure_conditions"])
        self.component_types = self.params["component_types"]

        super().__init__(
            n_components=n_components,
            max_steps=max_steps,
            observability=observability,
            action_type=action_type,
            render_mode=render_mode,
            rich_display=rich_display,
            seed=seed,
        )

    def _create_default_params(self) -> dict[str, Any]:
        """Create default parameters for testing.

        The dictionary has the same keys as the one returned by
        ``load_config_data`` and describes five component types with one
        instance each.
        """
        return {
            "simulation_seed": 42,
            "initial_budget": 2000,
            "component_types": ["A", "B", "C", "D", "E"],
            "num_components_per_type": [1, 1, 1, 1, 1],
            "component_ids": ["A0", "B0", "C0", "D0", "E0"],
            "failure_conditions": [40, 40, 40, 40, 40],
            "inspect_costs": [10, 20, 30, 40, 50],
            "replace_costs": [200, 200, 100, 100, 100],
            "repair_cost_params": [2, 2.5, 3, 3.5, 4],
            "importance_scores": [1, 1.5, 1.2, 1.8, 2],
            "dynamics_scale_means": [37.22, 46.37, 27.45, 35.67, 42.89],
            "dynamics_scale_sds": [2.1, 0.4, 0.85, 0.97, 1.5],
            "dynamics_shape_means": [2.0, 1.89, 2.1, 1.95, 2.05],
            "dynamics_shape_sds": [0.07, 0.05, 0.08, 0.06, 0.09],
            "dynamics_model_params": {
                "name": "WeibullDynamics",
                "num_states": 101,
                "num_actions": 4,
                "num_obs": 102,
                "seed": 42,
            },
            "cost_model_params": {"name": "StandardCost", "seed": 42},
            "budget_model_params": {
                "name": "FixedBudget",
                "initial_budget": 2000,
                "seed": 42,
            },
        }

    def _create_models(self) -> tuple[Any, Any, Any, Any | None, Any | None]:
        """Create models from configuration parameters.

        Returns
        -------
        tuple
            ``(dynamics, cost, budget, None, None)``; the last two slots are
            ``None`` because this simple environment has no hierarchy or
            metadata.
        """
        dynamics_params = self.params["dynamics_model_params"]
        if dynamics_params["name"] == "WeibullDynamics":
            # Per-type Weibull parameters are passed through unaveraged;
            # type_indices maps every component instance to its type's index.
            counts = np.asarray(self.params["num_components_per_type"], dtype=int)
            type_indices = np.repeat(np.arange(len(counts)), counts)
            dynamics = WeibullDynamics(
                n_states=dynamics_params["num_states"],
                shapes=self.params["dynamics_shape_means"],
                scales=self.params["dynamics_scale_means"],
                type_indices=type_indices,
                repair_effectiveness=0.7,
                seed=dynamics_params["seed"],
            )
        else:
            # Any other model name falls back to MarkovDynamics.
            dynamics = MarkovDynamics(
                n_states=dynamics_params["num_states"],
                base_deterioration_rate=0.1,
                repair_effectiveness=0.7,
                seed=dynamics_params["seed"],
            )

        # SimpleCost takes one cost per action, so per-type costs are averaged.
        cost = SimpleCost(
            inspect_cost=np.mean(self.params["inspect_costs"]),
            repair_cost=np.mean([p * 100 for p in self.params["repair_cost_params"]]),
            replace_cost=np.mean(self.params["replace_costs"]),
        )
        budget = FixedBudget(initial_budget=self.params["initial_budget"])
        return dynamics, cost, budget, None, None

    def _component_thresholds(self) -> np.ndarray | None:
        """Return one failure threshold per component instance.

        ``self.failure_thresholds`` holds one value per component type; each
        value is repeated as many times as that type has instances.

        Returns
        -------
        np.ndarray or None
            The expanded thresholds, or ``None`` when the number of thresholds
            does not match the number of component types, in which case the
            caller applies its own fallback.
        """
        if len(self.failure_thresholds) != len(self.component_types):
            return None
        counts = np.asarray(self.params["num_components_per_type"], dtype=int)
        return np.repeat(self.failure_thresholds, counts)

    def _compute_reward(self, sim_info: dict[str, Any]) -> float:
        """Compute reward based on selected reward scheme.

        Parameters
        ----------
        sim_info : dict
            Step information; ``total_cost`` is always read, ``failures`` is
            read by every scheme except ``'survival'``, and ``mean_condition``
            is read (default 5.0) by ``'condition'``.

        Returns
        -------
        float
            Reward for the current step.
        """
        total_cost = sim_info["total_cost"]

        if self.reward_scheme == "cost_penalty":
            # Heavy failure penalty, plus a small bonus for a failure-free step.
            failures = sim_info["failures"]
            survival_reward = 1.0 if failures == 0 else 0.0
            reward = survival_reward - total_cost / 100.0 - failures * 50.0
        elif self.reward_scheme == "survival":
            # Fraction of components strictly above their failure threshold.
            states = self.simulator.states
            thresholds = self._component_thresholds()
            if thresholds is None:
                thresholds = np.full(len(states), 40)  # Default threshold
            survival_rate = np.sum(states > thresholds) / len(states)
            reward = survival_rate - total_cost / 1000.0
        elif self.reward_scheme == "condition":
            # Reward proportional to the reported mean condition.
            mean_condition = sim_info.get("mean_condition", 5.0)
            reward = (
                mean_condition / 10.0
                - total_cost / 500.0
                - sim_info["failures"] * 2.0
            )
        else:
            # Unknown scheme: plain cost and failure penalty.
            reward = -total_cost / 100.0 - sim_info["failures"] * 10.0
        return float(reward)

    def _check_termination(self, sim_info: dict[str, Any]) -> tuple[bool, bool]:
        """Check termination with component-specific failure thresholds.

        Returns
        -------
        tuple of bool
            ``(terminated, truncated)``. The episode terminates when the
            remaining budget is not positive or a component has failed, and is
            truncated once ``current_step`` reaches ``max_steps``.
        """
        budget_exhausted = sim_info.get("budget_remaining", 0) <= 0

        thresholds = self._component_thresholds()
        if thresholds is not None:
            # A component at or below its threshold counts as failed.
            any_failed = np.any(self.simulator.states <= thresholds)
        else:
            # Fallback to the simulator's generic failure count.
            any_failed = sim_info.get("failures", 0) > 0

        terminated = bool(budget_exhausted or any_failed)
        truncated = bool(self.current_step >= self.max_steps)
        return terminated, truncated

    @classmethod
    def from_config(
        cls, config_path: str, components_path: str, **kwargs
    ) -> "SimpleInfraEnv":
        """Create environment from configuration files.

        Parameters
        ----------
        config_path : str
            Path to YAML configuration file
        components_path : str
            Path to CSV components data file
        **kwargs
            Additional keyword arguments to override defaults

        Returns
        -------
        SimpleInfraEnv
            Configured environment instance

        Examples
        --------
        >>> env = SimpleInfraEnv.from_config(
        ...     'config.yaml', 'components.csv',
        ...     reward_scheme='survival', max_steps=200
        ... )
        """
        return cls(config_path=config_path, components_path=components_path, **kwargs)
[docs]
class SimpleInfraMDPEnv(BaseInfraEnv):
    """Simple MDP infrastructure environment with component margins.

    This environment provides full observability of component states and focuses
    on the margin between current state and failure threshold. This formulation
    is easier to learn for many RL algorithms as it provides direct state information.

    Features:

    - MDP formulation with full state observability
    - Component margins as primary observation
    - Configuration-based setup from YAML/CSV files
    - Margin-based reward functions
    - Support for component types with different failure thresholds

    Parameters
    ----------
    config_path : str, optional
        Path to YAML configuration file
    components_path : str, optional
        Path to CSV components data file. Configuration is loaded only when
        both paths are given; otherwise built-in default parameters are used
        (with a warning if exactly one path was supplied).
    reward_scheme : {'margin', 'weighted_margin', 'binary'}, default 'margin'
        Reward function to use
    max_steps : int, default 100
        Maximum episode length
    action_type : {'multi_discrete', 'discrete'}, default 'multi_discrete'
        Action space format (multi_discrete recommended for MDP)
    render_mode : str, optional
        Rendering mode
    rich_display : bool, default False
        Enable rich terminal status displays
    seed : int, optional
        Seed forwarded to the base environment. When omitted and configuration
        files are loaded, the configuration's ``simulation_seed`` is used.

    Attributes
    ----------
    params : dict
        Loaded configuration parameters
    failure_thresholds : np.ndarray
        Failure thresholds per component type
    max_states : int
        Maximum component state value

    Raises
    ------
    ValueError
        If any failure threshold is not strictly below ``max_states``; the
        margin would then divide by zero or by a negative number.

    Notes
    -----
    The MDP formulation uses component margins as the primary state representation:

        margin = (current_state - failure_threshold) / (max_state - failure_threshold)

    This normalization makes the state space more uniform across component types
    and focuses learning on the critical region near failure thresholds.

    Observations include:

    - Component margins: 1 at the maximum state, 0 at the failure threshold and
      negative below it (the lower bound depends on the threshold and is below
      -1 when the threshold exceeds half the maximum state)
    - Normalized remaining budget

    Examples
    --------
    >>> env = SimpleInfraMDPEnv.from_config('config.yaml', 'components.csv')
    >>> obs, info = env.reset()
    >>> print(f"Component margins: {obs[:-1]}")  # All but last element
    >>> print(f"Budget remaining: {obs[-1]}")  # Last element
    """

    def __init__(
        self,
        config_path: str | None = None,
        components_path: str | None = None,
        reward_scheme: str = "margin",
        max_steps: int = 100,
        action_type: str = "multi_discrete",
        render_mode: str | None = None,
        rich_display: bool = False,
        seed: int | None = None,
    ):
        if config_path and components_path:
            self.params = load_config_data(config_path, components_path)
            if seed is None:
                seed = self.params.get("simulation_seed", None)
        else:
            if config_path or components_path:
                # Exactly one path was given: the defaults below would
                # otherwise replace the caller's configuration silently.
                warnings.warn(
                    "Both config_path and components_path are required to "
                    "load a configuration; only one was given, so built-in "
                    "default parameters are used instead.",
                    stacklevel=2,
                )
            # Use defaults
            self.params = self._create_default_params()

        # Derived from the parameters in both branches so the component count
        # can never disagree with the per-type instance counts.
        n_components = sum(self.params["num_components_per_type"])

        self.reward_scheme = reward_scheme
        self.failure_thresholds = np.array(self.params["failure_conditions"])
        # States are 0-indexed, so the largest state is num_states - 1.
        self.max_states = self.params["dynamics_model_params"]["num_states"] - 1

        # The margin denominator is (max_states - threshold); reject
        # configurations that would make it zero or negative.
        if np.any(self.failure_thresholds >= self.max_states):
            raise ValueError(
                "failure_condition values must be strictly below the maximum "
                f"state ({self.max_states}); got {self.failure_thresholds.tolist()}"
            )

        # Initialize base environment with full observability
        super().__init__(
            n_components=n_components,
            max_steps=max_steps,
            observability="full",  # MDP uses full observability
            action_type=action_type,
            render_mode=render_mode,
            rich_display=rich_display,
            seed=seed,
        )

    def _create_default_params(self) -> dict[str, Any]:
        """Create default parameters for testing.

        The dictionary has the same keys as the one returned by
        ``load_config_data`` and describes five component types with one
        instance each.
        """
        return {
            "simulation_seed": 42,
            "initial_budget": 2000,
            "component_types": ["A", "B", "C", "D", "E"],
            "num_components_per_type": [1, 1, 1, 1, 1],
            "component_ids": ["A0", "B0", "C0", "D0", "E0"],
            "failure_conditions": [40, 40, 40, 40, 40],
            "inspect_costs": [10, 20, 30, 40, 50],
            "replace_costs": [200, 200, 100, 100, 100],
            "repair_cost_params": [2, 2.5, 3, 3.5, 4],
            "importance_scores": [1, 1.5, 1.2, 1.8, 2],
            "dynamics_scale_means": [37.22, 46.37, 27.45, 35.67, 42.89],
            "dynamics_scale_sds": [2.1, 0.4, 0.85, 0.97, 1.5],
            "dynamics_shape_means": [2.0, 1.89, 2.1, 1.95, 2.05],
            "dynamics_shape_sds": [0.07, 0.05, 0.08, 0.06, 0.09],
            "dynamics_model_params": {
                "name": "WeibullDynamics",
                "num_states": 101,
                "num_actions": 4,
                "num_obs": 102,
                "seed": 42,
            },
            "cost_model_params": {"name": "StandardCost", "seed": 42},
            "budget_model_params": {
                "name": "FixedBudget",
                "initial_budget": 2000,
                "seed": 42,
            },
        }

    def _create_models(self) -> tuple[Any, Any, Any, Any | None, Any | None]:
        """Create models from configuration parameters.

        Returns
        -------
        tuple
            ``(dynamics, cost, budget, None, None)``; the last two slots are
            always ``None`` in this environment.
        """
        dynamics_params = self.params["dynamics_model_params"]
        if dynamics_params["name"] == "WeibullDynamics":
            # Per-type Weibull parameters are passed through unaveraged;
            # type_indices maps every component instance to its type's index.
            counts = np.asarray(self.params["num_components_per_type"], dtype=int)
            type_indices = np.repeat(np.arange(len(counts)), counts)
            dynamics = WeibullDynamics(
                n_states=dynamics_params["num_states"],
                shapes=self.params["dynamics_shape_means"],
                scales=self.params["dynamics_scale_means"],
                type_indices=type_indices,
                repair_effectiveness=0.7,
                seed=dynamics_params["seed"],
            )
        else:
            # Any other model name falls back to MarkovDynamics.
            dynamics = MarkovDynamics(
                n_states=dynamics_params["num_states"],
                base_deterioration_rate=0.1,
                repair_effectiveness=0.7,
                seed=dynamics_params["seed"],
            )

        # SimpleCost takes one cost per action, so per-type costs are averaged.
        cost = SimpleCost(
            inspect_cost=np.mean(self.params["inspect_costs"]),
            repair_cost=np.mean([p * 100 for p in self.params["repair_cost_params"]]),
            replace_cost=np.mean(self.params["replace_costs"]),
        )
        budget = FixedBudget(initial_budget=self.params["initial_budget"])
        return dynamics, cost, budget, None, None

    def _expand_per_type(self, per_type_values) -> np.ndarray | None:
        """Repeat per-type values so there is one entry per component instance.

        Parameters
        ----------
        per_type_values : sequence
            One value per component type.

        Returns
        -------
        np.ndarray or None
            The expanded values, or ``None`` when the number of values does
            not match the number of component types, in which case the caller
            applies its own fallback.
        """
        if len(per_type_values) != len(self.params["component_types"]):
            return None
        counts = np.asarray(self.params["num_components_per_type"], dtype=int)
        return np.repeat(np.asarray(per_type_values), counts)

    def _component_margins(self) -> np.ndarray:
        """Return the margin of every component as a float32 array.

        The margin is ``(state - threshold) / (max_state - threshold)``.
        float32 matches the dtype of the observation, so rewards and
        termination checks see exactly the values the agent observes.
        """
        states = self.simulator.states
        thresholds = self._expand_per_type(self.failure_thresholds)
        if thresholds is None:
            thresholds = np.full(len(states), 40)  # Default threshold
        margins = (states - thresholds) / (self.max_states - thresholds)
        return margins.astype(np.float32)

    def _get_observation(self) -> np.ndarray:
        """Get MDP observation with component margins and budget.

        Returns
        -------
        np.ndarray
            float32 vector holding one margin per component followed by the
            available budget divided by the initial budget.
        """
        budget = self.simulator.budget
        # Prefer the public accessor; fall back to the internal one when the
        # budget model does not expose ``available``.
        if hasattr(budget, "available"):
            budget_available = budget.available()
        else:
            budget_available = budget._available_internal()
        normalized_budget = budget_available / self.params["initial_budget"]

        observation = np.concatenate([self._component_margins(), [normalized_budget]])
        return observation.astype(np.float32)

    def _compute_reward(self, sim_info: dict[str, Any]) -> float:
        """Compute reward based on selected reward scheme.

        Parameters
        ----------
        sim_info : dict
            Step information; only ``total_cost`` is read.

        Returns
        -------
        float
            Reward for the current step.
        """
        margins = self._component_margins()
        total_cost = sim_info["total_cost"]

        if self.reward_scheme == "margin":
            # Average margin, with a heavy penalty per negative margin.
            failure_penalty = np.sum(margins < 0) * 10.0
            reward = np.mean(margins) - failure_penalty - total_cost / 1000.0
        elif self.reward_scheme == "weighted_margin":
            # Importance scores weight each component's margin.
            importance = self._expand_per_type(self.params["importance_scores"])
            if importance is None:
                importance = np.ones(len(margins))
            weighted_margin = np.average(margins, weights=importance)
            failure_penalty = np.sum(margins < 0) * 15.0
            reward = weighted_margin - failure_penalty - total_cost / 1000.0
        elif self.reward_scheme == "binary":
            # +1 when no margin is negative, -10 otherwise, minus a small
            # cost penalty.
            reward = 1.0 if np.all(margins >= 0) else -10.0
            reward -= total_cost / 2000.0
        else:
            # Unknown scheme: average margin minus cost penalty.
            reward = np.mean(margins) - total_cost / 1000.0
        return float(reward)

    def _check_termination(self, sim_info: dict[str, Any]) -> tuple[bool, bool]:
        """Check termination based on margins.

        Returns
        -------
        tuple of bool
            ``(terminated, truncated)``. The episode terminates when the
            remaining budget is not positive or any margin is negative, and is
            truncated once ``current_step`` reaches ``max_steps``.
        """
        budget_exhausted = sim_info.get("budget_remaining", 0) <= 0
        # NOTE(review): a state exactly at its threshold (margin == 0) is not
        # a failure here, whereas SimpleInfraEnv._check_termination treats
        # ``states <= thresholds`` as failed. Confirm which is intended.
        any_failed = np.any(self._component_margins() < 0)

        terminated = bool(budget_exhausted or any_failed)
        truncated = bool(self.current_step >= self.max_steps)
        return terminated, truncated

    @classmethod
    def from_config(
        cls, config_path: str, components_path: str, **kwargs
    ) -> "SimpleInfraMDPEnv":
        """Create MDP environment from configuration files.

        Parameters
        ----------
        config_path : str
            Path to YAML configuration file
        components_path : str
            Path to CSV components data file
        **kwargs
            Additional keyword arguments to override defaults

        Returns
        -------
        SimpleInfraMDPEnv
            Configured MDP environment instance

        Examples
        --------
        >>> env = SimpleInfraMDPEnv.from_config(
        ...     'config.yaml', 'components.csv',
        ...     reward_scheme='weighted_margin'
        ... )
        """
        return cls(config_path=config_path, components_path=components_path, **kwargs)