# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
import re
import sys
import time
import logging
import shlex
from typing import overload
from pyomo.common import Factory
from pyomo.common.enums import SolverAPIVersion
from pyomo.common.errors import ApplicationError
from pyomo.common.collections import Bunch
from pyomo.opt.base.convert import convert_problem
from pyomo.opt.base.formats import ResultsFormat
import pyomo.opt.base.results
# Module-level logger; used below by the solver factory (creation failures)
# and by OptSolver.solve (solver status / return-code diagnostics).
logger = logging.getLogger('pyomo.opt')
# The version string is first searched for trunk/Trunk, and if
# found, a tuple of infinities is returned. Otherwise, the first
# match of number[.number], where [.number] can repeat 1-3 times,
# is used, which is translated into a tuple whose size matches
# the 'length' keyword (appending 0's when necessary). If no match
# is found, None is returned (although one could argue a tuple of
# 0's might be appropriate).
def _extract_version(x, length=4):
"""
Attempts to extract solver version information from a string.
"""
assert (1 <= length) and (length <= 4)
m = re.search('[t,T]runk', x)
if m is not None:
# Since most version checks are comparing if the current
# version is greater/less than some other version, it makes
# since that a solver advertising trunk should always be greater
# than a version check, hence returning a tuple of infinities
return tuple(float('inf') for i in range(length))
m = re.search(r'[0-9]+(\.[0-9]+){1,3}', x)
if not m is None:
version = tuple(int(i) for i in m.group(0).split('.')[:length])
while len(version) < length:
version += (0,)
return version
return None # (0,0,0,0)[:length]
class UnknownSolver:
    """Placeholder object standing in for a solver that could not be created.

    The object records the arguments that were used to request the
    solver.  Methods that would perform work (``solve``, ``reset``,
    ``set_options``) and any access to an attribute that was never set
    raise a RuntimeError describing the original request.
    """

    def __init__(self, *args, **kwds):
        """
        Parameters
        ----------
        *args:
            Positional arguments from the original request (stored for
            the error message only).
        **kwds:
            Keyword arguments from the original request; ``type`` is
            required.

        Raises
        ------
        ValueError
            If the ``type`` keyword is missing.
        """
        # super(UnknownSolver,self).__init__(**kwds)
        #
        # The 'type' is the class type of the solver instance
        #
        if "type" not in kwds:  # pragma:nocover
            raise ValueError("Expected option 'type' for UnknownSolver constructor")
        self.type = kwds["type"]
        self.options = {}
        self._args = args
        self._kwds = kwds

    #
    # Support "with" statements. Forgetting to call deactivate
    # on Plugins is a common source of memory leaks
    #
    def __enter__(self):
        return self

    def __exit__(self, t, v, traceback):
        pass

    @classmethod
    def api_version(cls):
        """
        Return the public API supported by this interface.

        Returns
        -------
        ~pyomo.common.enums.SolverAPIVersion
            A solver API enum object
        """
        return SolverAPIVersion.V1

    def available(self, exception_flag=True):
        """Determine if this optimizer is available.

        Always returns False, or raises ApplicationError when
        ``exception_flag`` is true.
        """
        if exception_flag:
            # 'name' is assigned externally (e.g., by the solver factory).
            # Read the instance dict directly so that a missing 'name'
            # falls back to 'type' instead of triggering __getattr__.
            state = self.__dict__
            name = state.get('name', state.get('type'))
            raise ApplicationError("Solver (%s) not available" % str(name))
        return False

    def license_is_valid(self):
        "True if the solver is present and has a valid license (if applicable)"
        return False

    def warm_start_capable(self):
        """True if the solver can accept a warm-start solution."""
        return False

    def solve(self, *args, **kwds):
        """Always raises RuntimeError: the solver is not available."""
        self._solver_error('solve')

    def reset(self):
        """Always raises RuntimeError: the solver is not available."""
        self._solver_error('reset')

    def set_options(self, istr):
        """Always raises RuntimeError: the solver is not available."""
        self._solver_error('set_options')

    def __bool__(self):
        # NOTE: available() defaults to exception_flag=True, so truth-testing
        # this object raises ApplicationError rather than returning False.
        return self.available()

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails.  Special-method
        # probes (e.g., __deepcopy__, __setstate__ from copy/pickle, or
        # hasattr() checks) must see AttributeError to behave correctly.
        if attr.startswith('__') and attr.endswith('__'):
            raise AttributeError(attr)
        self._solver_error(attr)

    def _solver_error(self, method_name):
        """Raise the RuntimeError describing the failed solver request.

        ``method_name`` is the name of the method or attribute that was
        accessed.
        """
        # Read the instance dict directly: going through normal attribute
        # access on a partially initialized instance would re-enter
        # __getattr__ and recurse without bound.
        state = self.__dict__
        raise RuntimeError(
            """Attempting to use an unavailable solver.
The SolverFactory was unable to create the solver "%s"
and returned an UnknownSolver object. This error is raised at the point
where the UnknownSolver object was used as if it were valid (by calling
method "%s").
The original solver was created with the following parameters:
\t""" % (state.get('type'), method_name)
            + "\n\t".join(
                "%s: %s" % i for i in sorted(state.get('_kwds', {}).items())
            )
            + "\n\t_args: %s" % (state.get('_args', ()),)
            + "\n\toptions: %s" % (state.get('options', {}),)
        )
