Source code for pyomo.contrib.solver.solvers.knitro.engine

# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software.  This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________

from collections.abc import Callable, Iterable, Mapping, MutableMapping, Sequence
from types import MappingProxyType
from typing import Any, TypeVar

from pyomo.common.enums import ObjectiveSense
from pyomo.common.errors import DeveloperError
from pyomo.common.numeric_types import value
from pyomo.contrib.solver.common.results import SolutionStatus, TerminationCondition

from pyomo.contrib.solver.solvers.knitro.api import knitro
from pyomo.contrib.solver.solvers.knitro.callback import build_callback_handler
from pyomo.contrib.solver.solvers.knitro.package import Package
from pyomo.contrib.solver.solvers.knitro.typing import (
    BoundType,
    Callback,
    ItemData,
    ItemType,
    StructureType,
    ValueType,
)
from pyomo.contrib.solver.solvers.knitro.utils import NonlinearExpressionData
from pyomo.core.base.constraint import ConstraintData
from pyomo.core.base.objective import ObjectiveData
from pyomo.core.base.var import VarData
from pyomo.repn.standard_repn import generate_standard_repn


def parse_bounds(
    items: Iterable[ItemType], idx_map: Mapping[int, int]
) -> Mapping[BoundType, MutableMapping[int, float]]:
    """Collect the bounds of ``items`` grouped by bound type.

    Args:
        items: Variables and/or constraints to inspect.
        idx_map: Lookup from ``id(item)`` to the index used as key in the
            returned sub-mappings.

    Returns:
        A mapping with one entry per ``BoundType`` member; each entry maps an
        item index to the corresponding bound value. Fixed variables and
        equality constraints only contribute to ``BoundType.EQ``; all other
        items contribute their non-``None`` lower/upper bounds to
        ``BoundType.LO`` / ``BoundType.UP``.

    Raises:
        KeyError: If an item is not registered in ``idx_map``.
    """
    bounds_map = {bnd_type: {} for bnd_type in BoundType}
    eq_bounds = bounds_map[BoundType.EQ]
    lo_bounds = bounds_map[BoundType.LO]
    up_bounds = bounds_map[BoundType.UP]

    for item in items:
        idx = idx_map[id(item)]

        if isinstance(item, VarData):
            if item.fixed:
                # A fixed variable is pinned to its current value.
                eq_bounds[idx] = value(item.value)
                continue
            lower, upper = item.bounds
        elif isinstance(item, ConstraintData):
            lower, _, upper = item.to_bounded_expression(evaluate_bounds=True)
            if item.equality:
                # For an equality the lower bound is used as the target value.
                eq_bounds[idx] = lower
                continue
        else:
            # Items of any other kind carry no bounds.
            continue

        if lower is not None:
            lo_bounds[idx] = lower
        if upper is not None:
            up_bounds[idx] = upper

    return bounds_map
def parse_types(
    items: Iterable[ItemType], idx_map: Mapping[int, int]
) -> Mapping[int, int]:
    """Determine the KNITRO variable type constant for every variable.

    Args:
        items: Items to inspect; only ``VarData`` instances produce an entry.
        idx_map: Lookup from ``id(item)`` to the index used as key in the
            result.

    Returns:
        A mapping from item index to one of ``knitro.KN_VARTYPE_BINARY``,
        ``knitro.KN_VARTYPE_INTEGER`` or ``knitro.KN_VARTYPE_CONTINUOUS``.

    Raises:
        KeyError: If an item is not registered in ``idx_map``.
        ValueError: If a variable is neither binary, integer nor continuous.
    """
    types_map = {}
    for item in items:
        idx = idx_map[id(item)]
        if not isinstance(item, VarData):
            continue

        # Binary is tested before integer, matching the order of the checks
        # this function has always used.
        if item.is_binary():
            var_type = knitro.KN_VARTYPE_BINARY
        elif item.is_integer():
            var_type = knitro.KN_VARTYPE_INTEGER
        elif item.is_continuous():
            var_type = knitro.KN_VARTYPE_CONTINUOUS
        else:
            raise ValueError(f"Variable {item.name} has unsupported type.")

        types_map[idx] = var_type
    return types_map
