# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
#
# Pyomo.DoE was produced under the Department of Energy Carbon Capture Simulation
# Initiative (CCSI), and is copyright (c) 2022 by the software owners:
# TRIAD National Security, LLC., Lawrence Livermore National Security, LLC.,
# Lawrence Berkeley National Laboratory, Pacific Northwest National Laboratory,
# Battelle Memorial Institute, University of Notre Dame,
# The University of Pittsburgh, The University of Texas at Austin,
# University of Toledo, West Virginia University, et al. All rights reserved.
#
# NOTICE. This Software was developed under funding from the
# U.S. Department of Energy and the U.S. Government consequently retains
# certain rights. As such, the U.S. Government has been granted for itself
# and others acting on its behalf a paid-up, nonexclusive, irrevocable,
# worldwide license in the Software to reproduce, distribute copies to the
# public, prepare derivative works, and perform publicly and display
# publicly, and to permit others to do so.
# ____________________________________________________________________________________
from enum import Enum
from itertools import permutations, product
import json
import logging
import math
from pyomo.common.dependencies import (
numpy as np,
numpy_available,
pandas as pd,
pathlib,
matplotlib as plt,
scipy_available,
)
from pyomo.common.errors import DeveloperError
from pyomo.common.timing import TicTocTimer
from pyomo.contrib.sensitivity_toolbox.sens import get_dsdp
if numpy_available and scipy_available:
from pyomo.contrib.doe.grey_box_utilities import FIMExternalGreyBox
from pyomo.contrib.pynumero.interfaces.external_grey_box import ExternalGreyBoxBlock
import pyomo.environ as pyo
from pyomo.contrib.doe.utils import (
check_FIM,
compute_FIM_metrics,
_SMALL_TOLERANCE_DEFINITENESS,
)
from pyomo.opt import SolverStatus
[docs]
class ObjectiveLib(Enum):
    """Design-of-experiments optimality criteria.

    Each member's value is the string accepted by the
    ``objective_option`` argument of ``DesignOfExperiments``.
    Note: ``minimum_eigenvalue`` and ``condition_number`` require the
    grey-box objective formulation.
    """

    determinant = "determinant"  # det(FIM), D-optimality
    trace = "trace"  # trace(inv(FIM)), A-optimality
    pseudo_trace = "pseudo_trace"  # trace(FIM), pseudo-A-optimality
    minimum_eigenvalue = "minimum_eigenvalue"  # min(eig(FIM)), E-optimality
    condition_number = "condition_number"  # cond(FIM), ME-optimality
    zero = "zero"  # Constant zero objective, useful for initialization and debugging
[docs]
class FiniteDifferenceStep(Enum):
    """Finite-difference schemes for building the sensitivity matrix.

    Each member's value is the string accepted by the ``fd_formula``
    argument of ``DesignOfExperiments``.
    """

    forward = "forward"
    central = "central"
    backward = "backward"
[docs]
class DesignOfExperiments:
[docs]
def __init__(
    self,
    experiment=None,
    fd_formula="central",
    step=1e-3,
    objective_option="determinant",
    use_grey_box_objective=False,
    scale_constant_value=1.0,
    scale_nominal_param_value=False,
    prior_FIM=None,
    jac_initial=None,
    fim_initial=None,
    L_diagonal_lower_bound=1e-7,
    solver=None,
    grey_box_solver=None,
    tee=False,
    grey_box_tee=False,
    get_labeled_model_args=None,
    logger_level=logging.WARNING,
    improve_cholesky_roundoff_error=False,
    _Cholesky_option=True,
    _only_compute_fim_lower=True,
):
    """This package enables model-based design of experiments analysis
    with Pyomo. Both direct optimization and enumeration modes are
    supported.

    The package has been refactored from its original form as of August 24. See
    the documentation for more information.

    Parameters
    ----------
    experiment:
        Experiment object that holds the model and labels all the components. The
        object should have a ``get_labeled_model`` where a model is returned with
        the following labeled sets:

        - ``unknown_parameters``,
        - ``experimental_inputs``,
        - ``experimental_outputs``
    fd_formula:
        Finite difference formula for computing the sensitivity matrix. Must be
        one of [``central``, ``forward``, ``backward``], default: ``central``
    step:
        Relative step size for the finite difference formula.
        default: 1e-3
    objective_option:
        String representation of the objective option. Current available options
        are:

        - ``determinant`` (for determinant, or D-optimality),
        - ``trace`` (for trace of covariance matrix, or A-optimality),
        - ``pseudo_trace`` (for trace of Fisher Information Matrix(FIM), or pseudo A-optimality),
        - ``minimum_eigenvalue``, (for E-optimality), or ``condition_number`` (for ME-optimality)

        Note: E-optimality and ME-optimality are only supported when using the
        grey box objective (i.e., ``use_grey_box_objective`` is True)
        default: ``determinant``
    use_grey_box_objective:
        Boolean of whether or not to use the grey-box version of the objective
        function. True to use grey box, False to use standard.
        Default: False (do not use grey box)
    scale_constant_value:
        Constant scaling for the sensitivity matrix. Every element will be
        multiplied by this scaling factor.
        default: 1
    scale_nominal_param_value:
        Boolean for whether or not to scale the sensitivity matrix by the
        nominal parameter values. Every column of the sensitivity matrix
        will be divided by the respective nominal parameter value.
        default: False
    prior_FIM:
        2D numpy array representing information from prior experiments. If
        no value is given, the assumed prior will be a matrix of zeros. This
        matrix will be assumed to be scaled as the user has specified (i.e.,
        if scale_nominal_param_value is true, we will assume the FIM provided
        here has been scaled by the parameter values)
    jac_initial:
        2D numpy array as the initial values for the sensitivity matrix.
    fim_initial:
        2D numpy array as the initial values for the FIM.
    L_diagonal_lower_bound:
        Lower bound for the values of the lower triangular Cholesky factorization
        matrix.
        default: 1e-7
    solver:
        A ``solver`` object specified by the user, default=None.
        If not specified, default solver is set to IPOPT with MA57.
    grey_box_solver:
        A solver object to be used for the grey-box formulation,
        default=None. If not specified, a ``cyipopt`` solver with MA57
        is constructed.
    tee:
        Solver option to be passed for verbose output.
    grey_box_tee:
        Verbose-output flag used when solving with the grey-box solver.
    get_labeled_model_args:
        Additional arguments for the ``get_labeled_model`` function on the
        Experiment object.
    improve_cholesky_roundoff_error:
        Boolean value of whether or not to improve round-off error. If True, it will
        apply M[i,i] >= L[i,j]^2. Where, M is the FIM and L is the lower triangular matrix
        from Cholesky factorization. If the round-off error is not significant, this
        option can be turned off to improve performance by skipping this constraint.
    _Cholesky_option:
        Boolean value of whether or not to use the cholesky factorization to
        compute the determinant for the D-optimality criteria. This parameter
        should not be changed unless the user intends to make performance worse
        (i.e., compare an existing tool that uses the full FIM to this algorithm)
    _only_compute_fim_lower:
        If True, only the lower triangle of the FIM is computed. This parameter
        should not be changed unless the user intends to make performance worse
        (i.e., compare an existing tool that uses the full FIM to this algorithm)
    logger_level:
        Specify the level of the logger. Change to logging.DEBUG for all messages.
    """
    if experiment is None:
        raise ValueError("Experiment object must be provided to perform DoE.")
    # Check if the Experiment object has callable ``get_labeled_model`` function
    if not hasattr(experiment, "get_labeled_model"):
        raise ValueError(
            "The experiment object must have a ``get_labeled_model`` function"
        )
    # Set the experiment object from the user
    self.experiment = experiment
    # Set the finite difference and subsequent step size
    # (Enum construction validates the string and raises on bad input)
    self.fd_formula = FiniteDifferenceStep(fd_formula)
    self.step = step
    # Set the objective type and scaling options:
    self.objective_option = ObjectiveLib(objective_option)
    self.use_grey_box = use_grey_box_objective
    self.scale_constant_value = scale_constant_value
    self.scale_nominal_param_value = scale_nominal_param_value
    # Set the prior FIM (will be checked upon model construction)
    self.prior_FIM = prior_FIM
    # Set the initial values for the jacobian, fim, and L matrices
    self.jac_initial = jac_initial
    self.fim_initial = fim_initial
    # Set the lower bound on the Cholesky lower triangular matrix
    self.L_diagonal_lower_bound = L_diagonal_lower_bound
    # check if user-defined solver is given
    if solver:
        self.solver = solver
    # if not given, use default solver
    else:
        # Default: IPOPT with the MA57 linear solver and a generous
        # iteration limit; AMPL evaluation errors abort the solve.
        solver = pyo.SolverFactory("ipopt")
        solver.options["linear_solver"] = "ma57"
        solver.options["halt_on_ampl_error"] = "yes"
        solver.options["max_iter"] = 3000
        self.solver = solver
    self.tee = tee
    self.grey_box_tee = grey_box_tee
    if grey_box_solver:
        self.grey_box_solver = grey_box_solver
    else:
        # Default grey-box solver: cyipopt with MA57 and a loosened
        # tolerance / monotone barrier strategy.
        grey_box_solver = pyo.SolverFactory("cyipopt")
        grey_box_solver.config.options["linear_solver"] = "ma57"
        grey_box_solver.config.options['tol'] = 1e-4
        grey_box_solver.config.options['mu_strategy'] = "monotone"
        self.grey_box_solver = grey_box_solver
    # Set get_labeled_model_args as an empty dict if no arguments are passed
    if get_labeled_model_args is None:
        get_labeled_model_args = {}
    self.get_labeled_model_args = get_labeled_model_args
    # Retrieve logger and set logging level
    self.logger = logging.getLogger(__name__)
    self.logger.setLevel(level=logger_level)
    # Set the private options if passed (only developers should pass these)
    self.Cholesky_option = _Cholesky_option
    self.only_compute_fim_lower = _only_compute_fim_lower
    # To improve round-off error in Cholesky-based objectives
    self.improve_cholesky_roundoff_error = improve_cholesky_roundoff_error
    # model attribute to avoid rebuilding models
    self.model = pyo.ConcreteModel()  # Build empty model
    # Empty results object
    self.results = {}
    # May need this attribute for more complicated structures?
    # (i.e., no model rebuilding for large models with sequential)
    self._built_scenarios = False