class SolverFactoryClass(Factory):
    """Factory that creates solver interface objects by name.

    Calling the factory with no name returns the factory itself.
    Otherwise the name is looked up in the ``_cls`` registry; if no
    solver can be created, an :class:`UnknownSolver` placeholder is
    returned instead of raising.
    """

    @overload
    def __call__(self, _name: None = None, **kwds) -> "SolverFactoryClass": ...

    @overload
    def __call__(self, _name, **kwds) -> "OptSolver": ...

    def __call__(self, _name=None, **kwds):
        """Create the solver registered under ``_name``.

        Parameters
        ----------
        _name:
            Solver name; converted with ``str()``.  A name of the form
            ``"a:b"`` is split at the first colon into the solver ``a``
            and the subsolver ``b`` (stored as the ``solver`` keyword).
        **kwds:
            Keywords forwarded to the solver constructor.

        Returns
        -------
        The factory itself when ``_name`` is None; otherwise the created
        solver object, or an UnknownSolver if creation failed.  The
        UnknownSolver has its ``name`` attribute set to ``_name``.
        """
        if _name is None:
            return self
        _name = str(_name)
        if ':' in _name:
            _name, subsolver = _name.split(':', 1)
            kwds['solver'] = subsolver
        elif 'solver' in kwds:
            subsolver = kwds['solver']
        else:
            subsolver = None
        opt = None
        try:
            if _name in self._cls:
                opt = self._cls[_name](**kwds)
            else:
                # Not a registered name: fall back to the implicit solver
                # for the requested I/O mode, using _name as the executable.
                mode = kwds.get('solver_io', 'nl')
                if mode is None:
                    mode = 'nl'
                _implicit_solvers = {'nl': 'asl'}
                if "executable" not in kwds:
                    kwds["executable"] = _name
                if mode in _implicit_solvers:
                    if _implicit_solvers[mode] not in self._cls:
                        raise RuntimeError(
                            " The solver plugin was not registered.\n"
                            " Please confirm that the 'pyomo.environ' package has been imported."
                        )
                    opt = self._cls[_implicit_solvers[mode]](**kwds)
                    if opt is not None:
                        opt.set_options('solver=' + _name)
        except Exception as err:
            # Creation failures are reported as a warning and converted
            # into an UnknownSolver below.  Only Exception is caught so
            # that KeyboardInterrupt / SystemExit still propagate.
            logger.warning(
                "Failed to create solver with name '%s':\n%s" % (_name, err),
                exc_info=True,
            )
            opt = None
        if opt is not None and _name != "py" and subsolver is not None:
            # py just creates instance of its subsolver, no need for this option
            opt.set_options('solver=' + subsolver)
        if opt is None:
            opt = UnknownSolver(type=_name, **kwds)
            opt.name = _name
        return opt