[docs] def api_set_param(param_type: int) -> Callable[..., None]: if param_type == knitro.KN_PARAMTYPE_INTEGER: return knitro.KN_set_int_param elif param_type == knitro.KN_PARAMTYPE_FLOAT: return knitro.KN_set_double_param elif param_type == knitro.KN_PARAMTYPE_STRING: return knitro.KN_set_char_param raise DeveloperError(f"Unsupported KNITRO parameter type: {param_type}")
[docs] def api_get_values( item_type: type[ItemType], value_type: ValueType ) -> Callable[..., list[float] | None]: if item_type is VarData: if value_type == ValueType.PRIMAL: return knitro.KN_get_var_primal_values elif value_type == ValueType.DUAL: return knitro.KN_get_var_dual_values elif item_type is ConstraintData: if value_type == ValueType.DUAL: return knitro.KN_get_con_dual_values elif value_type == ValueType.PRIMAL: return knitro.KN_get_con_values raise DeveloperError( f"Unsupported KNITRO item type or value type: {item_type}, {value_type}" )
def api_add_items(item_type: type[ItemType]) -> Callable[..., list[int] | None]:
    """Select the KNITRO function that adds items of the given kind.

    Args:
        item_type: Either ``VarData`` or ``ConstraintData`` (compared by
            identity).

    Returns:
        ``knitro.KN_add_vars`` for variables, ``knitro.KN_add_cons`` for
        constraints.

    Raises:
        DeveloperError: For any other item type.
    """
    if item_type is VarData:
        return knitro.KN_add_vars
    if item_type is ConstraintData:
        return knitro.KN_add_cons
    raise DeveloperError(f"Unsupported KNITRO item type: {item_type}")
[docs] def api_set_bnds( item_type: type[ItemType], bound_type: BoundType ) -> Callable[..., None]: if item_type is VarData: if bound_type == BoundType.EQ: return knitro.KN_set_var_fxbnds elif bound_type == BoundType.LO: return knitro.KN_set_var_lobnds elif bound_type == BoundType.UP: return knitro.KN_set_var_upbnds elif item_type is ConstraintData: if bound_type == BoundType.EQ: return knitro.KN_set_con_eqbnds elif bound_type == BoundType.LO: return knitro.KN_set_con_lobnds elif bound_type == BoundType.UP: return knitro.KN_set_con_upbnds raise DeveloperError( f"Unsupported KNITRO item type or bound type: {item_type}, {bound_type}" )
def api_set_types(item_type: type[ItemType]) -> Callable[..., None]:
    """Select the KNITRO function that sets the types of items.

    Args:
        item_type: Must be ``VarData`` (compared by identity); only variables
            have a type setter here.

    Returns:
        ``knitro.KN_set_var_types``.

    Raises:
        DeveloperError: For any item type other than ``VarData``.
    """
    if item_type is not VarData:
        raise DeveloperError(f"Unsupported KNITRO item type: {item_type}")
    return knitro.KN_set_var_types