# Perform doe
[docs]
def run_doe(self, model=None, results_file=None):
    """
    Runs DoE for a single experiment estimation. Can save results in
    a file based on the flag.

    Parameters
    ----------
    model: model to run the DoE, default: None (self.model)
    results_file: string name of the file path to save the results
                  to in the form of a .json file
                  default: None --> don't save
    """
    # Check results file name.
    # FIX: use isinstance instead of ``type(...) in [...]`` — pathlib.Path()
    # actually constructs PosixPath/WindowsPath subclasses, so the old exact
    # type check rejected genuine Path objects.
    if results_file is not None:
        if not isinstance(results_file, (pathlib.Path, str)):
            raise ValueError(
                "``results_file`` must be either a Path object or a string."
            )
    # Start timer
    sp_timer = TicTocTimer()
    sp_timer.tic(msg=None)
    self.logger.info("Beginning experimental optimization.")
    # Model is none, set it to self.model
    if model is None:
        model = self.model
    else:
        # TODO: Add safe naming when a model is passed by the user.
        # doe_block = pyo.Block()
        # doe_block_name = unique_component_name(model,
        #                                        "design_of_experiments_block")
        # model.add_component(doe_block_name, doe_block)
        pass
    # TODO: potentially work with this for more complicated models
    # Create the full DoE model (build scenarios for F.D. scheme)
    if not self._built_scenarios:
        self.create_doe_model(model=model)
    # Add the objective function to the model
    if self.use_grey_box:
        self.create_grey_box_objective_function(model=model)
    else:
        self.create_objective_function(model=model)
    # Track time required to build the DoE model
    build_time = sp_timer.toc(msg=None)
    self.logger.info(
        "Successfully built the DoE model.\nBuild time: %0.1f seconds" % build_time
    )
    # Solve the square problem first to initialize the fim and
    # sensitivity constraints: deactivate objective expression and
    # objective constraints (on a block), and fix the design variables.
    model.objective.deactivate()
    model.obj_cons.deactivate()
    for comp in model.scenario_blocks[0].experiment_inputs:
        comp.fix()
    # TODO: safeguard solver call to see if solver terminated successfully
    # see below commented code:
    # res = self.solver.solve(model, tee=self.tee, load_solutions=False)
    # if pyo.check_optimal_termination(res):
    #     model.load_solution(res)
    # else:
    #     # The solver was unsuccessful, might want to warn the user
    #     # or terminate gracefully, etc.
    # NOTE(review): dummy_obj is added unconditionally; calling run_doe twice
    # on the same model would re-add the component — confirm intended usage
    # is one solve per model instance.
    model.dummy_obj = pyo.Objective(expr=0, sense=pyo.minimize)
    self.solver.solve(model, tee=self.tee)
    # Track time to initialize the DoE model
    initialization_time = sp_timer.toc(msg=None)
    self.logger.info(
        (
            "Successfully initialized the DoE model."
            "\nInitialization time: %0.1f seconds" % initialization_time
        )
    )
    model.dummy_obj.deactivate()
    # Reactivate objective and unfix experimental design decisions
    for comp in model.scenario_blocks[0].experiment_inputs:
        comp.unfix()
    model.objective.activate()
    model.obj_cons.activate()
    if self.use_grey_box:
        # Initialize grey box inputs to be fim values currently
        # (only the lower triangle of the symmetric FIM is mapped)
        for i in model.parameter_names:
            for j in model.parameter_names:
                if list(model.parameter_names).index(i) >= list(
                    model.parameter_names
                ).index(j):
                    model.obj_cons.egb_fim_block.inputs[(j, i)].set_value(
                        pyo.value(model.fim[(i, j)])
                    )
        # Set objective value consistent with the selected criterion
        if self.objective_option == ObjectiveLib.trace:
            trace_val = np.trace(np.linalg.pinv(self.get_FIM()))
            model.obj_cons.egb_fim_block.outputs["A-opt"].set_value(trace_val)
        elif self.objective_option == ObjectiveLib.determinant:
            det_val = np.linalg.det(np.array(self.get_FIM()))
            model.obj_cons.egb_fim_block.outputs["log-D-opt"].set_value(
                np.log(det_val)
            )
        elif self.objective_option == ObjectiveLib.minimum_eigenvalue:
            eig, _ = np.linalg.eig(np.array(self.get_FIM()))
            model.obj_cons.egb_fim_block.outputs["E-opt"].set_value(np.min(eig))
        elif self.objective_option == ObjectiveLib.condition_number:
            eig, _ = np.linalg.eig(np.array(self.get_FIM()))
            cond_number = np.log(np.abs(np.max(eig) / np.min(eig)))
            model.obj_cons.egb_fim_block.outputs["ME-opt"].set_value(cond_number)
    # If the model has L, initialize it with the solved FIM
    if hasattr(model, "L"):
        # Get the FIM values
        fim_vals = [
            pyo.value(model.fim[i, j])
            for i in model.parameter_names
            for j in model.parameter_names
        ]
        fim_np = np.array(fim_vals).reshape(
            (len(model.parameter_names), len(model.parameter_names))
        )
        # Need to compute the full FIM before
        # initializing the Cholesky factorization
        if self.only_compute_fim_lower:
            fim_np = fim_np + fim_np.T - np.diag(np.diag(fim_np))
        # Check if the FIM is positive definite.
        # If not, add jitter to the diagonal to ensure positive definiteness
        min_eig = np.min(np.linalg.eigvals(fim_np))
        if min_eig < _SMALL_TOLERANCE_DEFINITENESS:
            # Raise the minimum eigenvalue to at
            # least _SMALL_TOLERANCE_DEFINITENESS
            jitter = np.min(
                [
                    -min_eig + _SMALL_TOLERANCE_DEFINITENESS,
                    _SMALL_TOLERANCE_DEFINITENESS,
                ]
            )
        else:
            # No jitter needed
            jitter = 0
        # Add jitter to the diagonal to ensure positive definiteness
        L_vals_sq = np.linalg.cholesky(
            fim_np + jitter * np.eye(len(model.parameter_names))
        )
        for i, c in enumerate(model.parameter_names):
            for j, d in enumerate(model.parameter_names):
                model.L[c, d].value = L_vals_sq[i, j]
        # Initialize the inverse of L if it exists
        if hasattr(model, "L_inv"):
            L_inv_vals = np.linalg.inv(L_vals_sq)
            for i, c in enumerate(model.parameter_names):
                for j, d in enumerate(model.parameter_names):
                    if i >= j:
                        model.L_inv[c, d].value = L_inv_vals[i, j]
                    else:
                        model.L_inv[c, d].value = 0.0
            # Initialize the cov_trace if it exists
            # (trace(inv(FIM)) equals the squared Frobenius norm of inv(L))
            if hasattr(model, "cov_trace"):
                initial_cov_trace = np.sum(L_inv_vals**2)
                model.cov_trace.value = initial_cov_trace
    if hasattr(model, "determinant"):
        model.determinant.value = np.linalg.det(np.array(self.get_FIM()))
    # Solve the full model, which has now been initialized with the square solve
    if self.use_grey_box:
        res = self.grey_box_solver.solve(model, tee=self.grey_box_tee)
    else:
        res = self.solver.solve(model, tee=self.tee)
    # Track time used to solve the DoE model
    solve_time = sp_timer.toc(msg=None)
    self.logger.info(
        (
            "Successfully optimized experiment."
            "\nSolve time: %0.1f seconds" % solve_time
        )
    )
    self.logger.info(
        "Total time for build, initialization, and solve: %0.1f seconds"
        % (build_time + initialization_time + solve_time)
    )
    # Avoid accidental carry-over of FIM information
    fim_local = self.get_FIM()
    # Make sure stale results don't follow the DoE object instance
    self.results = {}
    self.results["Solver Status"] = res.solver.status
    self.results["Termination Condition"] = res.solver.termination_condition
    # FIX: the original only handled str and bytes messages, leaving
    # ``results_message`` unbound (NameError) when the solver interface
    # reports None or any other type. Fall back to str() so the field is
    # always populated.
    message = res.solver.message
    if isinstance(message, bytes):
        results_message = message.decode("utf-8")
    else:
        results_message = str(message)
    self.results["Termination Message"] = results_message
    # Important quantities for optimal design
    self.results["FIM"] = fim_local
    self.results["Sensitivity Matrix"] = self.get_sensitivity_matrix()
    self.results["Experiment Design"] = self.get_experiment_input_values()
    self.results["Experiment Design Names"] = [
        str(pyo.ComponentUID(k, context=model.scenario_blocks[0]))
        for k in model.scenario_blocks[0].experiment_inputs
    ]
    self.results["Experiment Outputs"] = self.get_experiment_output_values()
    self.results["Experiment Output Names"] = [
        str(pyo.ComponentUID(k, context=model.scenario_blocks[0]))
        for k in model.scenario_blocks[0].experiment_outputs
    ]
    self.results["Unknown Parameters"] = self.get_unknown_parameter_values()
    self.results["Unknown Parameter Names"] = [
        str(pyo.ComponentUID(k, context=model.scenario_blocks[0]))
        for k in model.scenario_blocks[0].unknown_parameters
    ]
    self.results["Measurement Error"] = self.get_measurement_error_values()
    self.results["Measurement Error Names"] = [
        str(pyo.ComponentUID(k, context=model.scenario_blocks[0]))
        for k in model.scenario_blocks[0].measurement_error
    ]
    # NOTE(review): assumes prior_FIM has been set by this point (it is
    # defaulted during model construction) — confirm for direct callers.
    self.results["Prior FIM"] = [list(row) for row in list(self.prior_FIM)]
    # Saving some stats on the FIM for convenience
    self.results["Objective expression"] = str(self.objective_option).split(".")[-1]
    self.results["log10 A-opt"] = np.log10(np.trace(np.linalg.inv(fim_local)))
    self.results["log10 pseudo A-opt"] = np.log10(np.trace(fim_local))
    self.results["log10 D-opt"] = np.log10(np.linalg.det(fim_local))
    self.results["log10 E-opt"] = np.log10(min(np.linalg.eig(fim_local)[0]))
    self.results["FIM Condition Number"] = np.linalg.cond(fim_local)
    # Solve timing stats
    self.results["Build Time"] = build_time
    self.results["Initialization Time"] = initialization_time
    self.results["Solve Time"] = solve_time
    self.results["Wall-clock Time"] = build_time + initialization_time + solve_time
    # Settings used to generate the optimal DoE
    self.results["Finite Difference Scheme"] = str(self.fd_formula).split(".")[-1]
    self.results["Finite Difference Step"] = self.step
    self.results["Nominal Parameter Scaling"] = self.scale_nominal_param_value
    # TODO: Add more useful fields to the results object?
    # TODO: Add MetaData from the user to the results object? Or leave to the user?
    # If the user specifies to save the file, do it here as a json
    if results_file is not None:
        with open(results_file, "w") as file:
            json.dump(self.results, file)
# Perform multi-experiment doe (sequential, or ``greedy`` approach)
def run_multi_doe_sequential(self, N_exp=1):
    """Sequential (greedy) multi-experiment design. Not yet implemented."""
    raise NotImplementedError("Multiple experiment optimization not yet supported.")
# Perform multi-experiment doe (simultaneous, optimal approach)
def run_multi_doe_simultaneous(self, N_exp=1):
    """Simultaneous (fully optimal) multi-experiment design. Not yet implemented."""
    raise NotImplementedError("Multiple experiment optimization not yet supported.")