#: Global registry/factory for "v1" solver interfaces.
LegacySolverFactory: SolverFactoryClass = SolverFactoryClass('solver type')
# SolverFactory is a second factory instance that is bound to the very same
# `_cls` and `_doc` objects as LegacySolverFactory (shared, not copied).
# `_cls` is the mapping indexed by solver name in SolverFactoryClass.__call__.
SolverFactory = SolverFactoryClass('solver type')
SolverFactory._cls = LegacySolverFactory._cls
SolverFactory._doc = LegacySolverFactory._doc
#
# TODO: It is impossible to load CBC with NL file-io using this function,
# i.e., SolverFactory("cbc", solver_io='nl'),
# this is NOT asl:cbc (same with PICO)
# WEH: Why is there a distinction between SolverFactory('asl:cbc') and SolverFactory('cbc', solver_io='nl')??? This is bad.
#
def check_available_solvers(*args):
    """Return the requested solvers that are available and licensed.

    Parameters
    ----------
    *args:
        Solver names.  An entry may also be a tuple, in which case its
        first element is the reported name and the tuple is unpacked as
        the positional arguments to ``SolverFactory``.
        NOTE(review): SolverFactoryClass.__call__ accepts one positional
        argument, so tuples with more than one element look unsupported
        -- confirm.

    Returns
    -------
    list
        The names (in request order) for which a solver object was
        created, reports itself available, has a non-None executable
        (when it has an ``executable`` attribute), and has a valid
        license.
    """
    # These names are not used below; presumably the imports ensure the
    # plugins are loaded before probing -- TODO confirm.
    from pyomo.solvers.plugins.solvers.GUROBI import GUROBISHELL
    from pyomo.solvers.plugins.solvers.BARON import BARONSHELL
    from pyomo.solvers.plugins.solvers.mosek_direct import MOSEKDirect

    # Silence warnings emitted while probing for solvers.  The try/finally
    # guarantees logging is re-enabled even if a probe raises.
    logging.disable(logging.WARNING)
    try:
        ans = []
        for arg in args:
            if isinstance(arg, tuple):
                name = arg[0]
            else:
                name = arg
                arg = (arg,)
            opt = SolverFactory(*arg)
            if opt is None or isinstance(opt, UnknownSolver):
                continue  # not available
            if not opt.available(exception_flag=False):
                continue  # not available
            if hasattr(opt, 'executable') and opt.executable() is None:
                continue  # not available
            if not opt.license_is_valid():
                continue  # not available
            # At this point, the solver is available (and licensed)
            ans.append(name)
    finally:
        logging.disable(logging.NOTSET)
    return ans