[docs] def api_add_struct(is_obj: bool, structure_type: StructureType) -> Callable[..., None]: if is_obj: if structure_type == StructureType.CONSTANT: return knitro.KN_add_obj_constant elif structure_type == StructureType.LINEAR: return knitro.KN_add_obj_linear_struct elif structure_type == StructureType.QUADRATIC: return knitro.KN_add_obj_quadratic_struct else: if structure_type == StructureType.CONSTANT: return knitro.KN_add_con_constants elif structure_type == StructureType.LINEAR: return knitro.KN_add_con_linear_struct elif structure_type == StructureType.QUADRATIC: return knitro.KN_add_con_quadratic_struct raise DeveloperError( f"Unsupported KNITRO structure type: is_obj={is_obj}, structure_type={structure_type}" )
class Engine:
    """A wrapper around the KNITRO API for a single optimization problem.

    The engine owns one KNITRO context (created by :meth:`renew`, released by
    :meth:`close`) and keeps the bookkeeping needed to translate between Pyomo
    components and KNITRO indices. It can be used as a context manager:
    entering calls :meth:`renew`, exiting calls :meth:`close`.
    """

    # True once an objective has been registered via ``set_obj``.
    has_objective: bool
    # Per item type: {id(component): index assigned by KNITRO}.
    maps: Mapping[type[ItemData], MutableMapping[int, int]]
    # None -> nonlinear part of the objective; constraint index -> nonlinear
    # part of that constraint.
    nonlinear_map: MutableMapping[int | None, NonlinearExpressionData]
    # Derivative order handed to every NonlinearExpressionData that is built.
    nonlinear_diff_order: int

    # KNITRO context handle, or None when no context is open.
    _kc: Any | None
    # Return code of the last ``solve`` call, or None if no solve has run.
    _status: int | None

    def __init__(self, *, nonlinear_diff_order: int = 2) -> None:
        """Create an engine without opening a KNITRO context.

        Args:
            nonlinear_diff_order: Derivative order passed on to nonlinear
                expression data; callbacks for gradients are registered when
                it is >= 1 and for Hessians when it is >= 2.
        """
        self.has_objective = False
        # Maps:
        #   VarData -> {id(var): idx in KNITRO}
        #   ConstraintData -> {id(con): idx in KNITRO}
        self.maps = MappingProxyType({VarData: {}, ConstraintData: {}})
        # Nonlinear map:
        #   None -> objective nonlinear expression
        #   idx_con -> constraint nonlinear expression
        self.nonlinear_map = {}
        self.nonlinear_diff_order = nonlinear_diff_order
        self._kc = None
        self._status = None

    def __enter__(self) -> "Engine":
        self.renew()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def __del__(self) -> None:
        self.close()

    def renew(self) -> None:
        """Release any open context and start over with a fresh one.

        All per-problem state (objective flag, index maps, nonlinear
        expressions and the last solve status) is reset.
        """
        self.close()
        self._kc = Package.create_context()
        self.has_objective = False
        for item_type in self.maps:
            self.maps[item_type].clear()
        self.nonlinear_map.clear()
        # A fresh context has not been solved yet; drop the previous status so
        # that status queries do not report a stale result.
        self._status = None
        # TODO: remove this when the tolerance tests are fixed in test_solvers
        tol = 1e-8
        self.set_options(ftol=tol, opttol=tol, xtol=tol)

    def close(self) -> None:
        """Free the KNITRO context if one is open; otherwise do nothing."""
        # ``getattr`` because ``__del__`` also runs on instances whose
        # ``__init__`` body never executed (e.g. it was called with invalid
        # arguments), in which case ``_kc`` does not exist.
        if getattr(self, "_kc", None) is None:
            return
        try:
            self.execute(knitro.KN_free)
        finally:
            # Forget the handle even if freeing failed so that a later
            # ``close``/``__del__`` never frees the same context twice.
            self._kc = None

    T = TypeVar("T")

    def execute(self, api_func: Callable[..., T], *args, **kwargs) -> T:
        """Call ``api_func`` with the KNITRO context as first argument.

        Raises:
            RuntimeError: If no context is currently open.
        """
        if self._kc is None:
            msg = "KNITRO context has not been initialized or has been freed."
            raise RuntimeError(msg)
        return api_func(self._kc, *args, **kwargs)

    def add_vars(self, variables: Sequence[VarData]) -> None:
        """Register variables, then set their types and bounds."""
        self.add_items(VarData, variables)
        self.set_types(VarData, variables)
        self.set_bounds(VarData, variables)

    def add_cons(self, cons: Sequence[ConstraintData]) -> None:
        """Register constraints, then set their bounds and structure."""
        self.add_items(ConstraintData, cons)
        self.set_bounds(ConstraintData, cons)
        self.set_con_structures(cons)

    def set_obj(self, obj: ObjectiveData) -> None:
        """Register the objective's sense and structure."""
        self.has_objective = True
        self.set_obj_goal(obj.sense)
        self.set_obj_structures(obj)

    def set_options(self, **options) -> None:
        """Set several KNITRO parameters, given as ``name=value`` pairs."""
        for param, val in options.items():
            self.set_option(param, val)

    def set_outlev(self, level: int | None = None) -> None:
        """Set the ``outlev`` option; ``None`` selects ``KN_OUTLEV_ALL``."""
        if level is None:
            level = knitro.KN_OUTLEV_ALL
        self.set_options(outlev=level)

    def set_time_limit(self, time_limit: float) -> None:
        """Set the ``maxtime_cpu`` option."""
        self.set_options(maxtime_cpu=time_limit)

    def set_num_threads(self, nthreads: int) -> None:
        """Set the ``threads`` option."""
        self.set_options(threads=nthreads)

    def solve(self) -> int:
        """Register nonlinear callbacks, run KNITRO and return its status."""
        self.register_callbacks()
        self._status = self.execute(knitro.KN_solve)
        return self._status

    def get_idx_vars(self, variables: Iterable[VarData]) -> list[int]:
        """Return the KNITRO indices of the given variables."""
        return self.get_idxs(VarData, variables)

    def get_status(self) -> int:
        """Return the status code stored by the last ``solve`` call.

        Raises:
            RuntimeError: If no solve has been run.
        """
        if self._status is None:
            msg = "Solver has not been run. No status is available!"
            raise RuntimeError(msg)
        return self._status

    def get_number_iters(self) -> int:
        return self.execute(knitro.KN_get_number_iters)

    def get_mip_number_nodes(self) -> int:
        return self.execute(knitro.KN_get_mip_number_nodes)

    def get_mip_abs_gap(self) -> float:
        return self.execute(knitro.KN_get_mip_abs_gap)

    def get_mip_rel_gap(self) -> float:
        return self.execute(knitro.KN_get_mip_rel_gap)

    def get_mip_number_solves(self) -> int:
        return self.execute(knitro.KN_get_mip_number_solves)

    def get_num_solutions(self) -> int:
        """Return 1 if ``KN_get_solution`` yields a primal vector, else 0."""
        _, _, x, _ = self.execute(knitro.KN_get_solution)
        return 1 if x is not None else 0

    def get_solve_time(self) -> float:
        return self.execute(knitro.KN_get_solve_time_real)

    def get_obj_value(self) -> float | None:
        """Return the objective value, or ``None`` when it is not meaningful.

        ``None`` is returned when no objective was set or when the solution
        status is neither optimal nor feasible.
        """
        if not self.has_objective:
            return None
        if self.get_solution_status() in {
            SolutionStatus.optimal,
            SolutionStatus.feasible,
        }:
            return self.execute(knitro.KN_get_obj_value)
        return None

    def get_obj_bound(self) -> float | None:
        """Return ``KN_get_mip_relaxation_bnd``, or ``None`` without objective."""
        if not self.has_objective:
            return None
        return self.execute(knitro.KN_get_mip_relaxation_bnd)

    def get_idxs(
        self, item_type: type[ItemType], items: Iterable[ItemType]
    ) -> list[int]:
        """Return the KNITRO indices of ``items``, looked up by ``id``.

        Raises:
            KeyError: If ``item_type`` is unknown or an item is not registered.
        """
        idx_map = self.maps[item_type]
        return [idx_map[id(item)] for item in items]

    def get_values(
        self,
        item_type: type[ItemType],
        value_type: ValueType,
        items: Iterable[ItemType],
    ) -> list[float] | None:
        """Fetch primal or dual values for the given items from KNITRO."""
        func = api_get_values(item_type, value_type)
        idxs = self.get_idxs(item_type, items)
        return self.execute(func, idxs)

    def set_option(self, param: str, val) -> None:
        """Set one KNITRO parameter by name.

        The parameter's id and type are queried from KNITRO so that the setter
        matching its type is used.
        """
        param_id = self.execute(knitro.KN_get_param_id, param)
        param_type = self.execute(knitro.KN_get_param_type, param_id)
        func = api_set_param(param_type)
        self.execute(func, param_id, val)

    def set_obj_goal(self, sense: ObjectiveSense) -> None:
        """Set the objective goal; anything but ``minimize`` means maximize."""
        obj_goal = (
            knitro.KN_OBJGOAL_MINIMIZE
            if sense == ObjectiveSense.minimize
            else knitro.KN_OBJGOAL_MAXIMIZE
        )
        self.execute(knitro.KN_set_obj_goal, obj_goal)

    def add_items(self, item_type: type[ItemType], items: Sequence[ItemType]) -> None:
        """Add ``len(items)`` items to KNITRO and record their indices."""
        func = api_add_items(item_type)
        idxs = self.execute(func, len(items))
        if idxs is not None:
            self.maps[item_type].update(zip(map(id, items), idxs))

    def set_bounds(self, item_type: type[ItemType], items: Iterable[ItemType]) -> None:
        """Send the bounds of ``items`` to KNITRO, one call per bound type."""
        bounds_map = parse_bounds(items, self.maps[item_type])
        for bound_type, bounds in bounds_map.items():
            if not bounds:
                continue
            func = api_set_bnds(item_type, bound_type)
            self.execute(func, bounds.keys(), bounds.values())

    def set_types(self, item_type: type[ItemType], items: Iterable[ItemType]) -> None:
        """Send the types of ``items`` to KNITRO if any were determined."""
        types_map = parse_types(items, self.maps[item_type])
        if types_map:
            func = api_set_types(item_type)
            self.execute(func, types_map.keys(), types_map.values())

    def set_con_structures(self, cons: Iterable[ConstraintData]) -> None:
        """Add the structure of every constraint body."""
        for con in cons:
            i = self.maps[ConstraintData][id(con)]
            self.add_structures(i, con.body)

    def set_obj_structures(self, obj: ObjectiveData) -> None:
        """Add the structure of the objective expression."""
        self.add_structures(None, obj.expr)

    def add_structures(self, i: int | None, expr) -> None:
        """Decompose ``expr`` and register its parts with KNITRO.

        Constant, linear and quadratic parts of the standard representation
        are passed to KNITRO directly; a nonlinear remainder is stored in
        ``nonlinear_map`` for later callback registration.

        Args:
            i: Constraint index, or ``None`` for the objective.
            expr: Expression to decompose.
        """
        repn = generate_standard_repn(expr)
        if repn is None:
            return

        is_obj = i is None
        # Constraint functions take the constraint index as leading argument.
        base_args = () if is_obj else (i,)

        structures: list[tuple[StructureType, tuple[Any, ...]]] = []
        if repn.constant is not None:
            structures.append((StructureType.CONSTANT, (repn.constant,)))
        if repn.linear_vars:
            idx_lin_vars = self.get_idx_vars(repn.linear_vars)
            lin_coefs = list(repn.linear_coefs)
            structures.append((StructureType.LINEAR, (idx_lin_vars, lin_coefs)))
        if repn.quadratic_vars:
            quad_vars1, quad_vars2 = zip(*repn.quadratic_vars)
            idx_quad_vars1 = self.get_idx_vars(quad_vars1)
            idx_quad_vars2 = self.get_idx_vars(quad_vars2)
            quad_coefs = list(repn.quadratic_coefs)
            structures.append(
                (StructureType.QUADRATIC, (idx_quad_vars1, idx_quad_vars2, quad_coefs))
            )

        for structure_type, args in structures:
            func = api_add_struct(is_obj, structure_type)
            self.execute(func, *base_args, *args)

        if repn.nonlinear_expr is not None:
            self.nonlinear_map[i] = NonlinearExpressionData(
                repn.nonlinear_expr,
                repn.nonlinear_vars,
                var_map=self.maps[VarData],
                diff_order=self.nonlinear_diff_order,
            )

    def add_callback(
        self, i: int | None, expr: NonlinearExpressionData, callback: Callback
    ) -> None:
        """Register evaluation (and derivative) callbacks for ``expr``.

        Args:
            i: Constraint index, or ``None`` for the objective.
            expr: Nonlinear expression data the callbacks evaluate.
            callback: Object providing the ``func``, ``grad`` and ``hess``
                callables.
        """
        is_obj = i is None
        idx_cons = [i] if not is_obj else None
        cb = self.execute(knitro.KN_add_eval_callback, is_obj, idx_cons, callback.func)

        if expr.diff_order >= 1:
            idx_vars = self.get_idx_vars(expr.grad_vars)
            # The same variable indices describe either the objective gradient
            # or one Jacobian row, depending on the target.
            obj_grad_idx_vars = idx_vars if is_obj else None
            jac_idx_cons = [i] * len(idx_vars) if not is_obj else None
            jac_idx_vars = idx_vars if not is_obj else None
            self.execute(
                knitro.KN_set_cb_grad,
                cb,
                obj_grad_idx_vars,
                jac_idx_cons,
                jac_idx_vars,
                callback.grad,
            )

        if expr.diff_order >= 2:
            # NOTE(review): this unpacking raises ValueError when
            # ``expr.hess_vars`` is empty -- confirm that cannot occur.
            hess_vars1, hess_vars2 = zip(*expr.hess_vars)
            hess_idx_vars1 = self.get_idx_vars(hess_vars1)
            hess_idx_vars2 = self.get_idx_vars(hess_vars2)
            self.execute(
                knitro.KN_set_cb_hess, cb, hess_idx_vars1, hess_idx_vars2, callback.hess
            )

    def register_callbacks(self) -> None:
        """Register callbacks for every stored nonlinear expression."""
        for i, expr in self.nonlinear_map.items():
            self.register_callback(i, expr)

    def register_callback(self, i: int | None, expr: NonlinearExpressionData) -> None:
        """Build the callback handler for ``expr`` and register it."""
        callback = build_callback_handler(expr, idx=i).expand()
        self.add_callback(i, expr, callback)

    def get_solution_status(self) -> SolutionStatus:
        """
        Map KNITRO status codes to Pyomo SolutionStatus values.

        See https://www.artelys.com/app/docs/knitro/3_referenceManual/returnCodes.html

        Raises:
            RuntimeError: If no solve has been run.
        """
        if self._status is None:
            msg = "Solver has not been run. No solution status is available!"
            raise RuntimeError(msg)
        elif self._status in {
            knitro.KN_RC_OPTIMAL,
            knitro.KN_RC_OPTIMAL_OR_SATISFACTORY,
            knitro.KN_RC_NEAR_OPT,
        }:
            return SolutionStatus.optimal
        elif self._status in {
            knitro.KN_RC_FEAS_XTOL,
            knitro.KN_RC_FEAS_NO_IMPROVE,
            knitro.KN_RC_FEAS_FTOL,
            -103,  # KN_RC_FEAS_BEST
            -104,  # KN_RC_FEAS_MULTISTART
            knitro.KN_RC_ITER_LIMIT_FEAS,
            knitro.KN_RC_TIME_LIMIT_FEAS,
            knitro.KN_RC_FEVAL_LIMIT_FEAS,
            knitro.KN_RC_MIP_EXH_FEAS,
            knitro.KN_RC_MIP_TERM_FEAS,
            knitro.KN_RC_MIP_SOLVE_LIMIT_FEAS,
            knitro.KN_RC_MIP_NODE_LIMIT_FEAS,
        }:
            return SolutionStatus.feasible
        elif self._status in {
            knitro.KN_RC_INFEASIBLE,
            knitro.KN_RC_INFEAS_CON_BOUNDS,
            knitro.KN_RC_INFEAS_VAR_BOUNDS,
            knitro.KN_RC_INFEAS_MULTISTART,
        }:
            return SolutionStatus.infeasible
        else:
            return SolutionStatus.noSolution

    def get_termination_condition(self) -> TerminationCondition:
        """
        Map KNITRO status codes to Pyomo TerminationCondition values.

        See https://www.artelys.com/app/docs/knitro/3_referenceManual/returnCodes.html

        Raises:
            RuntimeError: If no solve has been run.
        """
        if self._status is None:
            msg = "Solver has not been run. No termination condition is available!"
            raise RuntimeError(msg)
        elif self._status in {
            knitro.KN_RC_OPTIMAL,
            knitro.KN_RC_OPTIMAL_OR_SATISFACTORY,
            knitro.KN_RC_NEAR_OPT,
        }:
            return TerminationCondition.convergenceCriteriaSatisfied
        elif self._status in {
            knitro.KN_RC_INFEAS_NO_IMPROVE,
            knitro.KN_RC_INFEAS_MULTISTART,
        }:
            return TerminationCondition.locallyInfeasible
        elif self._status in {
            knitro.KN_RC_INFEASIBLE,
            knitro.KN_RC_INFEAS_CON_BOUNDS,
            knitro.KN_RC_INFEAS_VAR_BOUNDS,
        }:
            return TerminationCondition.provenInfeasible
        elif self._status in {knitro.KN_RC_UNBOUNDED, knitro.KN_RC_UNBOUNDED_OR_INFEAS}:
            return TerminationCondition.infeasibleOrUnbounded
        elif self._status in {
            knitro.KN_RC_ITER_LIMIT_FEAS,
            knitro.KN_RC_FEVAL_LIMIT_FEAS,
            knitro.KN_RC_MIP_EXH_FEAS,
            knitro.KN_RC_MIP_TERM_FEAS,
            knitro.KN_RC_MIP_SOLVE_LIMIT_FEAS,
            knitro.KN_RC_MIP_NODE_LIMIT_FEAS,
            knitro.KN_RC_ITER_LIMIT_INFEAS,
            knitro.KN_RC_FEVAL_LIMIT_INFEAS,
            knitro.KN_RC_MIP_EXH_INFEAS,
            knitro.KN_RC_MIP_SOLVE_LIMIT_INFEAS,
            knitro.KN_RC_MIP_NODE_LIMIT_INFEAS,
        }:
            return TerminationCondition.iterationLimit
        elif self._status in {
            knitro.KN_RC_TIME_LIMIT_FEAS,
            knitro.KN_RC_TIME_LIMIT_INFEAS,
        }:
            return TerminationCondition.maxTimeLimit
        elif self._status == knitro.KN_RC_USER_TERMINATION:
            return TerminationCondition.interrupted
        elif -600 <= self._status <= -500:
            return TerminationCondition.error
        else:
            return TerminationCondition.unknown