# Compute FIM for the DoE object
[docs]
def compute_FIM(self, model=None, method="sequential"):
    """
    Computes the FIM for the experimental design that is
    initialized from the experiment`s ``get_labeled_model()``
    function.

    Parameters
    ----------
    model: model to compute FIM, default: None, (self.compute_FIM_model)
    method: string to specify which method should be used
            options are ``kaug`` and ``sequential``

    Returns
    -------
    computed FIM: 2D numpy array of the FIM
    """
    # When no model is supplied, clone a fresh labeled model from the
    # experiment object and remember it on the instance.
    if model is None:
        self.compute_FIM_model = self.experiment.get_labeled_model(
            **self.get_labeled_model_args
        ).clone()
        model = self.compute_FIM_model
    # else: a user-supplied model is used as-is.
    # TODO: Add safe naming when a model is passed by the user.

    # Validate that the labeled sets exist on the model.
    self.check_model_labels(model=model)

    # Record the problem dimensions for later use.
    self.n_parameters = len(model.unknown_parameters)
    self.n_measurement_error = len(model.measurement_error)
    self.n_experiment_inputs = len(model.experiment_inputs)
    self.n_experiment_outputs = len(model.experiment_outputs)

    # Default the prior to a zero matrix, or validate the user's prior.
    if self.prior_FIM is None:
        n_unknown = len(model.unknown_parameters)
        self.prior_FIM = np.zeros((n_unknown, n_unknown))
    else:
        self.check_model_FIM(FIM=self.prior_FIM)

    # TODO: Add a check to see if the model has an objective and deactivate it.
    # This solve should only be a square solve without any obj function.

    # Dispatch on the requested computation method.
    if method == "kaug":
        self._kaug_FIM(model=model)
        self._computed_FIM = self.kaug_FIM
    elif method == "sequential":
        self._sequential_FIM(model=model)
        self._computed_FIM = self.seq_FIM
    else:
        raise ValueError(
            f"The method provided, {method}, must be either `sequential` or `kaug`"
        )
    return self._computed_FIM
# Use a sequential method to get the FIM
def _sequential_FIM(self, model=None):
    """
    Used to compute the FIM using a sequential approach,
    solving the model consecutively under each of the
    finite difference scenarios to build the sensitivity
    matrix to subsequently compute the FIM.

    Parameters
    ----------
    model: model to compute FIM, default: None, (self.compute_FIM_model)
    """
    # Build a single model instance
    if model is None:
        self.compute_FIM_model = self.experiment.get_labeled_model(
            **self.get_labeled_model_args
        ).clone()
        model = self.compute_FIM_model
    # Create suffix to keep track of parameter scenarios
    if hasattr(model, "parameter_scenarios"):
        model.del_component(model.parameter_scenarios)
    model.parameter_scenarios = pyo.Suffix(direction=pyo.Suffix.LOCAL)
    # Populate parameter scenarios, and scenario
    # inds based on finite difference scheme.
    # Central: scenarios (2i, 2i+1) are the (+, -) perturbations of param i.
    # Forward/backward: scenario 0 is the unperturbed base case, and
    # scenario i+1 perturbs parameter i.
    if self.fd_formula == FiniteDifferenceStep.central:
        model.parameter_scenarios.update(
            (2 * ind, k) for ind, k in enumerate(model.unknown_parameters.keys())
        )
        model.parameter_scenarios.update(
            (2 * ind + 1, k)
            for ind, k in enumerate(model.unknown_parameters.keys())
        )
        model.scenarios = range(len(model.unknown_parameters) * 2)
    elif self.fd_formula in [
        FiniteDifferenceStep.forward,
        FiniteDifferenceStep.backward,
    ]:
        model.parameter_scenarios.update(
            (ind + 1, k) for ind, k in enumerate(model.unknown_parameters.keys())
        )
        model.scenarios = range(len(model.unknown_parameters) + 1)
    else:
        raise DeveloperError(
            "Finite difference option not recognized. Please "
            "contact the developers as you should not see this error."
        )
    # Fix design variables so every scenario solve is square
    for comp in model.experiment_inputs:
        comp.fix()
    measurement_vals = []
    # Calculate measurement values for each scenario
    for s in model.scenarios:
        # Perturbation to be (1 + diff) * param_value
        if self.fd_formula == FiniteDifferenceStep.central:
            diff = self.step * (
                (-1) ** s
            )  # Positive perturbation, even; negative, odd
        elif self.fd_formula == FiniteDifferenceStep.backward:
            diff = (
                self.step * -1 * (s != 0)
            )  # Backward always negative perturbation; 0 at s = 0
        elif self.fd_formula == FiniteDifferenceStep.forward:
            diff = self.step * (s != 0)  # Forward always positive; 0 at s = 0
        # For forward/backward, s == 0 is the unperturbed base case: there is
        # no parameter to update, but the model must still be SOLVED so the
        # base measurements occupy column 0.
        # FIX: the previous implementation ``continue``d here, which skipped
        # the base-case solve entirely and mis-aligned the finite-difference
        # columns for the forward/backward schemes (the "base" column was
        # actually parameter 0's perturbed solution).
        skip_param_update = (
            self.fd_formula
            in [FiniteDifferenceStep.forward, FiniteDifferenceStep.backward]
        ) and (s == 0)
        if not skip_param_update:
            param = model.parameter_scenarios[s]
            # Update parameter values for the given finite difference scenario
            param.set_value(model.unknown_parameters[param] * (1 + diff))
        # Simulate the model
        try:
            res = self.solver.solve(model, tee=self.tee)
            pyo.assert_optimal_termination(res)
        except Exception as err:
            # FIX: narrowed from a bare ``except:`` (which also swallowed
            # KeyboardInterrupt/SystemExit) and chained the original error.
            # TODO: Make error message more verbose,
            # (i.e., add unknown parameter values so the user
            # can try to solve the model instance outside of
            # the pyomo.DoE framework)
            raise RuntimeError(
                "Model from experiment did not solve appropriately."
                " Make sure the model is well-posed."
            ) from err
        # Reset value of parameter to default value
        # before computing finite difference perturbation
        if not skip_param_update:
            param.set_value(model.unknown_parameters[param])
        # Extract the measurement values for the scenario and append
        measurement_vals.append(
            [pyo.value(k) for k, v in model.experiment_outputs.items()]
        )
    # Use the measurement outputs to make the Q matrix
    measurement_vals_np = np.array(measurement_vals).T
    self.seq_jac = np.zeros(
        (
            len(model.experiment_outputs.items()),
            len(model.unknown_parameters.items()),
        )
    )
    # Counting variable for loop
    i = 0
    # Loop over parameter values and grab correct
    # columns for finite difference calculation
    for k, v in model.unknown_parameters.items():
        curr_step = v * self.step
        if self.fd_formula == FiniteDifferenceStep.central:
            col_1 = 2 * i
            col_2 = 2 * i + 1
            curr_step *= 2  # central difference spans two steps
        elif self.fd_formula == FiniteDifferenceStep.forward:
            # FIX: perturbed scenario for parameter i is column i + 1;
            # column 0 is the base case (previously both indices were off).
            col_1 = i + 1
            col_2 = 0
        elif self.fd_formula == FiniteDifferenceStep.backward:
            col_1 = 0
            col_2 = i + 1
        # If scale_nominal_param_value is active, scale
        # by nominal parameter value (v)
        scale_factor = (1.0 / curr_step) * self.scale_constant_value
        if self.scale_nominal_param_value:
            scale_factor *= v
        # Calculate the column of the sensitivity matrix
        self.seq_jac[:, i] = (
            measurement_vals_np[:, col_1] - measurement_vals_np[:, col_2]
        ) * scale_factor
        # Increment the count
        i += 1
    # TODO: As more complex measurement error schemes
    # are put in place, this needs to change
    # Add independent (non-correlated) measurement
    # error for FIM calculation
    cov_y = np.zeros((len(model.measurement_error), len(model.measurement_error)))
    count = 0
    for k, v in model.measurement_error.items():
        cov_y[count, count] = 1 / v**2
        count += 1
    # Compute and record FIM: Q^T * Sigma_inv * Q + prior
    self.seq_FIM = self.seq_jac.T @ cov_y @ self.seq_jac + self.prior_FIM
# Use kaug to get FIM
def _kaug_FIM(self, model=None):
    """
    Used to compute the FIM using kaug, a sensitivity-based
    approach that directly computes the FIM.

    Parameters
    ----------
    model: model to compute FIM, default: None, (self.compute_FIM_model)
    """
    # Remake compute_FIM_model if model is None.
    # compute_FIM_model needs to be the right version for function to work.
    if model is None:
        self.compute_FIM_model = self.experiment.get_labeled_model(
            **self.get_labeled_model_args
        ).clone()
        model = self.compute_FIM_model
    # add zero (dummy/placeholder) objective function
    if not hasattr(model, "objective"):
        model.objective = pyo.Objective(expr=0, sense=pyo.minimize)
    # Fix design variables to make the problem square
    for comp in model.experiment_inputs:
        comp.fix()
    self.solver.solve(model, tee=self.tee)
    # Probe the solved model for dsdp results (sensitivities s.t. parameters)
    params_dict = {k.name: v for k, v in model.unknown_parameters.items()}
    params_names = list(params_dict.keys())
    dsdp_re, col = get_dsdp(model, params_names, params_dict, tee=self.tee)
    # analyze result
    dsdp_array = dsdp_re.toarray().T
    # store dsdp returned
    dsdp_extract = []
    # get right lines from results
    measurement_index = []
    # loop over measurement variables and their time points
    for k, v in model.experiment_outputs.items():
        name = k.name
        try:
            kaug_no = col.index(name)
            measurement_index.append(kaug_no)
            # get right line of dsdp
            dsdp_extract.append(dsdp_array[kaug_no])
        except ValueError:
            # FIX: narrowed from a bare ``except:`` — ``list.index`` raises
            # ValueError when the name is absent; anything else should
            # propagate rather than be silently treated as a fixed variable.
            # k_aug does not provide value for fixed variables
            self.logger.debug("The variable is fixed: %s", name)
            # produce the sensitivity for fixed variables
            zero_sens = np.zeros(len(params_names))
            # for fixed variables, the sensitivity are a zero vector
            dsdp_extract.append(zero_sens)
    # Extract and calculate sensitivity if scaled by constants or parameters.
    jac = [[] for k in params_names]
    for d in range(len(dsdp_extract)):
        for k, v in model.unknown_parameters.items():
            p = params_names.index(k.name)  # Index of parameter in np array
            # if scaled by parameter value or constant value
            sensi = dsdp_extract[d][p] * self.scale_constant_value
            if self.scale_nominal_param_value:
                sensi *= v
            jac[p].append(sensi)
    # record kaug jacobian (rows: measurements, columns: parameters)
    self.kaug_jac = np.array(jac).T
    # Compute FIM
    if self.prior_FIM is None:
        self.prior_FIM = np.zeros((len(params_names), len(params_names)))
    else:
        self.check_model_FIM(FIM=self.prior_FIM)
    # Constructing the Covariance of the measurements for the FIM calculation
    # The following assumes independent measurement error.
    cov_y = np.zeros((len(model.measurement_error), len(model.measurement_error)))
    count = 0
    for k, v in model.measurement_error.items():
        cov_y[count, count] = 1 / v**2
        count += 1
    # TODO: need to add a covariance matrix for measurements (sigma inverse)
    # i.e., cov_y = self.cov_y or model.cov_y
    # Still deciding where this would be best.
    self.kaug_FIM = self.kaug_jac.T @ cov_y @ self.kaug_jac + self.prior_FIM
