# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
import logging
import os
import subprocess
import datetime
import time
import io
import re
import sys
import time
import threading
from typing import Optional, Tuple, Union, Mapping, List, Dict, Any, Sequence
from pyomo.common import Executable
from pyomo.common.config import (
ConfigDict,
ConfigList,
ConfigValue,
document_configdict,
document_class_CONFIG,
ADVANCED_OPTION,
)
from pyomo.common.errors import (
ApplicationError,
InfeasibleConstraintException,
MouseTrap,
)
from pyomo.common.fileutils import to_legal_filename
from pyomo.common.tempfiles import TempfileManager
from pyomo.common.timing import HierarchicalTimer, default_timer
from pyomo.core.base.var import VarData
from pyomo.core.staleflag import StaleFlagManager
from pyomo.repn.plugins.nl_writer import NLWriter, NLWriterInfo
from pyomo.contrib.solver.common.base import SolverBase, Availability
from pyomo.contrib.solver.common.config import SolverConfig
from pyomo.contrib.solver.common.factory import LegacySolverWrapper
from pyomo.contrib.solver.common.results import (
Results,
TerminationCondition,
SolutionStatus,
)
from pyomo.contrib.solver.solvers.asl_sol_reader import (
asl_solve_code_to_solution_status,
parse_asl_sol_file,
ASLSolFileData,
ASLSolFileSolutionLoader,
)
from pyomo.contrib.solver.common.util import NoOptimalSolutionError, NoSolutionError
from pyomo.common.tee import TeeStream
from pyomo.core.expr.visitor import replace_expressions
from pyomo.core.expr.numvalue import value
from pyomo.core.base.suffix import Suffix
from pyomo.common.collections import ComponentMap
from pyomo.solvers.amplfunc_merge import amplfunc_merge
logger = logging.getLogger(__name__)
# Acceptable chars for the end of the alpha_pr column
# in ipopt's output, per https://coin-or.github.io/Ipopt/OUTPUT.html
_ALPHA_PR_CHARS = set("fFhHkKnNRwSstTr")
def _option_to_cmd(opt: str, val: str | int | float):
"""Convert a option / value pair into a valid command line argument."""
if isinstance(val, str):
if '"' not in val:
return f'{opt}="{val}"'
elif "'" not in val:
return f"{opt}='{val}'"
else:
raise ValueError(
f"solver_option '{opt}' contained value {val!r} with "
"both single and double quotes. Ipopt cannot parse "
"command line options with escaped quote characters."
)
else:
return f'{opt}={val}'
[docs]
@document_configdict()
class IpoptConfig(SolverConfig):
[docs]
def __init__(
self,
description=None,
doc=None,
implicit=False,
implicit_domain=None,
visibility=0,
):
super().__init__(
description=description,
doc=doc,
implicit=implicit,
implicit_domain=implicit_domain,
visibility=visibility,
)
self.executable: Executable = self.declare(
'executable',
ConfigValue(
domain=Executable,
default='ipopt',
description="Preferred executable for ipopt. Defaults to searching "
"the ``PATH`` for the first available ``ipopt``.",
),
)
self.writer_config: ConfigDict = self.declare(
'writer_config', NLWriter.CONFIG()
)
[docs]
class IpoptSolutionLoader(ASLSolFileSolutionLoader):
[docs]
def get_reduced_costs(
self, vars_to_load: Optional[Sequence[VarData]] = None
) -> Mapping[VarData, float]:
if self._nl_info.eliminated_vars:
raise MouseTrap(
'Complete reduced costs are not available when variables have '
'been presolved from the model. Turn presolve off '
'(solver.config.writer_config.linear_presolve=False) to get '
'reduced costs.'
)
zl_map = self._sol_data.var_suffixes.get('ipopt_zL_out', {})
zu_map = self._sol_data.var_suffixes.get('ipopt_zU_out', {})
# TBD: is it an error if Ipopt fails to return RC info?
# if not (zl_map or zu_map):
# raise?
if self._nl_info.scaling:
# Unscale the zl and zu maps:
inv_obj_scale = 1.0
if self._nl_info.scaling.objectives:
inv_obj_scale /= self._nl_info.scaling.objectives[self._sol_data.objno]
var_scale = self._nl_info.scaling.variables
zl_map = {k: v * var_scale[k] * inv_obj_scale for k, v in zl_map.items()}
zu_map = {k: v * var_scale[k] * inv_obj_scale for k, v in zu_map.items()}
rc = ComponentMap()
for ndx, v in enumerate(self._nl_info.variables):
_rc = 0.0
if ndx in zl_map:
# Note *any* value in zl has an absolute value at least
# as big as 0. No need to test and just overwrite _rc:
_rc = zl_map[ndx]
if ndx in zu_map:
zu = zu_map[ndx]
if abs(zu) > abs(_rc):
_rc = zu
rc[v] = _rc
if vars_to_load is not None:
# Note vars_to_load could contain variables that were
# eliminated (so use get()):
rc = ComponentMap((v, rc.get(v, 0)) for v in vars_to_load)
return rc
#: The set of all ipopt options that can be passed to Ipopt on the command line
ipopt_command_line_options = {
'acceptable_compl_inf_tol',
'acceptable_constr_viol_tol',
'acceptable_dual_inf_tol',
'acceptable_tol',
'alpha_for_y',
'bound_frac',
'bound_mult_init_val',
'bound_push',
'bound_relax_factor',
'compl_inf_tol',
'constr_mult_init_max',
'constr_viol_tol',
'diverging_iterates_tol',
'dual_inf_tol',
'expect_infeasible_problem',
'file_print_level',
'halt_on_ampl_error',
'hessian_approximation',
'honor_original_bounds',
'linear_scaling_on_demand',
'linear_solver',
'linear_system_scaling',
'ma27_pivtol',
'ma27_pivtolmax',
'ma57_pivot_order',
'ma57_pivtol',
'ma57_pivtolmax',
'max_cpu_time',
'max_iter',
'max_refinement_steps',
'max_soc',
'maxit',
'min_refinement_steps',
'mu_init',
'mu_max',
'mu_oracle',
'mu_strategy',
'nlp_scaling_max_gradient',
'nlp_scaling_method',
'obj_scaling_factor',
'option_file_name',
'outlev',
'output_file',
'pardiso_matching_strategy',
'print_level',
'print_options_documentation',
'print_user_options',
'required_infeasibility_reduction',
'slack_bound_frac',
'slack_bound_push',
'tol',
'wantsol',
'warm_start_bound_push',
'warm_start_init_point',
'warm_start_mult_bound_push',
'watchdog_shortened_iter_trigger',
}
#: The set of options we forbid the user from setting (with reasons)
unallowed_ipopt_options = {
'wantsol': 'The solver interface requires the sol file to be created',
'option_file_name': (
'Pyomo generates the ipopt options file as part of the `solve` '
'method. Add all options to config.solver_options instead.'
),
}
[docs]
@document_class_CONFIG(methods=['solve'])
class Ipopt(SolverBase):
"""Interface to the Ipopt NLP solver (NL file based)"""
#: Global class configuration;
#: see :ref:`pyomo.contrib.solver.solvers.ipopt.Ipopt::CONFIG`.
CONFIG = IpoptConfig()
#: cache of availability / version information
_exe_cache: dict[str : tuple[int] | None] = {}
#: default timeout to use when attempting to get the ipopt version number
_version_timeout = 2
[docs]
def __init__(self, **kwds: Any) -> None:
super().__init__(**kwds)
#: Instance configuration;
#: see :ref:`pyomo.contrib.solver.solvers.ipopt.Ipopt::CONFIG`.
self.config = self.config
[docs]
def available(self) -> Availability:
return (
Availability.NotFound
if self.version() is None
else Availability.FullLicense
)
[docs]
def version(self) -> tuple[int, int, int] | None:
return self._get_version(self.config.executable.path())
def _get_version(self, exe):
try:
return self._exe_cache[exe]
except KeyError:
pass
if exe is None:
# No executable (either we couldn't find a matching file, or
# the file is not executable)
self._exe_cache[None] = None
return None
# Run the executable and look for the version
results = subprocess.run(
[str(exe), '--version'],
timeout=self._version_timeout,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
check=False,
)
# Note that we expect the command to run without error, AND that
# it returns a string starting "ipopt <version>". That prevents
# us from trying to use other (even ASL) executables as if they
# were ipopt
fields = results.stdout.split(maxsplit=2)
if results.returncode:
ver = None
elif len(fields) != 3 or fields[0].lower() != 'ipopt':
ver = None
else:
try:
ver = tuple(int(i) for i in fields[1].split('.'))
except (ValueError, TypeError):
ver = None
if ver is None:
logger.warning(
f"Failed parsing Ipopt version: '{exe} --version':\n\n{results.stdout}"
)
self._exe_cache[exe] = ver
return ver
[docs]
def has_linear_solver(self, linear_solver: str) -> bool:
"""Determine if Ipopt has access to the specified linear solver
This solves a small problem to detect if the Ipopt executable
has access to the specified linear solver.
Parameters
----------
linear_solver : str
The linear solver to test. Accepts any string that is valid
for the ``linear_solver`` Ipopt option.
"""
import pyomo.core as AML
m = AML.ConcreteModel()
m.x = AML.Var()
m.o = AML.Objective(expr=(m.x - 2) ** 2)
results = self.solve(
m,
tee=False,
raise_exception_on_nonoptimal_result=False,
load_solutions=False,
solver_options={'linear_solver': linear_solver},
)
return 'running with linear solver' in results.solver_log
[docs]
def solve(self, model, **kwds) -> Results:
"Solve a model using Ipopt"
# Begin time tracking
start_time = default_timer()
# Allocate the results object so we can populate it as we go
results = Results()
results.timing_info.start_timestamp = datetime.datetime.now(
datetime.timezone.utc
)
results.solver_name = self.name
# Update configuration options, based on keywords passed to solve
config: IpoptConfig = self.config(value=kwds, preserve_implicit=True)
timer = config.timer
if timer is None:
timer = config.timer = HierarchicalTimer()
# As we are about to run a solver, update the stale flag
StaleFlagManager.mark_all_as_stale()
with TempfileManager.new_context() as tempfile:
if config.working_dir is None:
dname = tempfile.mkdtemp()
else:
dname = config.working_dir
if not os.path.exists(dname):
os.mkdir(dname)
# Because we are just "making up" a file name, it is better
# to always generate a consistent and legal name, rather
# than blindly follow what the user gave us. We will use
# `universal=True` here to make sure that double quotes are
# translated, thereby guaranteeing that we should always
# generate a legal base name (unless, of course, the user
# put double quotes somewhere else in the path)
basename = to_legal_filename(model.name, universal=True)
nlfd, nl_fname = tempfile.mkstemp(
suffix='.nl', prefix=basename, dir=dname, text=True, delete=False
)
results.extra_info.base_file_name = basename = nl_fname[:-3]
for ext in ('.row', '.col', '.sol', '.opt'):
if os.path.exists(basename + ext):
raise RuntimeError(
f"Solver interface file {basename + ext} already exists!"
)
# Note: the ASL has an issue where string constants written
# to the NL file (e.g. arguments in external functions) MUST
# be terminated with '\n' regardless of platform. We will
# disable universal newlines in the NL file to prevent
# Python from mapping those '\n' to '\r\n' on Windows.
with (
os.fdopen(nlfd, 'w', newline='\n', encoding='utf-8') as nl_file,
open(basename + '.row', 'w', encoding='utf-8') as row_file,
open(basename + '.col', 'w', encoding='utf-8') as col_file,
):
timer.start('write_nl_file')
try:
# Note: this is mapping the top-level
# symbolic_solver_labels onto the solver's writer
# config, and then that config is being used (in
# it's entirety) to set the NLWriter's CONFIG.
nl_info = NLWriter().write(
model,
nl_file,
row_file,
col_file,
config=config.writer_config,
symbolic_solver_labels=config.symbolic_solver_labels,
)
proven_infeasible = False
except InfeasibleConstraintException:
proven_infeasible = True
nl_info = NLWriterInfo()
timer.stop('write_nl_file')
if proven_infeasible:
results.termination_condition = TerminationCondition.provenInfeasible
results.solution_status = SolutionStatus.noSolution
results.extra_info.iteration_count = 0
elif not nl_info.variables:
if nl_info.eliminated_vars:
results.termination_condition = (
TerminationCondition.convergenceCriteriaSatisfied
)
results.solution_status = SolutionStatus.optimal
results.solution_loader = IpoptSolutionLoader(
sol_data=ASLSolFileData(), nl_info=nl_info
)
else:
results.termination_condition = TerminationCondition.emptyModel
results.solution_status = SolutionStatus.noSolution
results.extra_info.iteration_count = 0
else:
self._run_ipopt(results, config, nl_info, basename, timer)
if (
config.raise_exception_on_nonoptimal_result
and results.solution_status != SolutionStatus.optimal
):
raise NoOptimalSolutionError()
if config.load_solutions:
if results.solution_status == SolutionStatus.noSolution:
raise NoSolutionError()
results.solution_loader.load_vars()
if (
hasattr(model, 'dual')
and isinstance(model.dual, Suffix)
and model.dual.import_enabled()
):
model.dual.update(results.solution_loader.get_duals())
if (
hasattr(model, 'rc')
and isinstance(model.rc, Suffix)
and model.rc.import_enabled()
):
model.rc.update(results.solution_loader.get_reduced_costs())
if (
results.solution_status in {SolutionStatus.feasible, SolutionStatus.optimal}
and len(nl_info.objectives) > 0
):
if config.load_solutions:
results.incumbent_objective = value(nl_info.objectives[0])
else:
results.incumbent_objective = value(
replace_expressions(
nl_info.objectives[0].expr,
substitution_map={
id(v): val
for v, val in results.solution_loader.get_primals().items()
},
descend_into_named_expressions=True,
remove_named_expressions=True,
)
)
results.solver_config = config
# Capture/record end-time / wall-time
results.timing_info.timer = timer
results.timing_info.wall_time = default_timer() - start_time
return results
def _process_options(
self, option_fname: str, options: dict[str, str | int | float]
) -> list[str]:
# Look through the solver options and separate the command line
# options from the options that must be sent via an options
# file. Raise an exception for any unallowable options.
options_file_options = []
cmd_line_options = []
for key, val in options.items():
if key in unallowed_ipopt_options:
msg = unallowed_ipopt_options[key]
raise ValueError(f"unallowed Ipopt option '{key}': {msg}")
elif key in ipopt_command_line_options:
cmd_line_options.append(_option_to_cmd(key, val))
else:
options_file_options.append(f"{key} {val}\n")
# create the options file (if we need it)
if options_file_options:
with open(option_fname, 'w', encoding='utf-8') as OPT_FILE:
OPT_FILE.writelines(options_file_options)
cmd_line_options.append(_option_to_cmd('option_file_name', option_fname))
# Return the (formatted) command line options
return cmd_line_options
def _run_ipopt(self, results, config, nl_info, basename, timer):
# Get a copy of the environment to pass to the subprocess
env = os.environ.copy()
if nl_info.external_function_libraries:
env['AMPLFUNC'] = amplfunc_merge(env, *nl_info.external_function_libraries)
# Get the Ipopt executable and start building the command line
exe = config.executable.path()
if not exe:
raise ApplicationError('ipopt executable not found')
cmd = [exe, basename + '.nl', '-AMPL']
# Process ipopt options (splitting them between command line
# options and those that must be passed through the opt file)
options = config.solver_options.value()
# Map standard Pyomo solver options to Ipopt options: standard
# options override ipopt-specific options.
if config.threads and config.threads != 1:
logger.log(
logging.WARNING,
msg=f"The `threads={config.threads}` option was specified, "
f"but this is not used by {self.__class__.__name__}.",
)
if config.time_limit is not None:
options['max_cpu_time'] = config.time_limit
cmd.extend(self._process_options(basename + '.opt', options))
results.solver_version = self._get_version(exe)
results.extra_info.add(
'command_line', ConfigValue(cmd, visibility=ADVANCED_OPTION)
)
# This seems silly, but we have to give the subprocess slightly
# longer to finish than ipopt, otherwise we may kill the
# subprocess before ipopt has a chance to write the SOL file.
# We will add 1% (with a min of 1 second and max of 100 seconds).
timeout = config.time_limit
if timeout is not None:
timeout = timeout + min(max(1.0, 0.01 * timeout), 100.0)
# Call ipopt - passing the files via the subprocess
ostreams = [io.StringIO()] + config.tee
timer.start('subprocess')
try:
with TeeStream(*ostreams) as t:
process = subprocess.run(
cmd,
timeout=timeout,
env=env,
universal_newlines=True,
stdout=t.STDOUT,
stderr=t.STDERR,
check=False,
)
except OSError:
err = sys.exc_info()[1]
msg = 'Could not execute the command: %s\tError message: %s'
raise ApplicationError(msg % (cmd, err))
finally:
timer.stop('subprocess')
results.solver_log = ostreams[0].getvalue()
results.extra_info.return_code = process.returncode
if process.returncode:
results.termination_condition = TerminationCondition.error
# This is the data we need to parse to get the iterations
# and time
timer.start('parse_log')
parsed_output_data = self._parse_ipopt_output(results.solver_log)
results.extra_info.iteration_count = parsed_output_data.pop('iters', None)
_timing = parsed_output_data.pop('cpu_seconds', None)
if _timing:
results.timing_info.update(_timing)
# Save the iteration log, but mark it as an "advanced" result
iter_log = parsed_output_data.pop('iteration_log', None)
if iter_log is not None:
results.extra_info.add(
'iteration_log', ConfigList(iter_log, visibility=ADVANCED_OPTION)
)
results.extra_info.update(parsed_output_data)
timer.stop('parse_log')
timer.start('parse_sol')
if os.path.isfile(basename + '.sol'):
with open(basename + '.sol', 'r', encoding='utf-8') as sol_file:
sol_data = parse_asl_sol_file(sol_file)
else:
sol_data = ASLSolFileData()
results.solution_loader = IpoptSolutionLoader(
sol_data=sol_data, nl_info=nl_info
)
timer.stop('parse_sol')
# Initialize the solver message, solution loader solution
# status and termination condition:
asl_solve_code_to_solution_status(sol_data, results)
def _parse_ipopt_output(self, output: str) -> Dict[str, Any]:
parsed_data = {}
# Stop parsing if there is nothing to parse
if not output:
logger.log(
logging.WARNING,
"Returned output from ipopt was empty. Cannot parse for additional data.",
)
return parsed_data
# Extract number of iterations
iter_match = re.search(r'Number of Iterations.*:\s+(\d+)', output)
if iter_match:
parsed_data['iters'] = int(iter_match.group(1))
# Gather all the iteration data
iter_table = re.findall(r'^(?:\s*\d+.*?)$', output, re.MULTILINE)
if iter_table:
columns = [
("iter", int),
("objective", float),
("inf_pr", float),
("inf_du", float),
("lg_mu", float),
("d_norm", float),
("lg_rg", float),
("alpha_du", float),
("alpha_pr", float),
("ls", int),
]
iterations = []
n_expected_columns = len(columns)
iter_idx = columns.index(('iter', int))
alpha_pr_idx = columns.index(('alpha_pr', float))
for line in iter_table:
tokens = line.strip().split()
# IPOPT sometimes mashes the first two column values together
# (e.g., "2r-4.93e-03"). We need to split them.
if '-' in tokens[iter_idx]:
# This happens rarely, so we are OK with this
# portion of the parser being a little less
# efficient (e.g., reallocating the tokens list, and
# performing index math)
tkn = tokens[iter_idx]
idx = tkn.index('-')
tokens[iter_idx : iter_idx + 1] = tkn[:idx], tkn[idx:]
# Extract restoration flag from 'iter'
restoration = tokens[iter_idx].endswith("r")
if restoration:
tokens[iter_idx] = tokens[iter_idx][:-1]
# Separate alpha_pr into numeric part and optional tag (f, D, R, etc.)
step_acceptance = tokens[alpha_pr_idx][-1]
if step_acceptance in _ALPHA_PR_CHARS:
tokens[alpha_pr_idx] = tokens[alpha_pr_idx][:-1]
else:
step_acceptance = None
try:
iter_data = {
key: None if t == '-' else cast(t)
for (key, cast), t in zip(columns, tokens)
}
except (ValueError, TypeError):
logger.error(
"Error parsing Ipopt log entry:\n"
f"\t{sys.exc_info()[1]}\n\t{line}"
)
# Fall-back on a simpler (but slower) parse: extract
# the fields, and cast to float what we can. The
# point here is the parser should never fail with an
# exception (even if it fails to parse some of the
# log)
iter_data = {}
for (key, cast), t in zip(columns, tokens):
if t == '-':
t = None
else:
try:
t = cast(t)
except:
pass
iter_data[key] = t
iter_data["restoration"] = restoration
iter_data["step_acceptance"] = step_acceptance
# Capture optional IPOPT diagnostic tags if present
if len(tokens) > n_expected_columns:
iter_data['diagnostic_tags'] = " ".join(tokens[n_expected_columns:])
iterations.append(iter_data)
parsed_data['iteration_log'] = iterations
if len(iterations) != parsed_data.get('iters', 0) + 1:
n_iter = parsed_data.get('iters', 0)
logger.warning(
f"Total number of iteration records parsed {len(iterations)} does "
f"not match the number of iterations ({n_iter}) plus one."
)
# Extract scaled and unscaled table
scaled_unscaled_match = re.search(
r'''
Objective\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s*
Dual\ infeasibility\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s*
Constraint\ violation\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s*
(?:Variable\ bound\ violation:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s*)?
Complementarity\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)\s*
Overall\ NLP\ error\.*:\s*([-+eE0-9.]+)\s+([-+eE0-9.]+)
''',
output,
re.DOTALL | re.VERBOSE,
)
if scaled_unscaled_match:
groups = scaled_unscaled_match.groups()
all_fields = [
"incumbent_objective",
"dual_infeasibility",
"constraint_violation",
"variable_bound_violation", # optional
"complementarity_error",
"overall_nlp_error",
]
# Filter out None values and create final fields and values.
# Nones occur in old-style IPOPT output (<= 3.13)
zipped = [
(field, scaled, unscaled)
for field, scaled, unscaled in zip(
all_fields, groups[0::2], groups[1::2]
)
if scaled is not None and unscaled is not None
]
scaled = {k: float(s) for k, s, _ in zipped}
unscaled = {k: float(u) for k, _, u in zipped}
parsed_data.update(unscaled)
parsed_data['final_scaled_results'] = scaled
# Newer versions of IPOPT no longer separate timing into
# two different values. This is so we have compatibility with
# both new and old versions
parsed_data['cpu_seconds'] = {
k.strip(): float(v)
for k, v in re.findall(
r'Total(?: CPU)? sec(?:ond)?s in ([^=]+)=\s*([0-9.]+)', output
)
}
return parsed_data
[docs]
class LegacyIpoptSolver(LegacySolverWrapper, Ipopt):
def _process_options(
self, option_fname: str, options: dict[str, str | int | float]
) -> list[str]:
# The old Ipopt solver would map solver_options starting with
# "OF_" to the options file. That is no longer needed, so we
# will strip off any "OF_" that we find
for opt in list(options):
if opt.startswith('OF_'):
options[opt[3:]] = options.pop(opt)
return super()._process_options(option_fname, options)