# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
import logging
import io
from typing import List, Optional
from pyomo.common.collections import ComponentMap
from pyomo.common.dependencies import attempt_import
from pyomo.common.errors import ApplicationError
from pyomo.common.flags import NOTSET
from pyomo.common.tee import TeeStream, capture_output
from pyomo.core.kernel.objective import minimize, maximize
from pyomo.core.base.var import VarData
from pyomo.core.base.constraint import ConstraintData
from pyomo.core.base.sos import SOSConstraintData
from pyomo.core.base.param import ParamData
from pyomo.core.expr.numvalue import value, is_constant
from pyomo.repn import generate_standard_repn
from pyomo.core.expr.numeric_expr import NPV_MaxExpression, NPV_MinExpression
from pyomo.common.dependencies import numpy as np
from pyomo.core.staleflag import StaleFlagManager
from pyomo.contrib.solver.common.base import PersistentSolverBase, Availability
from pyomo.contrib.solver.common.results import (
Results,
TerminationCondition,
SolutionStatus,
)
from pyomo.contrib.solver.common.config import PersistentBranchAndBoundConfig
from pyomo.contrib.solver.common.persistent import (
PersistentSolverUtils,
PersistentSolverMixin,
)
from pyomo.contrib.solver.common.solution_loader import PersistentSolutionLoader
from pyomo.contrib.solver.common.util import (
NoFeasibleSolutionError,
NoOptimalSolutionError,
NoDualsError,
NoReducedCostsError,
NoSolutionError,
IncompatibleModelError,
)
logger = logging.getLogger(__name__)
highspy, highspy_available = attempt_import('highspy')
class _MutableVarBounds:
def __init__(self, lower_expr, upper_expr, pyomo_var_id, var_map, highs):
self.pyomo_var_id = pyomo_var_id
self.lower_expr = lower_expr
self.upper_expr = upper_expr
self.var_map = var_map
self.highs = highs
def update(self):
col_ndx = self.var_map[self.pyomo_var_id]
lb = value(self.lower_expr)
ub = value(self.upper_expr)
self.highs.changeColBounds(col_ndx, lb, ub)
class _MutableLinearCoefficient:
def __init__(self, pyomo_con, pyomo_var_id, con_map, var_map, expr, highs):
self.expr = expr
self.highs = highs
self.pyomo_var_id = pyomo_var_id
self.pyomo_con = pyomo_con
self.con_map = con_map
self.var_map = var_map
def update(self):
row_ndx = self.con_map[self.pyomo_con]
col_ndx = self.var_map[self.pyomo_var_id]
self.highs.changeCoeff(row_ndx, col_ndx, value(self.expr))
class _MutableObjectiveCoefficient:
def __init__(self, pyomo_var_id, var_map, expr, highs):
self.expr = expr
self.highs = highs
self.pyomo_var_id = pyomo_var_id
self.var_map = var_map
def update(self):
col_ndx = self.var_map[self.pyomo_var_id]
self.highs.changeColCost(col_ndx, value(self.expr))
class _MutableQuadraticCoefficient:
def __init__(self, expr, v1_id, v2_id):
self.expr = expr
self.v1_id = v1_id
self.v2_id = v2_id
class _MutableObjective:
def __init__(
self,
highs,
constant,
linear_coefs,
quadratic_coefs,
pyomo_var_to_solver_var_map,
):
self.highs = highs
self.constant = constant
self.linear_coefs = linear_coefs
self.quadratic_coefs = quadratic_coefs
self._pyomo_var_to_solver_var_map = pyomo_var_to_solver_var_map
# Store the quadratic coefficients in dictionary format
self._initialize_quad_coef_dicts()
# Flag to force first update of quadratic coefficients
self._first_update = True
def _initialize_quad_coef_dicts(self):
self.quad_coef_dict = {}
for coef in self.quadratic_coefs:
self.quad_coef_dict[(coef.v1_id, coef.v2_id)] = value(coef.expr)
self.previous_quad_coef_dict = self.quad_coef_dict.copy()
def update(self):
"""
Update the quadratic objective expression.
"""
needs_quadratic_update = self._first_update
self.constant.update()
for coef in self.linear_coefs:
coef.update()
for coef in self.quadratic_coefs:
current_val = value(coef.expr)
previous_val = self.previous_quad_coef_dict.get((coef.v1_id, coef.v2_id))
if previous_val is not None and current_val != previous_val:
needs_quadratic_update = True
self.quad_coef_dict[(coef.v1_id, coef.v2_id)] = current_val
self.previous_quad_coef_dict[(coef.v1_id, coef.v2_id)] = current_val
# If anything changed, rebuild and pass the Hessian
if needs_quadratic_update:
self._build_and_pass_hessian()
self._first_update = False
def _build_and_pass_hessian(self):
"""Build and pass the Hessian to HiGHS in CSC format"""
if not self.quad_coef_dict:
return
dim = self.highs.getNumCol()
# Build CSC format for the lower triangular part
hessian_value = []
hessian_index = []
hessian_start = [0] * dim
quad_coef_idx_dict = {}
for (v1_id, v2_id), coef in self.quad_coef_dict.items():
v1_ndx = self._pyomo_var_to_solver_var_map[v1_id]
v2_ndx = self._pyomo_var_to_solver_var_map[v2_id]
# Ensure we're storing the lower triangular part
row = max(v1_ndx, v2_ndx)
col = min(v1_ndx, v2_ndx)
# Adjust the diagonal to match Highs' expected format
if v1_ndx == v2_ndx:
coef *= 2.0
quad_coef_idx_dict[(row, col)] = coef
sorted_entries = sorted(
quad_coef_idx_dict.items(), key=lambda x: (x[0][1], x[0][0])
)
last_col = -1
for (row, col), val in sorted_entries:
while col > last_col:
last_col += 1
if last_col < dim:
hessian_start[last_col] = len(hessian_value)
# Add the entry
hessian_index.append(row)
hessian_value.append(val)
while last_col < dim - 1:
last_col += 1
hessian_start[last_col] = len(hessian_value)
nnz = len(hessian_value)
status = self.highs.passHessian(
dim,
nnz,
highspy.HessianFormat.kTriangular,
np.array(hessian_start, dtype=np.int32),
np.array(hessian_index, dtype=np.int32),
np.array(hessian_value, dtype=np.double),
)
if status != highspy.HighsStatus.kOk:
logger.warning(
f"HiGHS returned non-OK status when passing Hessian: {status}"
)
class _MutableObjectiveOffset:
def __init__(self, expr, highs):
self.expr = expr
self.highs = highs
def update(self):
self.highs.changeObjectiveOffset(value(self.expr))
class _MutableConstraintBounds:
def __init__(self, lower_expr, upper_expr, pyomo_con, con_map, highs):
self.lower_expr = lower_expr
self.upper_expr = upper_expr
self.con = pyomo_con
self.con_map = con_map
self.highs = highs
def update(self):
row_ndx = self.con_map[self.con]
lb = value(self.lower_expr)
ub = value(self.upper_expr)
self.highs.changeRowBounds(row_ndx, lb, ub)
[docs]
class Highs(PersistentSolverMixin, PersistentSolverUtils, PersistentSolverBase):
"""
Interface to HiGHS
"""
CONFIG = PersistentBranchAndBoundConfig()
_available = None
[docs]
def __init__(self, **kwds):
treat_fixed_vars_as_params = kwds.pop('treat_fixed_vars_as_params', True)
PersistentSolverBase.__init__(self, **kwds)
PersistentSolverUtils.__init__(
self, treat_fixed_vars_as_params=treat_fixed_vars_as_params
)
self._solver_model = None
self._pyomo_var_to_solver_var_map = {}
self._pyomo_con_to_solver_con_map = {}
self._solver_con_to_pyomo_con_map = {}
self._mutable_helpers = {}
self._mutable_bounds = {}
self._last_results_object: Optional[Results] = None
self._sol = None
[docs]
def available(self):
if highspy_available:
return Availability.FullLicense
return Availability.NotFound
[docs]
def version(self):
try:
version = (
highspy.HIGHS_VERSION_MAJOR,
highspy.HIGHS_VERSION_MINOR,
highspy.HIGHS_VERSION_PATCH,
)
except AttributeError:
# Older versions of Highs do not have the above attributes
# and the solver version can only be obtained by making
# an instance of the solver class.
tmp = highspy.Highs()
version = (tmp.versionMajor(), tmp.versionMinor(), tmp.versionPatch())
return version
def _solve(self):
config = self._active_config
timer = config.timer
options = config.solver_options
ostreams = [io.StringIO()] + config.tee
with capture_output(output=TeeStream(*ostreams), capture_fd=True):
self._solver_model.setOptionValue('log_to_console', True)
if config.threads is not None:
self._solver_model.setOptionValue('threads', config.threads)
if config.time_limit is not None:
self._solver_model.setOptionValue('time_limit', config.time_limit)
if config.rel_gap is not None:
self._solver_model.setOptionValue('mip_rel_gap', config.rel_gap)
if config.abs_gap is not None:
self._solver_model.setOptionValue('mip_abs_gap', config.abs_gap)
for key, option in options.items():
self._solver_model.setOptionValue(key, option)
timer.start('optimize')
if self.version()[:2] >= (1, 8):
self._solver_model.HandleKeyboardInterrupt = True
self._solver_model.run()
timer.stop('optimize')
return self._postsolve(ostreams[0])
def _process_domain_and_bounds(self, var_id):
_v, _lb, _ub, _fixed, _domain_interval, _value = self._vars[var_id]
lb, ub, step = _domain_interval
if lb is None:
lb = -highspy.kHighsInf
if ub is None:
ub = highspy.kHighsInf
if step == 0:
vtype = highspy.HighsVarType.kContinuous
elif step == 1:
vtype = highspy.HighsVarType.kInteger
else:
raise ValueError(
f'Unrecognized domain step: {step} (should be either 0 or 1)'
)
if _fixed:
lb = _value
ub = _value
else:
if _lb is not None or _ub is not None:
if not is_constant(_lb) or not is_constant(_ub):
if _lb is None:
tmp_lb = -highspy.kHighsInf
else:
tmp_lb = _lb
if _ub is None:
tmp_ub = highspy.kHighsInf
else:
tmp_ub = _ub
mutable_bound = _MutableVarBounds(
lower_expr=NPV_MaxExpression((tmp_lb, lb)),
upper_expr=NPV_MinExpression((tmp_ub, ub)),
pyomo_var_id=var_id,
var_map=self._pyomo_var_to_solver_var_map,
highs=self._solver_model,
)
self._mutable_bounds[var_id] = (_v, mutable_bound)
if _lb is not None:
lb = max(value(_lb), lb)
if _ub is not None:
ub = min(value(_ub), ub)
return lb, ub, vtype
def _add_variables(self, variables: List[VarData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
lbs = []
ubs = []
indices = []
vtypes = []
current_num_vars = len(self._pyomo_var_to_solver_var_map)
for v in variables:
v_id = id(v)
lb, ub, vtype = self._process_domain_and_bounds(v_id)
lbs.append(lb)
ubs.append(ub)
vtypes.append(vtype)
indices.append(current_num_vars)
self._pyomo_var_to_solver_var_map[v_id] = current_num_vars
current_num_vars += 1
self._solver_model.addVars(
len(lbs), np.array(lbs, dtype=np.double), np.array(ubs, dtype=np.double)
)
self._solver_model.changeColsIntegrality(
len(vtypes), np.array(indices), np.array(vtypes)
)
def _add_parameters(self, params: List[ParamData]):
pass
def _reinit(self):
saved_config = self.config
saved_active_config = self._active_config
self.__init__(treat_fixed_vars_as_params=self._treat_fixed_vars_as_params)
self.config = saved_config
self._active_config = saved_active_config
[docs]
def set_instance(self, model):
config = self._active_config
ostreams = config.tee
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
if not self.available():
c = self.__class__
raise ApplicationError(
f'Solver {c.__module__}.{c.__qualname__} is not available '
f'({self.available()}).'
)
with capture_output(TeeStream(*ostreams), capture_fd=True):
self._reinit()
self._model = model
self._solver_model = highspy.Highs()
self.add_block(model)
if self._objective is None:
self.set_objective(None)
def _add_constraints(self, cons: List[ConstraintData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
current_num_cons = len(self._pyomo_con_to_solver_con_map)
lbs = []
ubs = []
starts = []
var_indices = []
coef_values = []
for con in cons:
repn = generate_standard_repn(
con.body, quadratic=False, compute_values=False
)
if repn.nonlinear_expr is not None:
raise IncompatibleModelError(
f'Highs interface does not support expressions of degree {repn.polynomial_degree()}'
)
starts.append(len(coef_values))
for ndx, coef in enumerate(repn.linear_coefs):
v = repn.linear_vars[ndx]
v_id = id(v)
coef_val = value(coef)
if not is_constant(coef):
mutable_linear_coefficient = _MutableLinearCoefficient(
pyomo_con=con,
pyomo_var_id=v_id,
con_map=self._pyomo_con_to_solver_con_map,
var_map=self._pyomo_var_to_solver_var_map,
expr=coef,
highs=self._solver_model,
)
if con not in self._mutable_helpers:
self._mutable_helpers[con] = []
self._mutable_helpers[con].append(mutable_linear_coefficient)
if coef_val == 0:
continue
var_indices.append(self._pyomo_var_to_solver_var_map[v_id])
coef_values.append(coef_val)
if con.has_lb():
lb = con.lower - repn.constant
else:
lb = -highspy.kHighsInf
if con.has_ub():
ub = con.upper - repn.constant
else:
ub = highspy.kHighsInf
if not is_constant(lb) or not is_constant(ub):
mutable_con_bounds = _MutableConstraintBounds(
lower_expr=lb,
upper_expr=ub,
pyomo_con=con,
con_map=self._pyomo_con_to_solver_con_map,
highs=self._solver_model,
)
if con not in self._mutable_helpers:
self._mutable_helpers[con] = [mutable_con_bounds]
else:
self._mutable_helpers[con].append(mutable_con_bounds)
lbs.append(value(lb))
ubs.append(value(ub))
self._pyomo_con_to_solver_con_map[con] = current_num_cons
self._solver_con_to_pyomo_con_map[current_num_cons] = con
current_num_cons += 1
self._solver_model.addRows(
len(lbs),
np.array(lbs, dtype=np.double),
np.array(ubs, dtype=np.double),
len(coef_values),
np.array(starts),
np.array(var_indices),
np.array(coef_values, dtype=np.double),
)
def _add_sos_constraints(self, cons: List[SOSConstraintData]):
if cons:
raise NotImplementedError(
'Highs interface does not support SOS constraints'
)
def _remove_constraints(self, cons: List[ConstraintData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
indices_to_remove = []
for con in cons:
con_ndx = self._pyomo_con_to_solver_con_map.pop(con)
del self._solver_con_to_pyomo_con_map[con_ndx]
indices_to_remove.append(con_ndx)
self._mutable_helpers.pop(con, None)
self._solver_model.deleteRows(
len(indices_to_remove), np.array(list(sorted(indices_to_remove)))
)
con_ndx = 0
new_con_map = {}
for c in self._pyomo_con_to_solver_con_map:
new_con_map[c] = con_ndx
con_ndx += 1
self._pyomo_con_to_solver_con_map.clear()
self._pyomo_con_to_solver_con_map.update(new_con_map)
self._solver_con_to_pyomo_con_map.clear()
self._solver_con_to_pyomo_con_map.update(
{v: k for k, v in self._pyomo_con_to_solver_con_map.items()}
)
def _remove_sos_constraints(self, cons: List[SOSConstraintData]):
if cons:
raise NotImplementedError(
'Highs interface does not support SOS constraints'
)
def _remove_variables(self, variables: List[VarData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
indices_to_remove = []
for v in variables:
v_id = id(v)
v_ndx = self._pyomo_var_to_solver_var_map.pop(v_id)
indices_to_remove.append(v_ndx)
self._mutable_bounds.pop(v_id, None)
indices_to_remove.sort()
self._solver_model.deleteVars(
len(indices_to_remove), np.array(list(sorted(indices_to_remove)))
)
v_ndx = 0
new_var_map = {}
for v_id in self._pyomo_var_to_solver_var_map:
new_var_map[v_id] = v_ndx
v_ndx += 1
self._pyomo_var_to_solver_var_map.clear()
self._pyomo_var_to_solver_var_map.update(new_var_map)
def _remove_parameters(self, params: List[ParamData]):
pass
def _update_variables(self, variables: List[VarData]):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
indices = []
lbs = []
ubs = []
vtypes = []
for v in variables:
v_id = id(v)
self._mutable_bounds.pop(v_id, None)
v_ndx = self._pyomo_var_to_solver_var_map[v_id]
lb, ub, vtype = self._process_domain_and_bounds(v_id)
lbs.append(lb)
ubs.append(ub)
vtypes.append(vtype)
indices.append(v_ndx)
self._solver_model.changeColsBounds(
len(indices),
np.array(indices),
np.array(lbs, dtype=np.double),
np.array(ubs, dtype=np.double),
)
self._solver_model.changeColsIntegrality(
len(indices), np.array(indices), np.array(vtypes)
)
[docs]
def update_parameters(self):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
for con, helpers in self._mutable_helpers.items():
for helper in helpers:
helper.update()
for k, (v, helper) in self._mutable_bounds.items():
helper.update()
self._mutable_objective.update()
def _set_objective(self, obj):
self._sol = None
if self._last_results_object is not None:
self._last_results_object.solution_loader.invalidate()
n = len(self._pyomo_var_to_solver_var_map)
indices = np.arange(n)
costs = np.zeros(n, dtype=np.double)
# Initialize empty lists for all coefficient types
mutable_linear_coefficients = []
mutable_quadratic_coefficients = []
if obj is None:
sense = highspy.ObjSense.kMinimize
mutable_constant = _MutableObjectiveOffset(expr=0, highs=self._solver_model)
else:
if obj.sense == minimize:
sense = highspy.ObjSense.kMinimize
elif obj.sense == maximize:
sense = highspy.ObjSense.kMaximize
else:
raise ValueError(f'Objective sense is not recognized: {obj.sense}')
repn = generate_standard_repn(
obj.expr, quadratic=True, compute_values=False
)
if repn.nonlinear_expr is not None or repn.polynomial_degree() > 2:
raise IncompatibleModelError(
f'Highs interface does not support expressions of degree {repn.polynomial_degree()}'
)
for coef, v in zip(repn.linear_coefs, repn.linear_vars):
v_id = id(v)
v_ndx = self._pyomo_var_to_solver_var_map[v_id]
costs[v_ndx] = value(coef)
if not is_constant(coef):
mutable_objective_coef = _MutableObjectiveCoefficient(
pyomo_var_id=v_id,
var_map=self._pyomo_var_to_solver_var_map,
expr=coef,
highs=self._solver_model,
)
mutable_linear_coefficients.append(mutable_objective_coef)
mutable_constant = _MutableObjectiveOffset(
expr=repn.constant, highs=self._solver_model
)
if repn.quadratic_vars and len(repn.quadratic_vars) > 0:
for ndx, (v1, v2) in enumerate(repn.quadratic_vars):
coef = repn.quadratic_coefs[ndx]
mutable_quadratic_coefficients.append(
_MutableQuadraticCoefficient(
expr=coef, v1_id=id(v1), v2_id=id(v2)
)
)
self._solver_model.changeObjectiveSense(sense)
self._solver_model.changeColsCost(n, indices, costs)
self._mutable_objective = _MutableObjective(
self._solver_model,
mutable_constant,
mutable_linear_coefficients,
mutable_quadratic_coefficients,
self._pyomo_var_to_solver_var_map,
)
self._mutable_objective.update()
def _postsolve(self, stream: io.StringIO):
config = self._active_config
timer = config.timer
timer.start('load solution')
highs = self._solver_model
status = highs.getModelStatus()
results = Results()
results.solution_loader = PersistentSolutionLoader(self)
results.solver_name = self.name
results.solver_version = self.version()
results.solver_config = config
results.solver_log = stream.getvalue()
results.timing_info.highs_time = highs.getRunTime()
self._sol = highs.getSolution()
has_feasible_solution = self._sol.value_valid
if status == highspy.HighsModelStatus.kOptimal:
results.solution_status = SolutionStatus.optimal
elif has_feasible_solution:
results.solution_status = SolutionStatus.feasible
else:
results.solution_status = SolutionStatus.noSolution
if status == highspy.HighsModelStatus.kNotset:
results.termination_condition = TerminationCondition.unknown
elif status == highspy.HighsModelStatus.kLoadError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kModelError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kPresolveError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kSolveError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kPostsolveError:
results.termination_condition = TerminationCondition.error
elif status == highspy.HighsModelStatus.kModelEmpty:
results.termination_condition = TerminationCondition.unknown
elif status == highspy.HighsModelStatus.kOptimal:
results.termination_condition = (
TerminationCondition.convergenceCriteriaSatisfied
)
elif status == highspy.HighsModelStatus.kInfeasible:
results.termination_condition = TerminationCondition.provenInfeasible
elif status == highspy.HighsModelStatus.kUnboundedOrInfeasible:
results.termination_condition = TerminationCondition.infeasibleOrUnbounded
elif status == highspy.HighsModelStatus.kUnbounded:
results.termination_condition = TerminationCondition.unbounded
elif status == highspy.HighsModelStatus.kObjectiveBound:
results.termination_condition = TerminationCondition.objectiveLimit
elif status == highspy.HighsModelStatus.kObjectiveTarget:
results.termination_condition = TerminationCondition.objectiveLimit
elif status == highspy.HighsModelStatus.kTimeLimit:
results.termination_condition = TerminationCondition.maxTimeLimit
elif status == highspy.HighsModelStatus.kIterationLimit:
results.termination_condition = TerminationCondition.iterationLimit
elif status == getattr(highspy.HighsModelStatus, "kSolutionLimit", NOTSET):
# kSolutionLimit was introduced in HiGHS v1.5.3 for MIP-related limits
results.termination_condition = TerminationCondition.iterationLimit
elif status == highspy.HighsModelStatus.kUnknown:
results.termination_condition = TerminationCondition.unknown
else:
logger.warning(f'Received unhandled {status=} from solver HiGHS.')
results.termination_condition = TerminationCondition.unknown
if (
results.termination_condition
!= TerminationCondition.convergenceCriteriaSatisfied
and config.raise_exception_on_nonoptimal_result
):
raise NoOptimalSolutionError()
results.incumbent_objective = None
results.objective_bound = None
info = highs.getInfo()
if self._objective is not None:
if has_feasible_solution:
results.incumbent_objective = info.objective_function_value
if info.mip_node_count == -1:
if has_feasible_solution:
results.objective_bound = info.objective_function_value
else:
results.objective_bound = None
else:
results.objective_bound = info.mip_dual_bound
if info.valid:
results.extra_info.simplex_iteration_count = (
info.simplex_iteration_count
)
results.extra_info.ipm_iteration_count = info.ipm_iteration_count
results.extra_info.mip_node_count = info.mip_node_count
results.extra_info.pdlp_iteration_count = info.pdlp_iteration_count
results.extra_info.qp_iteration_count = info.qp_iteration_count
if config.load_solutions:
if has_feasible_solution:
self._load_vars()
else:
raise NoFeasibleSolutionError()
timer.stop('load solution')
return results
def _load_vars(self, vars_to_load=None):
for v, val in self._get_primals(vars_to_load=vars_to_load).items():
v.set_value(val, skip_validation=True)
StaleFlagManager.mark_all_as_stale(delayed=True)
def _get_primals(self, vars_to_load=None):
if self._sol is None or not self._sol.value_valid:
raise NoSolutionError()
res = ComponentMap()
if vars_to_load is None:
var_ids_to_load = []
for v, ref_info in self._referenced_variables.items():
using_cons, using_sos, using_obj = ref_info
if using_cons or using_sos or (using_obj is not None):
var_ids_to_load.append(v)
else:
var_ids_to_load = [id(v) for v in vars_to_load]
var_vals = self._sol.col_value
for v_id in var_ids_to_load:
v = self._vars[v_id][0]
v_ndx = self._pyomo_var_to_solver_var_map[v_id]
res[v] = var_vals[v_ndx]
return res
def _get_reduced_costs(self, vars_to_load=None):
if self._sol is None or not self._sol.dual_valid:
raise NoReducedCostsError()
res = ComponentMap()
if vars_to_load is None:
var_ids_to_load = list(self._vars.keys())
else:
var_ids_to_load = [id(v) for v in vars_to_load]
var_vals = self._sol.col_dual
for v_id in var_ids_to_load:
v = self._vars[v_id][0]
v_ndx = self._pyomo_var_to_solver_var_map[v_id]
res[v] = var_vals[v_ndx]
return res
def _get_duals(self, cons_to_load=None):
if self._sol is None or not self._sol.dual_valid:
raise NoDualsError()
res = {}
if cons_to_load is None:
cons_to_load = list(self._pyomo_con_to_solver_con_map.keys())
duals = self._sol.row_dual
for c in cons_to_load:
c_ndx = self._pyomo_con_to_solver_con_map[c]
res[c] = duals[c_ndx]
return res