# Create the DoE model (with ``scenarios`` from finite differencing scheme)
[docs]
def create_doe_model(self, model=None):
"""
Add equations to compute sensitivities, FIM, and objective.
Builds the DoE model. Adds the scenarios, the sensitivity matrix
Q, the FIM, as well as the objective function to the model.
The function alters the ``model`` input.
In the single experiment case, ``model`` will be self.model. In the
multi-experiment case, ``model`` will be one experiment to be enumerated.
Parameters
----------
model: model to add finite difference scenarios
"""
if model is None:
model = self.model
else:
# TODO: Add safe naming when a model is passed by the user.
# doe_block = pyo.Block()
# doe_block_name = unique_component_name(model,
# "design_of_experiments_block")
# model.add_component(doe_block_name, doe_block)
pass
# Developer recommendation: use the Cholesky
# decomposition for D-optimality. The explicit
# formula is available for benchmarking purposes
# and is NOT recommended.
if (
self.only_compute_fim_lower
and self.objective_option == ObjectiveLib.determinant
and not self.Cholesky_option
):
raise ValueError(
"Cannot compute determinant with explicit formula "
"if only_compute_fim_lower is True."
)
# Generate scenarios for finite difference formulae
self._generate_scenario_blocks(model=model)
# Set names for indexing sensitivity matrix (jacobian) and FIM
scen_block_ind = min(
[
k.name.split(".").index("scenario_blocks[0]")
for k in model.scenario_blocks[0].unknown_parameters.keys()
]
)
model.parameter_names = pyo.Set(
initialize=[
".".join(k.name.split(".")[(scen_block_ind + 1) :])
for k in model.scenario_blocks[0].unknown_parameters.keys()
]
)
model.output_names = pyo.Set(
initialize=[
".".join(k.name.split(".")[(scen_block_ind + 1) :])
for k in model.scenario_blocks[0].experiment_outputs.keys()
]
)
def identity_matrix(m, i, j):
if i == j:
return 1
else:
return 0
### Initialize the Jacobian if provided by the user
# If the user provides an initial Jacobian, convert it to a dictionary
if self.jac_initial is not None:
dict_jac_initialize = {}
for i, bu in enumerate(model.output_names):
for j, un in enumerate(model.parameter_names):
# Jacobian is a numpy array, rows are experimental
# outputs, columns are unknown parameters
dict_jac_initialize[(bu, un)] = self.jac_initial[i][j]
# Initialize the Jacobian matrix
def initialize_jac(m, i, j):
# If provided by the user, use the values now stored in the dictionary
if self.jac_initial is not None:
return dict_jac_initialize[(i, j)]
# Otherwise initialize to 0.1 (which is an arbitrary non-zero value)
else:
raise DeveloperError(
"Jacobian being initialized when the jac_initial attribute "
"is None. Please contact the developers as you should not "
"see this error."
)
model.sensitivity_jacobian = pyo.Var(
model.output_names, model.parameter_names, initialize=initialize_jac
)
# Initialize the FIM
if self.fim_initial is not None:
dict_fim_initialize = {
(bu, un): self.fim_initial[i, j]
for i, bu in enumerate(model.parameter_names)
for j, un in enumerate(model.parameter_names)
}
if self.objective_option == ObjectiveLib.trace:
fim_initial_inv = np.linalg.pinv(self.fim_initial)
dict_fim_inv_initialize = {
(bu, un): fim_initial_inv[i, j]
for i, bu in enumerate(model.parameter_names)
for j, un in enumerate(model.parameter_names)
}
def initialize_fim(m, j, d):
return dict_fim_initialize[(j, d)]
# Initialization rule for model.fim_inv: look up the pseudo-inverse FIM entry
def initialize_fim_inv(m, j, d):
    # (j, d) indexes (row parameter, column parameter) of pinv(FIM)
    return dict_fim_inv_initialize[(j, d)]
if self.fim_initial is not None:
model.fim = pyo.Var(
model.parameter_names, model.parameter_names, initialize=initialize_fim
)
if self.objective_option == ObjectiveLib.trace:
model.fim_inv = pyo.Var(
model.parameter_names,
model.parameter_names,
initialize=initialize_fim_inv,
)
else:
model.fim = pyo.Var(
model.parameter_names, model.parameter_names, initialize=identity_matrix
)
if self.objective_option == ObjectiveLib.trace:
model.fim_inv = pyo.Var(
model.parameter_names,
model.parameter_names,
initialize=identity_matrix,
)
# To-Do: Look into this functionality.....
# if cholesky, define L elements as variables
if self.Cholesky_option and self.objective_option in (
ObjectiveLib.determinant,
ObjectiveLib.trace,
):
model.L = pyo.Var(
model.parameter_names, model.parameter_names, initialize=identity_matrix
)
# If trace objective, also need L inverse
if self.objective_option == ObjectiveLib.trace:
model.L_inv = pyo.Var(
model.parameter_names,
model.parameter_names,
initialize=identity_matrix,
)
# loop over parameter name
for i, c in enumerate(model.parameter_names):
for j, d in enumerate(model.parameter_names):
# fix the 0 half of L matrix to be 0.0
if i < j:
model.L[c, d].fix(0.0)
if self.objective_option == ObjectiveLib.trace:
model.L_inv[c, d].fix(0.0)
# Give LB to the diagonal entries
if self.L_diagonal_lower_bound:
if c == d:
model.L[c, d].setlb(self.L_diagonal_lower_bound)
if self.objective_option == ObjectiveLib.trace:
model.L_inv[c, d].setlb(self.L_diagonal_lower_bound)
# jacobian rule
def jacobian_rule(m, n, p):
    """
    Constraint rule equating the sensitivity Jacobian variable to the
    finite difference of output ``n`` with respect to parameter ``p``
    across the corresponding scenario blocks.

    m: Pyomo model
    n: experimental output
    p: unknown parameter
    """
    fd_step_mult = 1
    cuid = pyo.ComponentUID(n)
    param_ind = m.parameter_names.data().index(p)

    # Different FD schemes lead to different scenarios for the computation
    if self.fd_formula == FiniteDifferenceStep.central:
        # Central: each parameter owns a pair of scenarios (even: +, odd: -)
        s1 = param_ind * 2
        s2 = param_ind * 2 + 1
        fd_step_mult = 2  # central difference spans 2 * step
    elif self.fd_formula == FiniteDifferenceStep.forward:
        # Forward: scenario 0 is the unperturbed base point
        s1 = param_ind + 1
        s2 = 0
    elif self.fd_formula == FiniteDifferenceStep.backward:
        # Backward: scenario 0 is the unperturbed base point
        s1 = 0
        s2 = param_ind + 1

    # Locate the same output component on the upper/lower scenario blocks
    var_up = cuid.find_component_on(m.scenario_blocks[s1])
    var_lo = cuid.find_component_on(m.scenario_blocks[s2])

    # Nominal value of the perturbed parameter (from scenario block 0)
    param = m.parameter_scenarios[max(s1, s2)]
    param_loc = pyo.ComponentUID(param).find_component_on(m.scenario_blocks[0])
    param_val = m.scenario_blocks[0].unknown_parameters[param_loc]
    param_diff = param_val * fd_step_mult * self.step

    if self.scale_nominal_param_value:
        # Scale the sensitivity by the nominal parameter value
        return (
            m.sensitivity_jacobian[n, p]
            == (var_up - var_lo)
            / param_diff
            * param_val
            * self.scale_constant_value
        )
    else:
        return (
            m.sensitivity_jacobian[n, p]
            == (var_up - var_lo) / param_diff * self.scale_constant_value
        )
# A constraint to calculate elements in Hessian matrix
# transfer prior FIM to be Expressions
fim_initial_dict = {
(bu, un): self.prior_FIM[i][j]
for i, bu in enumerate(model.parameter_names)
for j, un in enumerate(model.parameter_names)
}
# Expression rule: pull prior FIM entries from the pre-built dictionary
def read_prior(m, i, j):
    # (i, j) indexes (row parameter, column parameter) of the prior FIM
    return fim_initial_dict[(i, j)]
model.prior_FIM = pyo.Expression(
model.parameter_names, model.parameter_names, rule=read_prior
)
# Off-diagonal elements are symmetric, so only
# half of the off-diagonal elements need to be
# specified.
def fim_rule(m, p, q):
    """
    Constraint rule defining each FIM entry from the sensitivity
    Jacobian, the measurement error, and the prior FIM.

    m: Pyomo model
    p: unknown parameter
    q: unknown parameter
    """
    p_ind = list(m.parameter_names).index(p)
    q_ind = list(m.parameter_names).index(q)

    # If the row is less than the column, skip the constraint
    # This logic is consistent with making the FIM a lower
    # triangular matrix (as is done later in this function)
    if p_ind < q_ind:
        if self.only_compute_fim_lower:
            return pyo.Constraint.Skip
        else:
            # Enforce symmetry when the full FIM is computed
            return m.fim[p, q] == m.fim[q, p]
    else:
        # FIM[p, q] = sum_n J[n, p] * J[n, q] / sigma_n^2 + prior_FIM[p, q]
        return (
            m.fim[p, q]
            == sum(
                1
                / m.scenario_blocks[0].measurement_error[
                    pyo.ComponentUID(n).find_component_on(m.scenario_blocks[0])
                ]
                ** 2
                * m.sensitivity_jacobian[n, p]
                * m.sensitivity_jacobian[n, q]
                for n in m.output_names
            )
            + m.prior_FIM[p, q]
        )