def _raise_ephemeral_error(name, keyword=""):
raise AttributeError(
"The property '%s' can no longer be set directly on "
"the solver object. It should instead be passed as a "
"keyword into the solve method%s. It will automatically "
"be reset to its default value after each invocation of "
"solve." % (name, keyword)
)
class OptSolver:
    """A generic optimization solver

    This base class implements the overall ``solve`` workflow
    (``_presolve`` -> ``_apply_solver`` -> ``_postsolve``) and the
    bookkeeping around persistent and per-solve options.  The base
    ``_apply_solver`` raises NotImplementedError.
    """

    #
    # Support "with" statements. Forgetting to call deactivate
    # on Plugins is a common source of memory leaks
    #
    def __enter__(self):
        return self

    def __exit__(self, t, v, traceback):
        pass

    @classmethod
    def api_version(cls):
        """
        Return the public API supported by this interface.

        Returns
        -------
        ~pyomo.common.enums.SolverAPIVersion
            A solver API enum object
        """
        return SolverAPIVersion.V1

    #
    # Adding to help track down invalid code after making
    # the following attributes private.  Both reading and writing any
    # of these properties raises AttributeError.
    #
    @property
    def tee(self):
        _raise_ephemeral_error('tee')

    @tee.setter
    def tee(self, val):
        _raise_ephemeral_error('tee')

    @property
    def suffixes(self):
        _raise_ephemeral_error('suffixes')

    @suffixes.setter
    def suffixes(self, val):
        _raise_ephemeral_error('suffixes')

    @property
    def keepfiles(self):
        _raise_ephemeral_error('keepfiles')

    @keepfiles.setter
    def keepfiles(self, val):
        _raise_ephemeral_error('keepfiles')

    @property
    def soln_file(self):
        _raise_ephemeral_error('soln_file')

    @soln_file.setter
    def soln_file(self, val):
        _raise_ephemeral_error('soln_file')

    @property
    def log_file(self):
        _raise_ephemeral_error('log_file')

    @log_file.setter
    def log_file(self, val):
        _raise_ephemeral_error('log_file')

    @property
    def symbolic_solver_labels(self):
        _raise_ephemeral_error('symbolic_solver_labels')

    @symbolic_solver_labels.setter
    def symbolic_solver_labels(self, val):
        _raise_ephemeral_error('symbolic_solver_labels')

    @property
    def warm_start_solve(self):
        _raise_ephemeral_error('warm_start_solve', keyword=" (warmstart)")

    @warm_start_solve.setter
    def warm_start_solve(self, val):
        _raise_ephemeral_error('warm_start_solve', keyword=" (warmstart)")

    @property
    def warm_start_file_name(self):
        _raise_ephemeral_error('warm_start_file_name', keyword=" (warmstart_file)")

    @warm_start_file_name.setter
    def warm_start_file_name(self, val):
        _raise_ephemeral_error('warm_start_file_name', keyword=" (warmstart_file)")

    def __init__(self, **kwds):
        """Constructor

        Recognized keywords: ``type`` (required), ``name`` (defaults to
        ``type``), ``doc``, and ``options`` (a mapping copied into
        ``self.options``; None is ignored).

        Raises
        ------
        ValueError
            If the ``type`` keyword is missing.
        """
        #
        # The 'type' is the class type of the solver instance
        #
        if "type" not in kwds:  # pragma:nocover
            raise ValueError("Expected option 'type' for OptSolver constructor")
        self.type = kwds["type"]
        #
        # The 'name' is either the class type of the solver instance, or a
        # assigned name.
        #
        self.name = kwds["name"] if "name" in kwds else self.type
        if "doc" in kwds:
            self._doc = kwds["doc"]
        elif self.type is None:  # pragma:nocover
            self._doc = ""
        elif self.name == self.type:
            self._doc = "%s OptSolver" % self.name
        else:
            self._doc = "%s OptSolver (type %s)" % (self.name, self.type)
        #
        # Options are persistent, meaning users must modify the
        # options dict directly rather than pass them into _presolve
        # through the solve command. Everything else is reset inside
        # presolve
        #
        self.options = Bunch()
        initial_options = kwds.get('options')
        if initial_options is not None:
            for key in initial_options:
                setattr(self.options, key, initial_options[key])
        # the symbol map is an attribute of the solver plugin only
        # because it is generated in presolve and used to tag results
        # so they are interpretable - basically, it persists across
        # multiple methods.
        self._smap_id = None
        # These are ephemeral options that can be set by the user during
        # the call to solve, but will be reset to defaults if not given
        self._load_solutions = True
        self._select_index = 0
        self._report_timing = False
        self._suffixes = []
        self._log_file = None
        self._soln_file = None
        # overridden by a solver plugin when it returns sparse results
        self._default_variable_value = None
        # overridden by a solver plugin when it is always available
        self._assert_available = False
        # overridden by a solver plugin to indicate its input file format
        self._problem_format = None
        self._valid_problem_formats = []
        # overridden by a solver plugin to indicate its results file format
        self._results_format = None
        self._valid_result_formats = {}
        self._results_reader = None
        self._problem = None
        self._problem_files = None
        #
        # Used to document meta solvers
        #
        self._metasolver = False
        self._version = None
        #
        # Data for solver callbacks
        #
        self._allow_callbacks = False
        self._callback = {}
        # We define no capabilities for the generic solver; base
        # classes must override this
        self._capabilities = Bunch()

    @staticmethod
    def _options_string_to_dict(istr):
        """Parse a string of ``option=value`` tokens into a dict.

        Tokens are split with ``shlex.split``.  Each value is evaluated
        as a Python expression when possible and kept as a string
        otherwise.  An empty (or whitespace-only) string yields ``{}``.

        Raises
        ------
        ValueError
            If a token does not contain ``=``.
        """
        ans = {}
        istr = istr.strip()
        if not istr:
            return ans
        # SECURITY NOTE(review): eval() is applied to the option string and
        # to each option value.  This is unsafe if the string can come from
        # an untrusted source; it is kept here to preserve behavior.
        if istr[0] == "'" or istr[0] == '"':
            # The whole string is itself a quoted Python string literal
            istr = eval(istr)
        for token in shlex.split(istr):
            index = token.find('=')
            if index == -1:
                raise ValueError(
                    "Solver options must have the form option=value: '%s'" % istr
                )
            raw_value = token[(index + 1) :]
            try:
                val = eval(raw_value)
            except Exception:
                # Not a valid Python expression: keep the raw text
                val = raw_value
            ans[token[:index]] = val
        return ans

    def default_variable_value(self):
        """Return the value stored in ``_default_variable_value``."""
        return self._default_variable_value

    def __bool__(self):
        return self.available()

    def version(self):
        """
        Returns a 4-tuple describing the solver executable version.

        The value from ``_get_version`` is cached in ``_version``; the
        base-class ``_get_version`` returns None.
        """
        if self._version is None:
            self._version = self._get_version()
        return self._version

    def _get_version(self):
        """Return the solver version; the base class returns None."""
        return None

    def has_capability(self, cap):
        """
        Returns a boolean value representing whether a solver supports
        a specific feature. Defaults to 'False' if the solver is unaware
        of an option. Expects a string.

        Example:
        # prints True if solver supports sos1 constraints, and False otherwise
        print(solver.has_capability('sos1'))
        # prints True if solver supports 'feature', and False otherwise
        print(solver.has_capability('feature'))

        Parameters
        ----------
        cap: str
            The feature

        Returns
        -------
        val: bool
            Whether or not the solver has the specified capability.

        Raises
        ------
        TypeError
            If ``cap`` is not a string.
        """
        if not isinstance(cap, str):
            raise TypeError(
                "Expected argument to be of type '%s', not "
                "'%s'." % (type(str()), type(cap))
            )
        val = self._capabilities[str(cap)]
        if val is None:
            return False
        return val

    def available(self, exception_flag=True):
        """True if the solver is available"""
        return True

    def license_is_valid(self):
        "True if the solver is present and has a valid license (if applicable)"
        return True

    def warm_start_capable(self):
        """True if the solver can accept a warm-start solution"""
        return False

    def solve(self, *args, **kwds):
        """Solve the problem

        Positional arguments are the problem(s) to solve; model
        arguments must be constructed.  The ``options`` and
        ``options_string`` keywords override ``self.options`` for this
        call only; the remaining keywords are passed to ``_presolve``.

        Returns
        -------
        The results object returned by ``_postsolve``.

        Raises
        ------
        RuntimeError
            If a model argument has not been constructed.
        ApplicationError
            If the solver reports a non-zero return code.
        """
        self.available(exception_flag=True)
        #
        # If the inputs are models, then validate that they have been
        # constructed! Collect suffix names to try and import from solution.
        #
        from pyomo.core.base.block import BlockData
        import pyomo.core.base.suffix
        from pyomo.core.kernel.block import IBlock
        import pyomo.core.kernel.suffix

        _model = None
        for arg in args:
            if not isinstance(arg, (BlockData, IBlock)):
                continue
            if isinstance(arg, BlockData) and not arg.is_constructed():
                raise RuntimeError(
                    "Attempting to solve model=%s with unconstructed "
                    "component(s)" % (arg.name,)
                )
            _model = arg
            # import suffixes must be on the top-level model
            if isinstance(arg, BlockData):
                model_suffixes = list(
                    name
                    for (
                        name,
                        comp,
                    ) in pyomo.core.base.suffix.active_import_suffix_generator(arg)
                )
            else:
                assert isinstance(arg, IBlock)
                model_suffixes = list(
                    comp.storage_key
                    for comp in pyomo.core.kernel.suffix.import_suffix_generator(
                        arg, active=True, descend_into=False
                    )
                )
            if len(model_suffixes) > 0:
                # NOTE: a 'suffixes' list supplied by the caller is
                # extended in place.
                kwds_suffixes = kwds.setdefault('suffixes', [])
                for name in model_suffixes:
                    if name not in kwds_suffixes:
                        kwds_suffixes.append(name)
        #
        # Handle ephemeral solvers options here. These
        # will override whatever is currently in the options
        # dictionary, but we will reset these options to
        # their original value at the end of this method.
        #
        orig_options = self.options
        self.options = Bunch()
        self.options.update(orig_options)
        # Treat options=None like "no options" (as the constructor does)
        self.options.update(kwds.pop('options', None) or {})
        self.options.update(
            self._options_string_to_dict(kwds.pop('options_string', ''))
        )
        try:
            # we're good to go.
            initial_time = time.time()
            self._presolve(*args, **kwds)
            presolve_completion_time = time.time()
            if self._report_timing:
                print(
                    " %6.2f seconds required for presolve"
                    % (presolve_completion_time - initial_time)
                )
            if _model is not None:
                self._initialize_callbacks(_model)
            _status = self._apply_solver()
            if hasattr(self, '_transformation_data'):
                del self._transformation_data
            if not hasattr(_status, 'rc'):
                logger.warning(
                    "Solver (%s) did not return a solver status code.\n"
                    "This is indicative of an internal solver plugin error.\n"
                    "Please report this to the Pyomo developers." % (self.name,)
                )
            elif _status.rc:
                logger.error(
                    "Solver (%s) returned non-zero return code (%s)"
                    % (self.name, _status.rc)
                )
                if self._tee:
                    logger.error("See the solver log above for diagnostic information.")
                elif hasattr(_status, 'log') and _status.log:
                    logger.error("Solver log:\n" + str(_status.log))
                raise ApplicationError("Solver (%s) did not exit normally" % self.name)
            solve_completion_time = time.time()
            if self._report_timing:
                print(
                    " %6.2f seconds required for solver"
                    % (solve_completion_time - presolve_completion_time)
                )
            result = self._postsolve()
            result._smap_id = self._smap_id
            result._smap = None
            if _model is not None:
                if isinstance(_model, IBlock):
                    if len(result.solution) == 1:
                        result.solution(0).symbol_map = getattr(
                            _model, "._symbol_maps"
                        )[result._smap_id]
                        result.solution(0).default_variable_value = (
                            self._default_variable_value
                        )
                        if self._load_solutions:
                            _model.load_solution(result.solution(0))
                    else:
                        assert len(result.solution) == 0
                    # see the hack in the write method
                    # we don't want this to stick around on the model
                    # after the solve
                    assert len(getattr(_model, "._symbol_maps")) == 1
                    delattr(_model, "._symbol_maps")
                    del result._smap_id
                    if self._load_solutions and (len(result.solution) == 0):
                        logger.error("No solution is available")
                else:
                    if self._load_solutions:
                        _model.solutions.load_from(
                            result,
                            select=self._select_index,
                            default_variable_value=self._default_variable_value,
                        )
                        result._smap_id = None
                        result.solution.clear()
                    else:
                        result._smap = _model.solutions.symbol_map[self._smap_id]
                    _model.solutions.delete_symbol_map(self._smap_id)
            postsolve_completion_time = time.time()
            if self._report_timing:
                print(
                    " %6.2f seconds required for postsolve"
                    % (postsolve_completion_time - solve_completion_time)
                )
        finally:
            #
            # Reset the options dict
            #
            self.options = orig_options
        return result

    def _presolve(self, *args, **kwds):
        """Consume the per-solve keywords and prepare the problem files.

        The keywords ``logfile``, ``solnfile``, ``select``,
        ``load_solutions``, ``timelimit``, ``report_timing``, ``tee``,
        ``available`` and ``suffixes`` are removed from ``kwds`` and
        stored on the instance (absent keywords reset the corresponding
        attribute to its default).  When a problem format is set, the
        remaining keywords are forwarded to ``_convert_problem``.

        Raises
        ------
        ValueError
            If no problem format is set and unrecognized keywords remain.
        """
        self._log_file = kwds.pop("logfile", None)
        self._soln_file = kwds.pop("solnfile", None)
        self._select_index = kwds.pop("select", 0)
        self._load_solutions = kwds.pop("load_solutions", True)
        self._timelimit = kwds.pop("timelimit", None)
        self._report_timing = kwds.pop("report_timing", False)
        self._tee = kwds.pop("tee", False)
        self._assert_available = kwds.pop("available", True)
        self._suffixes = kwds.pop("suffixes", [])
        self.available()
        if self._problem_format:
            write_start_time = time.time()
            self._problem_files, self._problem_format, self._smap_id = (
                self._convert_problem(
                    args, self._problem_format, self._valid_problem_formats, **kwds
                )
            )
            total_time = time.time() - write_start_time
            if self._report_timing:
                print(" %6.2f seconds required to write file" % total_time)
        elif len(kwds):
            raise ValueError(
                "Solver="
                + self.type
                + " passed unrecognized keywords: \n\t"
                + ("\n\t".join("%s = %s" % (k, v) for k, v in kwds.items()))
            )
        # If the first "problem file" is not a file name, ask that object
        # for its problem files.
        if (type(self._problem_files) in (list, tuple)) and (
            not isinstance(self._problem_files[0], str)
        ):
            self._problem_files = self._problem_files[0]._problem_files()
        if self._results_format is None:
            self._results_format = self._default_results_format(self._problem_format)
        #
        # Disabling this check for now. A solver doesn't have just
        # _one_ results format.
        #
        # if self._results_format not in \
        #     self._valid_result_formats[self._problem_format]:
        #     raise ValueError("Results format '"+str(self._results_format)+"' "
        #                      "cannot be used with problem format '"
        #                      +str(self._problem_format)+"' in solver "+self.name)
        if self._results_format == ResultsFormat.soln:
            self._results_reader = None
        else:
            self._results_reader = pyomo.opt.base.results.ReaderFactory(
                self._results_format
            )

    def _initialize_callbacks(self, model):
        """Initialize call-back functions"""
        pass

    def _apply_solver(self):
        """The routine that performs the solve"""
        raise NotImplementedError  # pragma:nocover

    def _postsolve(self):
        """The routine that does solve post-processing"""
        return self.results

    def _convert_problem(self, args, problem_format, valid_problem_formats, **kwds):
        """Delegate to ``convert_problem``, passing ``has_capability``."""
        return convert_problem(
            args, problem_format, valid_problem_formats, self.has_capability, **kwds
        )

    def _default_results_format(self, prob_format):
        """Returns the default results format for different problem
        formats.
        """
        return ResultsFormat.results

    def reset(self):
        """
        Reset the state of the solver
        """
        pass

    def _get_options_string(self, options=None):
        """Render options as a space-separated ``key=value`` string.

        ``options`` defaults to ``self.options``.  String values that
        contain a space are wrapped in double quotes.
        """
        if options is None:
            options = self.options
        pieces = []
        for key in options:
            val = options[key]
            if isinstance(val, str) and ' ' in val:
                pieces.append("%s=\"%s\"" % (str(key), str(val)))
            else:
                pieces.append("%s=%s" % (str(key), str(val)))
        return ' '.join(pieces)

    def set_options(self, istr):
        """Set persistent options from an options string or a mapping.

        A string is parsed with ``_options_string_to_dict``.  Entries
        whose value is None are skipped.
        """
        if isinstance(istr, str):
            istr = self._options_string_to_dict(istr)
        for key in istr:
            if istr[key] is not None:
                setattr(self.options, key, istr[key])

    def set_callback(self, name, callback_fn=None):
        """
        Set the callback function for a named callback.

        A call-back function has the form:

            def fn(solver, model):
                pass

        where 'solver' is the native solver interface object and 'model' is
        a Pyomo model instance object.

        Passing ``callback_fn=None`` removes the named callback, if any.

        Raises
        ------
        ApplicationError
            If callbacks are disabled for this solver.
        """
        if not self._allow_callbacks:
            raise ApplicationError("Callbacks disabled for solver %s" % self.name)
        if callback_fn is None:
            if name in self._callback:
                del self._callback[name]
        else:
            self._callback[name] = callback_fn

    def config_block(self, init=False):
        """Return the first element of ``default_config_block(self, init)``."""
        from pyomo.scripting.solve_config import default_config_block

        return default_config_block(self, init)[0]