# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
from collections import namedtuple
import enum
import itertools as it
import traceback
from pyomo.common.config import document_kwargs_from_configdict
from pyomo.common.errors import InfeasibleConstraintException
from pyomo.contrib.fbbt.fbbt import fbbt
from pyomo.contrib.gdpopt.algorithm_base_class import _GDPoptAlgorithm
from pyomo.contrib.gdpopt.create_oa_subproblems import (
add_util_block,
add_disjunction_list,
add_disjunct_list,
add_algebraic_variable_list,
add_boolean_variable_lists,
add_transformed_boolean_variable_list,
)
from pyomo.contrib.gdpopt.config_options import (
_add_nlp_solver_configs,
_add_ldsda_configs,
_add_mip_solver_configs,
_add_tolerance_configs,
_add_nlp_solve_configs,
)
from pyomo.contrib.gdpopt.nlp_initialization import restore_vars_to_original_values
from pyomo.contrib.gdpopt.util import SuppressInfeasibleWarning, get_main_elapsed_time
from pyomo.contrib.satsolver.satsolver import satisfiable
from pyomo.core import minimize, Suffix, TransformationFactory, Objective, value
from pyomo.opt import SolverFactory
from pyomo.opt import TerminationCondition as tc
from pyomo.core.expr.logical_expr import ExactlyExpression
[docs]
class DirectionNorm(str, enum.Enum):
"""
Norm type for search direction generation in LD-SDA.
Attributes
----------
L2 : str
Standard basis vectors (2n directions for n external variables).
Linf : str
All combinations of {-1, 0, 1} excluding the zero vector (3^n - 1 directions).
"""
L2 = 'L2'
Linf = 'Linf'
def __str__(self):
return self.value
[docs]
class SearchPhase(str, enum.Enum):
"""
Phase of the LD-SDA search algorithm.
Attributes
----------
INITIAL : str
Initial point evaluation.
NEIGHBOR : str
Neighbor search phase.
LINE : str
Line search phase.
"""
INITIAL = 'Initial point'
NEIGHBOR = 'Neighbor search'
LINE = 'Line search'
def __str__(self):
return self.value
# Data tuple for external variables.
ExternalVarInfo = namedtuple(
'ExternalVarInfo',
[
'exactly_number', # number of external variables for this type
'Boolean_vars', # list with names of the ordered Boolean variables to be reformulated
'UB', # upper bound on external variable
'LB', # lower bound on external variable
],
)
[docs]
@SolverFactory.register(
'gdpopt.ldsda',
doc="The LD-SDA (Logic-based Discrete-Steepest Descent Algorithm) "
"Generalized Disjunctive Programming (GDP) solver",
)
class GDP_LDSDA_Solver(_GDPoptAlgorithm):
"""
The GDPopt (Generalized Disjunctive Programming optimizer) LD-SDA
(Logic-based Discrete-Steepest Descent Algorithm) solver.
This solver accepts models that can include nonlinear, continuous variables
and constraints, as well as logical conditions. It uses a discrete steepest
descent approach to explore the space of discrete variables (disjunctions)
while solving NLP subproblems for the continuous variables.
References
----------
Ovalle, D.; Liñán, D. A.; Lee, A.; Gómez, J. M.; Ricardez-Sandoval, L.; Grossmann, I. E.; Bernal Neira, D. E. Logic-Based Discrete-Steepest Descent: A Solution Method for Process Synthesis Generalized Disjunctive Programs. Computers & Chemical Engineering 2025, 195, 108993. https://doi.org/10.1016/j.compchemeng.2024.108993.
"""
CONFIG = _GDPoptAlgorithm.CONFIG()
_add_mip_solver_configs(CONFIG)
_add_nlp_solver_configs(CONFIG, default_solver='ipopt')
_add_nlp_solve_configs(
CONFIG, default_nlp_init_method=restore_vars_to_original_values
)
_add_tolerance_configs(CONFIG)
_add_ldsda_configs(CONFIG)
algorithm = 'LDSDA'
# Override solve() to customize the docstring for this solver
[docs]
@document_kwargs_from_configdict(CONFIG, doc=_GDPoptAlgorithm.solve.__doc__)
def solve(self, model, **kwds):
return super().solve(model, **kwds)
def _log_citation(self, config):
config.logger.info("\n" + """- LDSDA algorithm:
Bernal DE, Ovalle D, Liñán DA, Ricardez-Sandoval LA, Gómez JM, Grossmann IE.
Process Superstructure Optimization through Discrete Steepest Descent Optimization: a GDP Analysis and Applications in Process Intensification.
Computer Aided Chemical Engineering 2022 Jan 1 (Vol. 49, pp. 1279-1284). Elsevier.
https://doi.org/10.1016/B978-0-323-85159-6.50213-X
""".strip())
def _solve_gdp(self, model, config):
"""
Execute the main LD-SDA algorithm logic.
Initializes the utility blocks, reformulates the model, solves the
initial point, and enters the main search loop (Neighbor Search and
Line Search) until a local optimum is found or termination criteria
are met.
Parameters
----------
model : ConcreteModel
The GDP model to be solved.
config : ConfigBlock
The configuration block containing solver options.
"""
logger = config.logger
self.log_formatter = (
'{:>9} {:>15} {:>20} {:>11.5f} {:>11.5f} {:>8.2%} {:>7.2f} {}'
)
self.best_direction = None
self.current_point = tuple(config.starting_point)
self.explored_point_set = set()
# Create utility block on the original model so that we will be able to
# copy solutions between
util_block = self.original_util_block = add_util_block(model)
add_disjunct_list(util_block)
add_algebraic_variable_list(util_block)
add_boolean_variable_lists(util_block)
util_block.config_disjunction_list = config.disjunction_list
util_block.config_logical_constraint_list = config.logical_constraint_list
# We will use the working_model to perform the LDSDA search.
self.working_model = model.clone()
self.working_model_util_block = self.working_model.find_component(util_block)
add_disjunction_list(self.working_model_util_block)
TransformationFactory('core.logical_to_linear').apply_to(self.working_model)
# Now that logical_to_disjunctive has been called.
add_transformed_boolean_variable_list(self.working_model_util_block)
self.get_external_information(self.working_model_util_block, config)
self.directions = self._get_directions(
self.number_of_external_variables, config
)
# Add the BigM suffix if it does not already exist. Used later during
# nonlinear constraint activation.
if not hasattr(self.working_model_util_block, 'BigM'):
self.working_model_util_block.BigM = Suffix()
self._log_header(logger)
# Solve the initial point
_ = self._solve_GDP_subproblem(self.current_point, SearchPhase.INITIAL, config)
# Main loop
locally_optimal = False
while not locally_optimal:
self.iteration += 1
if self.any_termination_criterion_met(config):
break
locally_optimal = self.neighbor_search(config)
if not locally_optimal:
self.line_search(config)
[docs]
def any_termination_criterion_met(self, config):
"""
Check if any termination criteria (iteration limit or time limit) have been met.
Parameters
----------
config : ConfigBlock
The configuration block containing limits.
Returns
-------
bool
True if either the iteration limit or time limit has been reached,
False otherwise.
"""
return self.reached_iteration_limit(config) or self.reached_time_limit(config)
def _solve_GDP_subproblem(self, external_var_value, search_type, config):
"""
Solve the GDP subproblem with disjunctions fixed according to the external variable values.
This method fixes the Boolean variables based on the `external_var_value`,
applies necessary transformations (BigM, FBBT), and solves the resulting
MINLP/NLP using the configured solver.
Parameters
----------
external_var_value : tuple or list
The values of the external variables (indices of active disjuncts)
defining the current point in the discrete space.
search_type : SearchPhase
The context of the solve (SearchPhase.INITIAL, SearchPhase.NEIGHBOR, or SearchPhase.LINE).
config : ConfigBlock
The configuration block containing solver options.
Returns
-------
primal_improved : bool
True if the solution of this subproblem improved the best known
primal bound (incumbent).
primal_bound : float
The objective value (primal bound) obtained from the subproblem.
Returns None if the subproblem was infeasible.
"""
self.fix_disjunctions_with_external_var(external_var_value)
subproblem = self.working_model.clone()
TransformationFactory('core.logical_to_linear').apply_to(subproblem)
with SuppressInfeasibleWarning():
try:
TransformationFactory('gdp.bigm').apply_to(subproblem)
fbbt(subproblem, integer_tol=config.integer_tolerance)
TransformationFactory('contrib.detect_fixed_vars').apply_to(subproblem)
TransformationFactory('contrib.propagate_fixed_vars').apply_to(
subproblem
)
TransformationFactory(
'contrib.deactivate_trivial_constraints'
).apply_to(subproblem, tmp=False, ignore_infeasible=False)
except InfeasibleConstraintException:
return False, None
minlp_args = dict(config.minlp_solver_args)
if config.time_limit is not None and config.minlp_solver == 'gams':
elapsed = get_main_elapsed_time(self.timing)
remaining = max(config.time_limit - elapsed, 1)
minlp_args['add_options'] = minlp_args.get('add_options', [])
minlp_args['add_options'].append('option reslim=%s;' % remaining)
result = SolverFactory(config.minlp_solver).solve(subproblem, **minlp_args)
primal_improved = self._handle_subproblem_result(
result, subproblem, external_var_value, config, search_type
)
# Only retrieve primal_bound if the solve succeeded; otherwise return None
if primal_improved:
obj = next(subproblem.component_data_objects(Objective, active=True))
primal_bound = value(obj)
else:
primal_bound = None
return primal_improved, primal_bound
[docs]
def fix_disjunctions_with_external_var(self, external_var_values_list):
"""
Fix the disjunctions in the working model based on external variable values.
Maps the integer values in `external_var_values_list` to the corresponding
Boolean variables in the model, fixing the selected one to True and
others to False for each logical group.
Parameters
----------
external_var_values_list : list or tuple
The list of integer values representing the active disjunct index
for each external variable.
"""
for external_variable_value, external_var_info in zip(
external_var_values_list,
self.working_model_util_block.external_var_info_list,
):
for idx, boolean_var in enumerate(external_var_info.Boolean_vars):
if idx == external_variable_value - 1:
boolean_var.fix(True)
if boolean_var.get_associated_binary() is not None:
boolean_var.get_associated_binary().fix(1)
else:
boolean_var.fix(False)
if boolean_var.get_associated_binary() is not None:
boolean_var.get_associated_binary().fix(0)
self.explored_point_set.add(tuple(external_var_values_list))
def _get_directions(self, dimension, config):
"""
Generate the search directions for the given dimension.
Parameters
----------
dimension : int
The dimensionality of the neighborhood (number of external variables).
config : ConfigBlock
The configuration block specifying the norm ('L2' or 'Linf').
Returns
-------
list of tuple
A list of direction vectors (tuples).
- If DirectionNorm.L2: Standard basis vectors and their negatives.
- If DirectionNorm.Linf: All combinations of {-1, 0, 1} excluding the zero vector.
"""
if config.direction_norm == DirectionNorm.L2:
directions = []
for i in range(dimension):
directions.append(tuple([0] * i + [1] + [0] * (dimension - i - 1)))
directions.append(tuple([0] * i + [-1] + [0] * (dimension - i - 1)))
return directions
elif config.direction_norm == DirectionNorm.Linf:
directions = list(it.product([-1, 0, 1], repeat=dimension))
directions.remove((0,) * dimension) # Remove the zero direction
return directions
def _check_valid_neighbor(self, neighbor):
"""
Check if a given neighbor point is valid.
A neighbor is valid if it has not been explored yet and lies within
the defined bounds (LB and UB) of the external variables.
Parameters
----------
neighbor : tuple
The coordinates of the neighbor point to check.
Returns
-------
bool
True if the neighbor is valid (unexplored and within bounds),
False otherwise.
"""
if neighbor in self.explored_point_set:
return False
return all(
external_var_value >= external_var_info.LB
and external_var_value <= external_var_info.UB
for external_var_value, external_var_info in zip(
neighbor, self.working_model_util_block.external_var_info_list
)
)
[docs]
def neighbor_search(self, config):
"""
Evaluate immediate neighbors of the current point to find a better solution.
Iterates through all search directions, generates neighbors, and solves
their subproblems. Uses a tie-breaking mechanism favoring points farther
away (Euclidean distance) if objective values are within tolerance.
Parameters
----------
config : ConfigBlock
The configuration block containing solver options.
Returns
-------
bool
True if the current point is locally optimal (no better neighbor found),
False if a better neighbor was found (current point updated).
"""
locally_optimal = True
best_neighbor = None
self.best_direction = None # reset best direction
fmin = float('inf') # Initialize the best objective value
best_dist = 0 # Initialize the best distance
abs_tol = (
config.integer_tolerance
) # Use integer_tolerance for objective comparison
# Loop through all possible directions (neighbors)
for direction in self.directions:
# Generate a neighbor point by applying the direction to the current point
neighbor = tuple(map(sum, zip(self.current_point, direction)))
# Check if the neighbor is valid
if self._check_valid_neighbor(neighbor):
# Solve the subproblem for this neighbor
primal_improved, primal_bound = self._solve_GDP_subproblem(
neighbor, SearchPhase.NEIGHBOR, config
)
if primal_improved:
locally_optimal = False
# --- Tiebreaker Logic ---
if abs(fmin - primal_bound) < abs_tol:
# Calculate the squared Euclidean distance from the current point
dist = sum(
(x - y) ** 2 for x, y in zip(neighbor, self.current_point)
)
# Update the best neighbor if this one is farther away
if dist > best_dist:
best_neighbor = neighbor
self.best_direction = direction
best_dist = dist # Update the best distance
else:
# Standard improvement logic: update if the objective is better
fmin = primal_bound # Update the best objective value
best_neighbor = neighbor # Update the best neighbor
self.best_direction = direction # Update the best direction
best_dist = sum(
(x - y) ** 2 for x, y in zip(neighbor, self.current_point)
)
# --- End of Tiebreaker Logic ---
# Move to the best neighbor if an improvement was found
if not locally_optimal:
self.current_point = best_neighbor
return locally_optimal
[docs]
def line_search(self, config):
"""
Perform a line search along the best direction found by the neighbor search.
Continues moving in `self.best_direction` as long as the objective
function value improves.
Parameters
----------
config : ConfigBlock
The configuration block containing solver options.
"""
primal_improved = True
while primal_improved:
next_point = tuple(map(sum, zip(self.current_point, self.best_direction)))
if self._check_valid_neighbor(next_point):
# Unpack the tuple and use only the first boolean value
primal_improved, _ = self._solve_GDP_subproblem(
next_point, SearchPhase.LINE, config
)
if primal_improved:
self.current_point = next_point
else:
break
else:
break
def _handle_subproblem_result(
self, subproblem_result, subproblem, external_var_value, config, search_type
):
"""
Process the result of a subproblem solve.
Checks termination conditions, updates primal bounds if valid, and
logs the state.
Parameters
----------
subproblem_result : SolverResults
The result object returned by the solver.
subproblem : ConcreteModel
The subproblem model instance.
external_var_value : tuple
The external variable configuration used for this subproblem.
config : ConfigBlock
The configuration block.
search_type : SearchPhase
The type of search (SearchPhase.NEIGHBOR, etc.).
Returns
-------
bool
True if the result improved the current best primal bound,
False otherwise.
"""
if subproblem_result is None:
return False
if subproblem_result.solver.termination_condition in {
tc.optimal,
tc.feasible,
tc.globallyOptimal,
tc.locallyOptimal,
tc.maxTimeLimit,
tc.maxIterations,
tc.maxEvaluations,
}:
primal_bound = (
subproblem_result.problem.upper_bound
if self.objective_sense == minimize
else subproblem_result.problem.lower_bound
)
primal_improved = self._update_bounds_after_solve(
search_type,
primal=primal_bound,
logger=config.logger,
current_point=external_var_value,
)
if primal_improved:
self.update_incumbent(
subproblem.component(self.original_util_block.name)
)
return primal_improved
return False
def _log_header(self, logger):
logger.info(
'================================================================='
'===================================='
)
logger.info(
'{:^9} | {:^15} | {:^20} | {:^11} | {:^11} | {:^8} | {:^7}\n'.format(
'Iteration',
'Search Type',
'External Variables',
'Lower Bound',
'Upper Bound',
'Gap',
'Time(s)',
)
)
def _log_current_state(
self, logger, search_type, current_point, primal_improved=False
):
star = "*" if primal_improved else ""
logger.info(
self.log_formatter.format(
self.iteration,
search_type,
str(current_point),
self.LB,
self.UB,
self.relative_gap(),
get_main_elapsed_time(self.timing),
star,
)
)
def _update_bounds_after_solve(
self, search_type, primal=None, dual=None, logger=None, current_point=None
):
primal_improved = self._update_bounds(primal, dual)
if logger is not None:
self._log_current_state(logger, search_type, current_point, primal_improved)
return primal_improved