model.jacobian_constraint = pyo.Constraint(
model.output_names, model.parameter_names, rule=jacobian_rule
)
model.fim_constraint = pyo.Constraint(
model.parameter_names, model.parameter_names, rule=fim_rule
)
if self.only_compute_fim_lower:
# Fix the upper half of the FIM matrix elements to be 0.0.
# This eliminates extra variables and ensures the expected number of
# degrees of freedom in the optimization problem.
for ind_p, p in enumerate(model.parameter_names):
for ind_q, q in enumerate(model.parameter_names):
if ind_p < ind_q:
model.fim[p, q].fix(0.0)
if self.objective_option == ObjectiveLib.trace:
model.fim_inv[p, q].fix(0.0)
# Create scenario block structure
def _generate_scenario_blocks(self, model=None):
    """
    Generates the modeling blocks corresponding to the scenarios for
    the finite differencing scheme to compute the sensitivity jacobian
    to compute the FIM.

    The function alters the ``model`` input.

    In the single experiment case, ``model`` will be self.model. In the
    multi-experiment case, ``model`` will be one experiment to be enumerated.

    Parameters
    ----------
    model: model to add finite difference scenarios
    """
    # If model is none, assume it is self.model
    if model is None:
        model = self.model

    # Generate initial scenario to populate unknown parameter values
    model.base_model = self.experiment.get_labeled_model(
        **self.get_labeled_model_args
    ).clone()

    # Check the model that labels are correct
    self.check_model_labels(model=model.base_model)

    # Gather lengths of label structures for later use in the model build process
    self.n_parameters = len(model.base_model.unknown_parameters)
    self.n_measurement_error = len(model.base_model.measurement_error)
    self.n_experiment_inputs = len(model.base_model.experiment_inputs)
    self.n_experiment_outputs = len(model.base_model.experiment_outputs)

    if self.n_measurement_error != self.n_experiment_outputs:
        raise ValueError(
            "Number of experiment outputs, {}, and length of measurement error, "
            "{}, do not match. Please check model labeling.".format(
                self.n_experiment_outputs, self.n_measurement_error
            )
        )

    self.logger.info("Experiment output and measurement error lengths match.")

    # Check that the user input FIM and Jacobian are the correct dimension
    if self.prior_FIM is not None:
        self.check_model_FIM(FIM=self.prior_FIM)
    else:
        self.prior_FIM = np.zeros((self.n_parameters, self.n_parameters))
    if self.fim_initial is not None:
        self.check_model_FIM(FIM=self.fim_initial)
    else:
        self.fim_initial = np.eye(self.n_parameters) + self.prior_FIM
    if self.jac_initial is not None:
        self.check_model_jac(self.jac_initial)
    else:
        self.jac_initial = np.eye(self.n_experiment_outputs, self.n_parameters)

    # Make a new Suffix to hold which scenarios
    # are associated with parameters
    model.parameter_scenarios = pyo.Suffix(direction=pyo.Suffix.LOCAL)

    # Populate parameter scenarios, and scenario
    # inds based on finite difference scheme
    if self.fd_formula == FiniteDifferenceStep.central:
        # Central difference: two scenarios per parameter (even: +, odd: -)
        model.parameter_scenarios.update(
            (2 * ind, k)
            for ind, k in enumerate(model.base_model.unknown_parameters.keys())
        )
        model.parameter_scenarios.update(
            (2 * ind + 1, k)
            for ind, k in enumerate(model.base_model.unknown_parameters.keys())
        )
        model.scenarios = range(len(model.base_model.unknown_parameters) * 2)
    elif self.fd_formula in [
        FiniteDifferenceStep.forward,
        FiniteDifferenceStep.backward,
    ]:
        # One perturbed scenario per parameter, plus the base case (scenario 0)
        model.parameter_scenarios.update(
            (ind + 1, k)
            for ind, k in enumerate(model.base_model.unknown_parameters.keys())
        )
        model.scenarios = range(len(model.base_model.unknown_parameters) + 1)
    else:
        raise DeveloperError(
            "Finite difference option not recognized. Please contact "
            "the developers as you should not see this error."
        )

    # Run base model to get initialized model and check model function
    for comp in model.base_model.experiment_inputs:
        comp.fix()

    try:
        res = self.solver.solve(model.base_model, tee=self.tee)
        assert res.solver.termination_condition == "optimal"
        self.logger.info("Model from experiment solved.")
    except Exception as err:
        # Chain the original failure (solver error or non-optimal termination)
        # so the root cause is visible in the traceback
        raise RuntimeError(
            "Model from experiment did not solve appropriately. "
            "Make sure the model is well-posed."
        ) from err

    for comp in model.base_model.experiment_inputs:
        comp.unfix()

    # Generate blocks for finite difference scenarios
    def build_block_scenarios(b, s):
        # Generate model for the finite difference scenario
        m = b.model()
        b.transfer_attributes_from(m.base_model.clone())

        # Forward/Backward difference have a stationary
        # case (s == 0), no parameter to perturb
        if self.fd_formula in [
            FiniteDifferenceStep.forward,
            FiniteDifferenceStep.backward,
        ]:
            if s == 0:
                return

        param = m.parameter_scenarios[s]

        # Perturbation to be (1 + diff) * param_value
        if self.fd_formula == FiniteDifferenceStep.central:
            # Positive perturbation, even; negative, odd
            diff = self.step * ((-1) ** s)
        elif self.fd_formula == FiniteDifferenceStep.backward:
            diff = self.step * -1  # Backward always negative perturbation
        elif self.fd_formula == FiniteDifferenceStep.forward:
            diff = self.step  # Forward always positive
        else:
            # Unreachable: fd_formula was validated above in this method
            diff = 0

        # Update parameter values for the given finite difference scenario
        pyo.ComponentUID(param, context=m.base_model).find_component_on(
            b
        ).set_value(m.base_model.unknown_parameters[param] * (1 + diff))

        # Fix experiment inputs before solve (enforce square solve)
        for comp in b.experiment_inputs:
            comp.fix()

        self.solver.solve(b, tee=self.tee)

        # Unfix experiment inputs after square solve
        for comp in b.experiment_inputs:
            comp.unfix()

    model.scenario_blocks = pyo.Block(model.scenarios, rule=build_block_scenarios)

    # TODO: this might have to change if experiment inputs have
    # a different value in the Suffix (currently it is the CUID)
    design_vars = list(model.scenario_blocks[0].experiment_inputs)

    # Add constraints to equate block design with global design:
    for ind, d in enumerate(design_vars):
        con_name = "global_design_eq_con_" + str(ind)

        # Constraint rule for global design constraints; ``d`` is bound at
        # construction time because add_component builds the constraint here
        def global_design_fixing(m, s):
            if s == 0:
                return pyo.Constraint.Skip
            block_design_var = pyo.ComponentUID(
                d, context=m.scenario_blocks[0]
            ).find_component_on(m.scenario_blocks[s])
            return d == block_design_var

        model.add_component(
            con_name, pyo.Constraint(model.scenarios, rule=global_design_fixing)
        )

    # Clean up the base model used to generate the scenarios
    model.del_component(model.base_model)

    # TODO: consider this logic? Multi-block systems need something more fancy
    self._built_scenarios = True
# Create objective function
# [docs]  (Sphinx "view source" link artifact from the documentation scrape; not code)
def create_objective_function(self, model=None):
    """
    Generates the objective function as an expression and as a
    Pyomo Objective object

    The function alters the ``model`` input.

    In the single experiment case, ``model`` will be self.model. In the
    multi-experiment case, ``model`` will be one experiment to be enumerated.

    Parameters
    ----------
    model: model to add finite difference scenarios
    """
    if model is None:
        model = self.model

    if self.objective_option not in [
        ObjectiveLib.determinant,
        ObjectiveLib.trace,
        ObjectiveLib.pseudo_trace,
        ObjectiveLib.zero,
    ]:
        raise DeveloperError(
            "Objective option not recognized. Please contact the "
            "developers as you should not see this error."
        )

    if not hasattr(model, "fim"):
        raise RuntimeError(
            "Model provided does not have variable `fim`. Please make "
            "sure the model is built properly before creating the objective."
        )

    small_number = 1e-10

    # Make objective block for constraints connected to objective
    model.obj_cons = pyo.Block()

    # Assemble the FIM matrix. This is helpful for initialization!
    fim_vals = [
        model.fim[bu, un].value
        for i, bu in enumerate(model.parameter_names)
        for j, un in enumerate(model.parameter_names)
    ]
    fim = np.array(fim_vals).reshape(
        len(model.parameter_names), len(model.parameter_names)
    )

    ### Initialize the Cholesky decomposition matrix
    if self.Cholesky_option and self.objective_option in (
        ObjectiveLib.determinant,
        ObjectiveLib.trace,
    ):
        # Calculate the eigenvalues of the FIM matrix
        eig = np.linalg.eigvals(fim)

        # If the smallest eigenvalue is (practically) negative,
        # add a diagonal matrix to make it positive definite
        if min(eig) < small_number:
            fim = fim + np.eye(len(model.parameter_names)) * (
                small_number - min(eig)
            )

        # Compute the Cholesky decomposition of the FIM matrix
        L = np.linalg.cholesky(fim)

        # Initialize the Cholesky matrix
        for i, c in enumerate(model.parameter_names):
            for j, d in enumerate(model.parameter_names):
                model.L[c, d].value = L[i, j]

        # Compute L inverse for trace objective and initialize
        if self.objective_option == ObjectiveLib.trace:
            L_inv = np.linalg.inv(L)
            for i, c in enumerate(model.parameter_names):
                for j, d in enumerate(model.parameter_names):
                    model.L_inv[c, d].value = L_inv[i, j]

    # NOTE: the rule functions below are defined unconditionally (defining a
    # function has no side effects); each one is only *used* in the branch
    # that attaches the corresponding constraint.
    def cholesky_imp(b, c, d):
        """
        Calculate Cholesky L matrix using algebraic constraints
        """
        # If the row is greater than or equal to the column, we are in the
        # lower triangle region of the L and FIM matrices.
        # This region is where our equations are well-defined.
        m = b.model()
        if list(m.parameter_names).index(c) >= list(m.parameter_names).index(d):
            return m.fim[c, d] == sum(
                m.L[c, m.parameter_names.at(k + 1)]
                * m.L[d, m.parameter_names.at(k + 1)]
                for k in range(list(m.parameter_names).index(d) + 1)
            )
        else:
            # This is the empty half of L above the diagonal
            return pyo.Constraint.Skip

    def cholesky_inv_imp(b, c, d):
        """
        Calculate Cholesky L inverse matrix using algebraic constraints
        """
        # If the row is greater than or equal to the column, we are in the
        # lower triangle region of the L_inv matrix.
        # This region is where our equations are well-defined.
        m = b.model()
        if list(m.parameter_names).index(c) >= list(m.parameter_names).index(d):
            return m.fim_inv[c, d] == sum(
                m.L_inv[m.parameter_names.at(k + 1), c]
                * m.L_inv[m.parameter_names.at(k + 1), d]
                for k in range(
                    list(m.parameter_names).index(c), len(m.parameter_names)
                )
            )
        else:
            # This is the empty half of L_inv above the diagonal
            return pyo.Constraint.Skip

    def cholesky_LLinv_imp(b, c, d):
        """
        Calculate Cholesky L * L inverse matrix using algebraic constraints
        (enforces L * L^-1 = Identity)
        """
        m = b.model()
        param_list = list(m.parameter_names)
        idx_c = param_list.index(c)
        idx_d = param_list.index(d)
        # Do not need to calculate upper triangle entries
        if idx_c < idx_d:
            return pyo.Constraint.Skip
        target_value = 1 if idx_c == idx_d else 0
        return (
            sum(
                m.L[c, m.parameter_names.at(k + 1)]
                * m.L_inv[m.parameter_names.at(k + 1), d]
                for k in range(len(m.parameter_names))
            )
            == target_value
        )

    def cholesky_fim_diag(b, c, d):
        """
        M[c,c] >= L[c,d]^2 to improve round off error
        """
        m = b.model()
        return m.fim[c, c] >= m.L[c, d] ** 2

    def cholesky_fim_inv_diag(b, c, d):
        """
        M_inv[c,c] >= L_inv[c,d]^2 to improve round off error
        """
        m = b.model()
        return m.fim_inv[c, c] >= m.L_inv[c, d] ** 2

    def cov_trace_calc(b):
        """
        Calculate trace of covariance matrix (inverse of FIM).
        Can scale each element with 1000 for performance
        """
        m = b.model()
        return m.cov_trace == sum(m.fim_inv[j, j] for j in m.parameter_names)

    def trace_calc(b):
        """
        Calculate FIM elements. Can scale each element with 1000 for performance
        """
        m = b.model()
        return m.fim_trace == sum(m.fim[j, j] for j in m.parameter_names)

    def determinant_general(b):
        r"""Calculate determinant. Can be applied to FIM of any size.
        det(A) = \sum_{\sigma in \S_n} (sgn(\sigma) * \Prod_{i=1}^n a_{i,\sigma_i})
        Use permutation() to get permutations, sgn() to get signature
        """
        m = b.model()
        r_list = list(range(len(m.parameter_names)))
        # get all permutations
        list_p = list(permutations(r_list))

        # det(A) = sum_{\sigma \in \S_n} (sgn(\sigma) *
        # \Prod_{i=1}^n a_{i,\sigma_i})
        # FIX: the previous implementation rebuilt this identical n!-term sum
        # once per permutation (O((n!)^2) expression work) and also built an
        # unused ``name_order`` list on every pass; the sum is now built once.
        det_perm = sum(
            self._sgn(sigma)
            * math.prod(
                m.fim[m.parameter_names.at(val + 1), m.parameter_names.at(ind + 1)]
                for ind, val in enumerate(sigma)
            )
            for sigma in list_p
        )
        return m.determinant == det_perm

    if self.Cholesky_option and self.objective_option == ObjectiveLib.determinant:
        model.obj_cons.cholesky_cons = pyo.Constraint(
            model.parameter_names, model.parameter_names, rule=cholesky_imp
        )
        # log10(det(FIM)) = 2 * sum(log10(L[j, j])) for the Cholesky factor L
        model.objective = pyo.Objective(
            expr=2 * sum(pyo.log10(model.L[j, j]) for j in model.parameter_names),
            sense=pyo.maximize,
        )

    elif self.objective_option == ObjectiveLib.determinant:
        # if not Cholesky but determinant, calculating
        # det and evaluate the OBJ with det
        model.determinant = pyo.Var(
            initialize=np.linalg.det(fim), bounds=(small_number, None)
        )
        model.obj_cons.determinant_rule = pyo.Constraint(rule=determinant_general)
        model.objective = pyo.Objective(
            expr=pyo.log10(model.determinant + 1e-6), sense=pyo.maximize
        )

    elif self.objective_option == ObjectiveLib.trace:
        if not self.Cholesky_option:
            raise ValueError(
                "objective_option='trace' currently only implemented with ``_Cholesky option=True``."
            )
        # if Cholesky and trace, calculating
        # the OBJ with trace
        model.cov_trace = pyo.Var(
            initialize=np.trace(np.linalg.inv(fim)), bounds=(small_number, None)
        )
        model.obj_cons.cholesky_cons = pyo.Constraint(
            model.parameter_names, model.parameter_names, rule=cholesky_imp
        )
        model.obj_cons.cholesky_inv_cons = pyo.Constraint(
            model.parameter_names, model.parameter_names, rule=cholesky_inv_imp
        )
        model.obj_cons.cholesky_LLinv_cons = pyo.Constraint(
            model.parameter_names, model.parameter_names, rule=cholesky_LLinv_imp
        )
        # To improve round off error in Cholesky decomposition
        if self.improve_cholesky_roundoff_error:
            model.obj_cons.cholesky_fim_diag_cons = pyo.Constraint(
                model.parameter_names, model.parameter_names, rule=cholesky_fim_diag
            )
            model.obj_cons.cholesky_fim_inv_diag_cons = pyo.Constraint(
                model.parameter_names,
                model.parameter_names,
                rule=cholesky_fim_inv_diag,
            )
        model.obj_cons.cov_trace_rule = pyo.Constraint(rule=cov_trace_calc)
        model.objective = pyo.Objective(expr=model.cov_trace, sense=pyo.minimize)

    elif self.objective_option == ObjectiveLib.pseudo_trace:
        # if not determinant or Cholesky, calculating
        # the OBJ with trace
        model.fim_trace = pyo.Var(
            initialize=np.trace(fim), bounds=(small_number, None)
        )
        model.obj_cons.trace_rule = pyo.Constraint(rule=trace_calc)
        model.objective = pyo.Objective(
            expr=pyo.log10(model.fim_trace), sense=pyo.maximize
        )

    # TODO: Add warning (should be unreachable) if the user calls
    # the grey box objectives with the standard model
    elif self.objective_option == ObjectiveLib.zero:
        # add dummy objective function
        model.objective = pyo.Objective(expr=0)
def create_grey_box_objective_function(self, model=None):
    """
    Attaches an External Grey Box block exposing FIM-based design criteria
    and an Objective over the selected grey box output.

    The grey box block lives on a block named ``obj_cons`` to reuse
    material for initializing the objective-free square model.

    Parameters
    ----------
    model: model to receive the grey box objective, Default: None, (self.model)
    """
    if model is None:
        # Fixed: was a redundant double assignment (``model = model = self.model``)
        model = self.model

    # TODO: Make this naming convention robust
    model.obj_cons = pyo.Block()

    # Create FIM External Grey Box object
    grey_box_FIM = FIMExternalGreyBox(
        doe_object=self,
        objective_option=self.objective_option,
        logger_level=self.logger.getEffectiveLevel(),
    )

    # Attach External Grey Box Model
    # to the model as an External
    # Grey Box Block
    model.obj_cons.egb_fim_block = ExternalGreyBoxBlock(external_model=grey_box_FIM)

    # Adding constraints to for all grey box input values to equate to fim values
    def FIM_egb_cons(m, p1, p2):
        """
        m: Pyomo model
        p1: parameter 1
        p2: parameter 2
        """
        # Using upper triangular FIM to
        # make numerics better.
        if list(model.parameter_names).index(p1) >= list(
            model.parameter_names
        ).index(p2):
            # Grey box inputs are indexed upper-triangular, hence (p2, p1)
            return model.fim[(p1, p2)] == m.egb_fim_block.inputs[(p2, p1)]
        else:
            return pyo.Constraint.Skip

    # Add the FIM and External Grey
    # Box inputs constraints
    model.obj_cons.FIM_equalities = pyo.Constraint(
        model.parameter_names, model.parameter_names, rule=FIM_egb_cons
    )

    # Add objective based on user provided
    # type within ObjectiveLib
    if self.objective_option == ObjectiveLib.trace:
        model.objective = pyo.Objective(
            expr=model.obj_cons.egb_fim_block.outputs["A-opt"], sense=pyo.minimize
        )
    elif self.objective_option == ObjectiveLib.determinant:
        model.objective = pyo.Objective(
            expr=model.obj_cons.egb_fim_block.outputs["log-D-opt"],
            sense=pyo.maximize,
        )
    elif self.objective_option == ObjectiveLib.minimum_eigenvalue:
        model.objective = pyo.Objective(
            expr=model.obj_cons.egb_fim_block.outputs["E-opt"], sense=pyo.maximize
        )
    elif self.objective_option == ObjectiveLib.condition_number:
        model.objective = pyo.Objective(
            expr=model.obj_cons.egb_fim_block.outputs["ME-opt"], sense=pyo.minimize
        )
    # Else error not needed for spurious objective
    # options as the error will always appear
    # when creating the FIMExternalGreyBox block
# Check to see if the model has all the required suffixes
# [docs]  (Sphinx "view source" link artifact from the documentation scrape; not code)
def check_model_labels(self, model=None):
    """
    Checks if the model contains the necessary suffixes for the
    DoE model to be constructed automatically.

    Parameters
    ----------
    model: model for suffix checking

    Raises
    ------
    RuntimeError
        If any of the four required suffixes (experiment_outputs,
        experiment_inputs, unknown_parameters, measurement_error)
        is missing from the model.
    """
    # Fixed: four copy-pasted try/bare-except blocks collapsed into one
    # loop; the bare ``except:`` clauses also masked unrelated errors.
    # Error messages are unchanged.
    required_suffixes = (
        "experiment_outputs",
        "experiment_inputs",
        "unknown_parameters",
        "measurement_error",
    )
    for suffix_name in required_suffixes:
        try:
            # Probe the suffix the same way the original code did
            [k.name for k, v in getattr(model, suffix_name).items()]
        except AttributeError:
            raise RuntimeError(
                'Experiment model does not have suffix "{}".'.format(suffix_name)
            )

    self.logger.info("Model has expected labels.")
# Check the FIM shape against what is expected from the model.
# [docs]  (Sphinx "view source" link artifact from the documentation scrape; not code)
def check_model_FIM(self, model=None, FIM=None):
    """
    Checks if the specified matrix, FIM, matches the shape expected
    from the model. This method should only be called after the
    model has been probed for the length of the unknown parameter,
    experiment input, experiment output, and measurement error
    has been stored to the object.

    Parameters
    ----------
    model: model for suffix checking, Default: None, (self.model)
    FIM: FIM value to check on the model
    """
    if model is None:
        model = self.model

    # The FIM must be square with one row/column per unknown parameter
    expected_shape = (self.n_parameters, self.n_parameters)
    if FIM.shape != expected_shape:
        raise ValueError(
            "Shape of FIM provided should be n parameters by n parameters, "
            "or {} by {}, FIM provided has shape {} by {}".format(
                self.n_parameters, self.n_parameters, FIM.shape[0], FIM.shape[1]
            )
        )

    # Delegate the positive (semi) definiteness check to the module helper
    check_FIM(FIM)

    self.logger.info(
        "FIM provided matches expected dimensions from model "
        "and is approximately positive (semi) definite."
    )
# Check the jacobian shape against what is expected from the model.
def check_model_jac(self, jac=None):
    """Validate that ``jac`` is (n_experiment_outputs x n_parameters); raise ValueError otherwise."""
    expected_shape = (self.n_experiment_outputs, self.n_parameters)
    if jac.shape != expected_shape:
        raise ValueError(
            "Shape of Jacobian provided should be n experiment outputs "
            "by n parameters, or {} by {}, Jacobian provided has "
            "shape {} by {}".format(
                self.n_experiment_outputs,
                self.n_parameters,
                jac.shape[0],
                jac.shape[1],
            )
        )

    self.logger.info("Jacobian provided matches expected dimensions from model.")
# Update the FIM for the specified model
# [docs]  (Sphinx "view source" link artifact from the documentation scrape; not code)
def update_FIM_prior(self, model=None, FIM=None):
    """
    Updates the prior FIM on the model object. This may be useful when
    running a loop and the user doesn't want to rebuild the model
    because it is expensive to build/initialize.

    Parameters
    ----------
    model: model where FIM prior is to be updated, Default: None, (self.model)
    FIM: 2D np array to be the new FIM prior, Default: None
    """
    if model is None:
        model = self.model

    # Guard clauses: a FIM must be supplied and the model must be built
    if FIM is None:
        raise ValueError(
            "FIM input for update_FIM_prior must be a 2D, square numpy array."
        )
    if not hasattr(model, "fim"):
        raise RuntimeError(
            "``fim`` is not defined on the model provided. "
            "Please build the model first."
        )

    # Dimension/definiteness checks before touching the model
    self.check_model_FIM(model=model, FIM=FIM)

    # Overwrite every prior FIM expression entry with the new value
    indexed_params = list(enumerate(model.parameter_names))
    for ind1, p1 in indexed_params:
        for ind2, p2 in indexed_params:
            model.prior_FIM[p1, p2].set_value(FIM[ind1, ind2])

    self.logger.info("FIM prior has been updated.")
# TODO: Add an update function for the parameter values?
# Closed loop parameter estimation?
def update_unknown_parameter_values(self, model=None, param_vals=None):
    """
    Placeholder for closed-loop updating of unknown parameter values.

    Raises
    ------
    NotImplementedError
        Always; this feature is not yet supported.
    """
    raise NotImplementedError(
        "Updating unknown parameter values not yet supported."
    )
# Evaluates FIM and statistics for a
# full factorial space (same as run_grid_search)
# [docs]  (Sphinx "view source" link artifact from the documentation scrape; not code)
def compute_FIM_full_factorial(
    self, model=None, design_ranges=None, method="sequential"
):
    """
    Will run a simulation-based full factorial exploration of
    the experimental input space (i.e., a ``grid search`` or
    ``parameter sweep``) to understand how the FIM metrics
    change as a function of the experimental design space.

    Parameters
    ----------
    model: DoE model, optional
        model to perform the full factorial exploration on
    design_ranges: dict
        dictionary of lists, of the form {<var_name>: [start, stop, numsteps]}
    method: str, optional
        to specify which method should be used.
        Options are ``kaug`` and ``sequential``

    Returns
    -------
    fim_factorial_results: dict
        A dictionary of the results with the following keys and their corresponding
        values as a list:

        - keys of model's experiment_inputs
        - "log10 D-opt": list of log10(D-optimality)
        - "log10 A-opt": list of log10(A-optimality)
        - "log10 pseudo A-opt": list of log10(trace(FIM))
        - "log10 E-opt": list of log10(E-optimality)
        - "log10 ME-opt": list of log10(ME-optimality)
        - "eigval_min": list of minimum eigenvalues
        - "eigval_max": list of maximum eigenvalues
        - "det_FIM": list of determinants
        - "trace_cov": list of traces of covariance matrix
        - "trace_FIM": list of traces of FIM
        - "solve_time": list of solve times

    Raises
    ------
    ValueError
        If the design_ranges' keys do not match the model's experiment_inputs' keys.
    """
    # Start timer
    sp_timer = TicTocTimer()
    sp_timer.tic(msg=None)
    self.logger.info("Beginning Full Factorial Design.")

    # Make new model for factorial design
    self.factorial_model = self.experiment.get_labeled_model(
        **self.get_labeled_model_args
    ).clone()
    model = self.factorial_model

    # Permute the inputs to be aligned with the experiment input indices
    design_ranges_enum = {k: np.linspace(*v) for k, v in design_ranges.items()}
    design_map = {
        ind: (k[0].name, k[0])
        for ind, k in enumerate(model.experiment_inputs.items())
    }

    # Make the full space
    try:
        valid_inputs = 0
        des_ranges = []
        for k, v in design_map.items():
            if v[0] in design_ranges_enum.keys():
                des_ranges.append(design_ranges_enum[v[0]])
                valid_inputs += 1
        assert valid_inputs > 0

        factorial_points = product(*des_ranges)
    except Exception as err:
        # Fixed: was a bare ``except:``; narrow and chain the cause
        raise ValueError(
            "Design ranges keys must be a subset of experimental design names."
        ) from err

    # TODO: Add more objective types? i.e., modified-E; G-opt; V-opt; etc?
    # TODO: Also, make this a result object, or more user friendly.
    fim_factorial_results = {k.name: [] for k, v in model.experiment_inputs.items()}
    fim_factorial_results.update(
        {
            "log10 D-opt": [],
            "log10 A-opt": [],
            "log10 pseudo A-opt": [],
            "log10 E-opt": [],
            "log10 ME-opt": [],
            "eigval_min": [],
            "eigval_max": [],
            "det_FIM": [],
            "trace_cov": [],
            "trace_FIM": [],
            "solve_time": [],
        }
    )

    successes = 0
    failures = 0
    total_points = np.prod(
        np.array([len(v) for k, v in design_ranges_enum.items()])
    )
    time_set = []
    for design_point in factorial_points:
        # Fix design variables at fixed experimental design point
        for i in range(len(design_point)):
            design_map[i][1].fix(design_point[i])

        # Fixed: the point counter is now computed *before* logging so the
        # "Iteration Number" message no longer lags one iteration behind
        curr_point = successes + failures + 1

        # Timing and logging objects
        self.logger.info("=======Iteration Number: %s =====", curr_point)
        iter_timer = TicTocTimer()
        iter_timer.tic(msg=None)

        # Compute FIM with given options
        try:
            # Logging information for each run
            self.logger.info("This is run %s out of %s.", curr_point, total_points)

            # Attempt the FIM computation
            self.compute_FIM(model=model, method=method)
            successes += 1

            # iteration time
            iter_t = iter_timer.toc(msg=None)
            time_set.append(iter_t)

            # More logging
            self.logger.info(
                "The code has run for %s seconds.", round(sum(time_set), 2)
            )
            self.logger.info(
                "Estimated remaining time: %s seconds",
                round(
                    sum(time_set) / (curr_point) * (total_points - curr_point + 1),
                    2,
                ),
            )
        except Exception:
            # Deliberate best-effort sweep: a failed point is recorded as a
            # zero FIM and the sweep continues (was a bare ``except:``)
            self.logger.warning(
                ":::::::::::Warning: Cannot converge this run.::::::::::::"
            )
            failures += 1
            # Fixed: lazy %-style arg was missing its placeholder, which made
            # the logging module raise a formatting error at emit time
            self.logger.warning("failed count: %s", failures)

            self._computed_FIM = np.zeros(self.prior_FIM.shape)

            iter_t = iter_timer.toc(msg=None)
            time_set.append(iter_t)

        FIM = self._computed_FIM

        (
            det_FIM,
            trace_cov,
            trace_FIM,
            E_vals,
            E_vecs,
            D_opt,
            A_opt,
            pseudo_A_opt,
            E_opt,
            ME_opt,
        ) = compute_FIM_metrics(FIM)

        # Append the values for each of the experiment inputs
        for k, v in model.experiment_inputs.items():
            fim_factorial_results[k.name].append(pyo.value(k))

        fim_factorial_results["log10 D-opt"].append(D_opt)
        fim_factorial_results["log10 A-opt"].append(A_opt)
        fim_factorial_results["log10 pseudo A-opt"].append(pseudo_A_opt)
        fim_factorial_results["log10 E-opt"].append(E_opt)
        fim_factorial_results["log10 ME-opt"].append(ME_opt)
        fim_factorial_results["eigval_min"].append(E_vals.min())
        fim_factorial_results["eigval_max"].append(E_vals.max())
        fim_factorial_results["det_FIM"].append(det_FIM)
        fim_factorial_results["trace_cov"].append(trace_cov)
        fim_factorial_results["trace_FIM"].append(trace_FIM)
        fim_factorial_results["solve_time"].append(time_set[-1])

    self.fim_factorial_results = fim_factorial_results

    return self.fim_factorial_results
# TODO: Overhaul plotting functions to not use strings
# TODO: Make the plotting functionalities work for >2 design features
def _curve1D(
    self,
    title_text,
    xlabel_text,
    font_axes=16,
    font_tick=14,
    figure_file_name=None,
    log_scale=True,
):
    """
    Draw 1D sensitivity curves for all design criteria

    Parameters
    ----------
    title_text: name of the figure, a string
    xlabel_text: x label title, a string.
        In a 1D sensitivity curve, it is the design
        variable by which the curve is drawn.
    font_axes: axes label font size
    font_tick: tick label font size
    figure_file_name: string or Path, path to save the figure as
    log_scale: if True, the result matrix will be scaled by log10

    Returns
    --------
    5 Figures of 1D sensitivity curves for each criterion
    """
    # Show the figures interactively only when no file name was given
    show_fig = figure_file_name is None

    # extract the range of the DOF design variable
    x_range = self.figure_result_data[self.figure_sens_des_vars[0]].values.tolist()

    # (results column, y-axis label, title suffix, file-name suffix)
    # for each of the five design criteria
    criteria = [
        (
            "log10 A-opt",
            "$log_{10}$(Trace of Covariance)",
            "A-optimality",
            "_A_opt.png",
        ),
        (
            "log10 pseudo A-opt",
            "$log_{10}$(Trace of FIM)",
            "pseudo A-optimality",
            "_pseudo_A_opt.png",
        ),
        ("log10 D-opt", "$log_{10}$(Determinant)", "D-optimality", "_D_opt.png"),
        (
            "log10 E-opt",
            "$log_{10}$ (Minimum eigenvalue)",
            "E-optimality",
            "_E_opt.png",
        ),
        (
            "log10 ME-opt",
            "$log_{10}$(Condition number)",
            "Modified E-optimality",
            "_ME_opt.png",
        ),
    ]

    for column, ylabel_text, criterion_name, file_suffix in criteria:
        y_range = self.figure_result_data[column].values.tolist()
        # decide if the results are log scaled
        if log_scale:
            y_range = np.log10(y_range)
        self._draw_1D_criterion(
            x_range,
            y_range,
            xlabel_text,
            ylabel_text,
            title_text + ": " + criterion_name,
            font_axes,
            font_tick,
            show_fig,
            figure_file_name,
            file_suffix,
        )

def _draw_1D_criterion(
    self,
    x_range,
    y_range,
    xlabel_text,
    ylabel_text,
    title,
    font_axes,
    font_tick,
    show_fig,
    figure_file_name,
    file_suffix,
):
    """Render a single 1D sensitivity curve and either show it or save it
    to ``figure_file_name + file_suffix``."""
    fig = plt.pyplot.figure()
    plt.pyplot.rc("axes", titlesize=font_axes)
    plt.pyplot.rc("axes", labelsize=font_axes)
    plt.pyplot.rc("xtick", labelsize=font_tick)
    plt.pyplot.rc("ytick", labelsize=font_tick)
    ax = fig.add_subplot(111)
    ax.plot(x_range, y_range)
    ax.scatter(x_range, y_range)
    ax.set_ylabel(ylabel_text)
    ax.set_xlabel(xlabel_text)
    plt.pyplot.title(title)
    if show_fig:
        plt.pyplot.show()
    else:
        # str() so both plain strings and pathlib.Path inputs work
        # (Path + str concatenation raises TypeError)
        plt.pyplot.savefig(
            pathlib.Path(str(figure_file_name) + file_suffix), format="png", dpi=450
        )
def _heatmap(
    self,
    title_text,
    xlabel_text,
    ylabel_text,
    font_axes=16,
    font_tick=14,
    figure_file_name=None,
    log_scale=True,
):
    """
    Draw 2D heatmaps for all design criteria

    Parameters
    ----------
    title_text: name of the figure, a string
    xlabel_text: x label title, a string.
        In a 2D heatmap, it should be the second
        design variable in the design_ranges
    ylabel_text: y label title, a string.
        In a 2D heatmap, it should be the first
        design variable in the dv_ranges
    font_axes: axes label font size
    font_tick: tick label font size
    figure_file_name: string or Path, path to save the figure as
    log_scale: if True, the result matrix will be scaled by log10

    Returns
    --------
    5 Figures of 2D heatmap for each criterion
    """
    # Show the figures interactively only when no file name was given
    show_fig = figure_file_name is None

    # unique values of the two sensitivity design variables define the grid
    sens_ranges = {
        dv: list(self.figure_result_data[dv].unique())
        for dv in self.figure_sens_des_vars
    }
    x_range = sens_ranges[self.figure_sens_des_vars[0]]
    y_range = sens_ranges[self.figure_sens_des_vars[1]]
    grid_shape = (len(x_range), len(y_range))

    def _grid(column):
        # reshape the flat factorial results onto the (x, y) grid
        return np.asarray(self.figure_result_data[column].values.tolist()).reshape(
            grid_shape
        )

    # extract the design criteria values; keep them on self so they can be
    # inspected after plotting (preserves the original attribute interface)
    self.cri_a = _grid("log10 A-opt")
    self.cri_pseudo_a = _grid("log10 pseudo A-opt")
    self.cri_d = _grid("log10 D-opt")
    self.cri_e = _grid("log10 E-opt")
    self.cri_e_cond = _grid("log10 ME-opt")

    # (grid values, colorbar label, title suffix, file-name suffix)
    criteria = [
        (self.cri_a, "log10(trace(cov))", "A-optimality", "_A_opt.png"),
        (
            self.cri_pseudo_a,
            "log10(trace(FIM))",
            "pseudo A-optimality",
            "_pseudo_A_opt.png",
        ),
        (self.cri_d, "log10(det(FIM))", "D-optimality", "_D_opt.png"),
        (self.cri_e, "log10(minimal eig(FIM))", "E-optimality", "_E_opt.png"),
        (
            self.cri_e_cond,
            "log10(cond(FIM))",
            "Modified E-optimality",
            "_ME_opt.png",
        ),
    ]

    for values, colorbar_label, criterion_name, file_suffix in criteria:
        # decide if log scaled
        heat = np.log10(values) if log_scale else values
        self._draw_2D_heatmap(
            heat,
            x_range,
            y_range,
            xlabel_text,
            ylabel_text,
            colorbar_label,
            title_text + ": " + criterion_name,
            font_axes,
            font_tick,
            show_fig,
            figure_file_name,
            file_suffix,
        )

def _draw_2D_heatmap(
    self,
    heat,
    x_range,
    y_range,
    xlabel_text,
    ylabel_text,
    colorbar_label,
    title,
    font_axes,
    font_tick,
    show_fig,
    figure_file_name,
    file_suffix,
):
    """Render a single 2D heatmap (x on columns, y on rows via transpose)
    and either show it or save it to ``figure_file_name + file_suffix``."""
    fig = plt.pyplot.figure()
    plt.pyplot.rc("axes", titlesize=font_axes)
    plt.pyplot.rc("axes", labelsize=font_axes)
    plt.pyplot.rc("xtick", labelsize=font_tick)
    plt.pyplot.rc("ytick", labelsize=font_tick)
    ax = fig.add_subplot(111)
    params = {"mathtext.default": "regular"}
    plt.pyplot.rcParams.update(params)
    ax.set_yticks(range(len(y_range)))
    ax.set_yticklabels(y_range)
    ax.set_ylabel(ylabel_text)
    ax.set_xticks(range(len(x_range)))
    ax.set_xticklabels(x_range)
    ax.set_xlabel(xlabel_text)
    im = ax.imshow(heat.T, cmap=plt.pyplot.cm.hot_r)
    ba = plt.pyplot.colorbar(im)
    ba.set_label(colorbar_label)
    plt.pyplot.title(title)
    if show_fig:
        plt.pyplot.show()
    else:
        # str() so both plain strings and pathlib.Path inputs work
        # (Path + str concatenation raises TypeError)
        plt.pyplot.savefig(
            pathlib.Path(str(figure_file_name) + file_suffix), format="png", dpi=450
        )
# Gets the FIM from an existing model
def get_FIM(self, model=None):
    """
    Gets the FIM values from the model specified

    Parameters
    ----------
    model: model to grab FIM from, Default: None, (self.model)

    Returns
    -------
    FIM: 2D list representation of the FIM (can be cast to numpy)

    Raises
    ------
    RuntimeError: if the model does not have a ``fim`` variable
    """
    if model is None:
        model = self.model

    if not hasattr(model, "fim"):
        raise RuntimeError(
            "Model provided does not have variable `fim`. Please make sure "
            "the model is built properly before calling `get_FIM`"
        )

    fim_vals = [
        pyo.value(model.fim[i, j])
        for i in model.parameter_names
        for j in model.parameter_names
    ]
    n_params = len(model.parameter_names)
    fim_np = np.array(fim_vals).reshape((n_params, n_params))

    # FIM is a lower triangular matrix for the optimal DoE problem.
    # Exploit symmetry to fill in the zeros: copy each strictly-lower
    # entry (i, j), j < i, into its mirror position (j, i).
    for i in range(n_params):
        for j in range(i):
            fim_np[j, i] = fim_np[i, j]

    return [list(row) for row in fim_np]
# Gets the sensitivity matrix from an existing model
def get_sensitivity_matrix(self, model=None):
    """
    Gets the sensitivity matrix (Q) values from the model specified.

    Parameters
    ----------
    model: model to grab Q from, Default: None, (self.model)

    Returns
    -------
    Q: 2D list representation of the sensitivity matrix (can be cast to numpy)

    Raises
    ------
    RuntimeError: if the model does not have a ``sensitivity_jacobian``
        variable
    """
    if model is None:
        model = self.model

    if not hasattr(model, "sensitivity_jacobian"):
        raise RuntimeError(
            "Model provided does not have variable `sensitivity_jacobian`. "
            "Please make sure the model is built properly before calling "
            "`get_sensitivity_matrix`"
        )

    # Flatten in row-major (output, parameter) order, then reshape to the
    # (n_outputs, n_parameters) matrix
    Q_vals = [
        pyo.value(model.sensitivity_jacobian[i, j])
        for i in model.output_names
        for j in model.parameter_names
    ]
    Q_np = np.array(Q_vals).reshape(
        (len(model.output_names), len(model.parameter_names))
    )

    return [list(row) for row in Q_np]
# Gets the experiment input values from an existing model
# Gets the unknown parameter values from an existing model
def get_unknown_parameter_values(self, model=None):
    """
    Gets the unknown parameter values (theta)
    from the model specified.

    Parameters
    ----------
    model: model to grab theta from,
        default: None, (self.model)

    Returns
    -------
    theta: 1D list of unknown parameter values at which
        this experiment was designed

    Raises
    ------
    RuntimeError: if the model has neither an ``unknown_parameters``
        component nor ``scenario_blocks``
    """
    if model is None:
        model = self.model

    if not hasattr(model, "unknown_parameters"):
        # A solved DoE model keeps its labeled components on scenario
        # block 0 rather than on the top-level model
        if not hasattr(model, "scenario_blocks"):
            raise RuntimeError(
                "Model provided does not have expected structure. Please make "
                "sure model is built properly before calling "
                "`get_unknown_parameter_values`"
            )

        theta_vals = [
            pyo.value(k)
            for k, v in model.scenario_blocks[0].unknown_parameters.items()
        ]
    else:
        theta_vals = [pyo.value(k) for k, v in model.unknown_parameters.items()]

    return theta_vals
# Gets the experiment output values from an existing model
def get_experiment_output_values(self, model=None):
    """
    Gets the experiment output values (y hat)
    from the model specified.

    Parameters
    ----------
    model: model to grab y hat from,
        default: None, (self.model)

    Returns
    -------
    y_hat: 1D list of experiment output values from the design experiment

    Raises
    ------
    RuntimeError: if the model has neither an ``experiment_outputs``
        component nor ``scenario_blocks``
    """
    if model is None:
        model = self.model

    if not hasattr(model, "experiment_outputs"):
        # A solved DoE model keeps its labeled components on scenario
        # block 0 rather than on the top-level model
        if not hasattr(model, "scenario_blocks"):
            raise RuntimeError(
                "Model provided does not have expected structure. Please make "
                "sure model is built properly before calling "
                "`get_experiment_output_values`"
            )

        y_hat_vals = [
            pyo.value(k)
            for k, v in model.scenario_blocks[0].experiment_outputs.items()
        ]
    else:
        y_hat_vals = [pyo.value(k) for k, v in model.experiment_outputs.items()]

    return y_hat_vals
# TODO: For more complicated error structures, this should become
# get cov_y, or so, and this method will be deprecated
# Gets the measurement error values from an existing model
def get_measurement_error_values(self, model=None):
    """
    Gets the measurement error values (sigma)
    from the model specified.

    Parameters
    ----------
    model: model to grab sigma values from,
        default: None, (self.model)

    Returns
    -------
    sigma_diag: 1D list of measurement errors used to design the experiment

    Raises
    ------
    RuntimeError: if the model has neither a ``measurement_error``
        component nor ``scenario_blocks``
    """
    if model is None:
        model = self.model

    if not hasattr(model, "measurement_error"):
        # A solved DoE model keeps its labeled components on scenario
        # block 0 rather than on the top-level model
        if not hasattr(model, "scenario_blocks"):
            raise RuntimeError(
                "Model provided does not have expected structure. Please make "
                "sure model is built properly before calling "
                "`get_measurement_error_values`"
            )

        sigma_vals = [
            pyo.value(k)
            for k, v in model.scenario_blocks[0].measurement_error.items()
        ]
    else:
        sigma_vals = [pyo.value(k) for k, v in model.measurement_error.items()]

    return sigma_vals
# Helper function for determinant calculation
def _sgn(self, p):
"""
This is a helper function for when constructing the determinant formula
without the Cholesky factorization.
Parameters
-----------
p: the permutation (a list)
Returns
-------
1 if the number of exchange is an even number
-1 if the number is an odd number
"""
if len(p) == 1:
return 1
trans = 0
for i in range(0, len(p)):
j = i + 1
for j in range(j, len(p)):
if p[i] > p[j]:
trans = trans + 1
if (trans % 2) == 0:
return 1
else:
return -1