# Source code for pyomo.contrib.solver.solvers.gams

# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software.  This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________

import logging
import os
import subprocess
import time
import datetime
from io import StringIO
import sys
import struct
import re
import pathlib

from pyomo.common.dependencies import attempt_import
from pyomo.common.fileutils import Executable, ExecutableData
from pyomo.common.config import (
    ConfigValue,
    ConfigDict,
    document_configdict,
    Path,
    document_class_CONFIG,
)
from pyomo.common.tempfiles import TempfileManager
from pyomo.common.timing import HierarchicalTimer
from pyomo.core.base import value, Objective
from pyomo.core.staleflag import StaleFlagManager
from pyomo.contrib.solver.common.base import SolverBase, Availability
from pyomo.contrib.solver.common.config import SolverConfig
from pyomo.contrib.solver.common.results import (
    Results,
    SolutionStatus,
    TerminationCondition,
)
from pyomo.contrib.solver.solvers.gms_sol_reader import GMSSolutionLoader

import pyomo.core.base.suffix
from pyomo.common.tee import TeeStream
from pyomo.core.expr.visitor import replace_expressions
from pyomo.core.base.suffix import Suffix
from pyomo.common.errors import ApplicationError
from pyomo.contrib.solver.common.util import NoOptimalSolutionError
from pyomo.repn.plugins.gams_writer_v2 import GAMSWriter

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def _gams_importer():
    """Import and return the GAMS GDX bindings module.

    The current API (``gams.core.gdx``) is tried first.  If that import
    fails, the stand-alone ``gdxcc`` module (the pre-GAMS-45.0 API) is
    tried as a fallback.

    Returns
    -------
    module
        Whichever of the two bindings modules could be imported.

    Raises
    ------
    ImportError
        If neither module can be imported.  The error that propagates is
        the one raised by the *current* API; any failure of the legacy
        fallback is deliberately suppressed.
    """
    try:
        import gams.core.gdx as gdxcc

        return gdxcc
    except ImportError:
        try:
            # fall back to the pre-GAMS-45.0 API
            import gdxcc

            # Return immediately: falling through would hit the bare
            # ``raise`` below and discard a perfectly good module.
            return gdxcc
        except Exception:
            # suppress the error from the old API and reraise the
            # current API import error
            pass
        raise


# Obtain the GDX bindings through the custom importer defined above.
# ``gdxcc_available`` is consulted in ``GAMS.solve`` to choose between the
# 'gdx' and 'dat' results formats when the user has not picked one.
gdxcc, gdxcc_available = attempt_import('gdxcc', importer=_gams_importer)


@document_configdict()
class GAMSConfig(SolverConfig):
    """Configuration block for the GAMS solver interface.

    On top of whatever ``SolverConfig`` declares, this adds three entries:
    ``executable``, ``logfile`` and ``writer_config``.
    """

    def __init__(
        self,
        description=None,
        doc=None,
        implicit=False,
        implicit_domain=None,
        visibility=0,
    ):
        """Initialize the parent config and declare the GAMS-specific options.

        All arguments are forwarded unchanged (by keyword) to the parent
        class constructor.
        """
        parent_arguments = {
            'description': description,
            'doc': doc,
            'implicit': implicit,
            'implicit_domain': implicit_domain,
            'visibility': visibility,
        }
        super().__init__(**parent_arguments)

        # Path to the GAMS executable; defaults to 'gams'.
        executable_option = ConfigValue(
            domain=Executable,
            default='gams',
            description="Executable for gams. Defaults to searching the "
            "``PATH`` for the first available ``gams``.",
        )
        self.executable: ExecutableData = self.declare(
            'executable', executable_option
        )

        # Optional file name for the GAMS log.
        logfile_option = ConfigValue(
            domain=Path(),
            default=None,
            description="Filename to output GAMS log to a file.",
        )
        self.logfile: str = self.declare('logfile', logfile_option)

        # Options handed to the GAMS model writer.
        writer_options = GAMSWriter.CONFIG()
        self.writer_config: ConfigDict = self.declare(
            'writer_config', writer_options
        )
@document_class_CONFIG(methods=['solve'])
class GAMS(SolverBase):
    """Solver interface that runs the GAMS command-line executable.

    ``solve`` writes the model to a ``.gms`` file, runs GAMS on it in a
    subprocess, parses the result files GAMS produces (GDX when the GDX
    bindings are importable, plain ``.dat`` text otherwise) and packages
    the outcome in a ``Results`` object.
    """

    #: Global class configuration;
    #: see :ref:`pyomo.contrib.solver.solvers.gams.GAMS::CONFIG`.
    CONFIG = GAMSConfig()

    # default behaviour of gams is to print to console, for
    # compatibility with windows and *nix we want to explicitly log to
    # stdout (see https://www.gams.com/latest/docs/UG_GamsCall.html)
    #
    # Keys are ``(bool(tee), bool(logfile))``.
    _log_levels = {
        (True, False): "lo=3",
        (False, False): "lo=0",
        (False, True): "lo=2",
        (True, True): "lo=4",
    }

    #: cache of availability / version information
    _exe_cache: dict = {None: (None, Availability.NotFound)}

    # mapping for GAMS SOLVESTAT
    _SOLVER_STATUS_LOOKUP = {
        1: None,
        2: TerminationCondition.iterationLimit,
        3: TerminationCondition.maxTimeLimit,
        4: TerminationCondition.error,
        5: TerminationCondition.iterationLimit,
        6: TerminationCondition.error,
        7: TerminationCondition.licensingProblems,
        8: TerminationCondition.interrupted,
        9: TerminationCondition.error,
        10: TerminationCondition.error,
        11: TerminationCondition.error,
        12: TerminationCondition.error,
        13: TerminationCondition.error,
    }

    # mapping for GAMS MODELSTAT
    _MODEL_STATUS_LOOKUP = {
        1: (SolutionStatus.optimal, TerminationCondition.convergenceCriteriaSatisfied),
        2: (SolutionStatus.feasible, TerminationCondition.convergenceCriteriaSatisfied),
        3: (SolutionStatus.noSolution, TerminationCondition.unbounded),
        4: (SolutionStatus.infeasible, TerminationCondition.provenInfeasible),
        5: (SolutionStatus.infeasible, TerminationCondition.locallyInfeasible),
        6: (SolutionStatus.infeasible, TerminationCondition.unknown),
        7: (SolutionStatus.feasible, TerminationCondition.unknown),
        8: (SolutionStatus.feasible, TerminationCondition.unknown),
        9: (SolutionStatus.noSolution, TerminationCondition.unknown),
        10: (SolutionStatus.infeasible, TerminationCondition.infeasibleOrUnbounded),
        11: (SolutionStatus.noSolution, TerminationCondition.licensingProblems),
        12: (SolutionStatus.noSolution, TerminationCondition.unknown),
        13: (SolutionStatus.noSolution, TerminationCondition.error),
        14: (SolutionStatus.noSolution, TerminationCondition.error),
        15: (SolutionStatus.optimal, TerminationCondition.convergenceCriteriaSatisfied),
        16: (
            SolutionStatus.feasible,
            TerminationCondition.convergenceCriteriaSatisfied,
        ),
        17: (
            SolutionStatus.feasible,
            TerminationCondition.convergenceCriteriaSatisfied,
        ),
        18: (SolutionStatus.noSolution, TerminationCondition.unbounded),
        19: (SolutionStatus.infeasible, TerminationCondition.provenInfeasible),
    }

    def __init__(self, **kwds):
        """Create the solver; all keywords are forwarded to the parent class."""
        super().__init__(**kwds)
        #: Instance configuration;
        #: see :ref:`pyomo.contrib.solver.solvers.gams.GAMS::CONFIG`.
        self.config = self.config

    def available(self) -> Availability:
        """Return the availability of the configured GAMS executable."""
        ver, avail = self._get_version(self.config.executable.path())
        return avail

    def version(self) -> tuple[int, int, int] | None:
        """Return the GAMS version as a tuple of ints, or None if undetermined."""
        ver, avail = self._get_version(self.config.executable.path())
        return ver

    def _get_version(self, exe: str | None):
        """Determine version and availability for the executable at ``exe``.

        The executable is run with no arguments and its output is searched
        for a ``GAMS Release : X.Y.Z`` banner.  Results (including
        failures) are stored in the class-level ``_exe_cache`` keyed by
        ``exe``, so each path is only probed once.

        Parameters
        ----------
        exe : str | None
            Path to the executable; ``None`` is pre-seeded in the cache as
            "not found".

        Returns
        -------
        tuple
            ``(version, availability)`` where ``version`` is a tuple of
            ints or ``None``.
        """
        # check the cache
        if exe in self._exe_cache:
            return self._exe_cache[exe]

        # Note: non-None str paths are guaranteed to exist and be executable files
        res = subprocess.run(
            [exe],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            encoding="utf8",
            check=False,
        )
        if res.returncode:
            params = (None, Availability.NotFound)
            self._exe_cache[exe] = params
            out = res.stdout.replace('\n', '\n ').strip()
            logger.warning(
                "Failed running GAMS command to get version (non-zero returncode "
                f"{res.returncode}):\n {out}"
            )
            return params

        version_pattern = r"GAMS Release *: *(\d{1,3}\.\d{1,3}\.\d{1,3})"
        found = re.search(version_pattern, res.stdout)
        if not found:
            params = (None, Availability.NotFound)
            self._exe_cache[exe] = params
            out = res.stdout.replace('\n', '\n ').strip()
            logger.warning(
                "Failed parsing GAMS version (version not found while parsing):"
                f"\n {out}"
            )
            return params

        version = tuple(int(i) for i in found.group(1).split('.'))
        # TBD: does this also catch Community licenses?
        if "GAMS Demo" in res.stdout:
            avail = Availability.LimitedLicense
        else:
            avail = Availability.FullLicense
        # TBD: should we run a small problem (1-variable LP) to make
        # sure the license is still valid?  Alternatively, we can leave
        # Availability as NOTSET until someone explicitly *asks* for
        # .available(), in which case we can justify running the small
        # model (potentially requiring us to check out a license from a
        # license server).
        params = (version, avail)
        self._exe_cache[exe] = params
        return params

    def solve(self, model, **kwds):
        """Write ``model`` to a GAMS file, run GAMS on it and return the results.

        Parameters
        ----------
        model
            The model to solve.
        **kwds
            Per-call overrides of the solver configuration.

        Returns
        -------
        Results
            Populated by ``_postsolve`` and extended here with the solver
            configuration, the captured solver log and timing information.

        Raises
        ------
        ApplicationError
            If the configured executable is not available.
        RuntimeError
            If the GAMS process exits with a non-zero return code.
        """
        ####################################################################
        # Presolve
        ####################################################################
        # Begin time tracking
        start_timestamp = datetime.datetime.now(datetime.timezone.utc)
        tick = time.perf_counter()

        # Update configuration options, based on keywords passed to solve
        # preserve_implicit=True is required to extract solver_options ConfigDict
        config: GAMSConfig = self.config(value=kwds, preserve_implicit=True)

        # Check if the solver executable exists:
        if not config.executable:
            raise ApplicationError(
                f"{self.__class__.__name__}: 'gams' executable not found"
            )
        if config.timer is None:
            config.timer = HierarchicalTimer()
        timer = config.timer
        StaleFlagManager.mark_all_as_stale()

        # update the writer config if any of the overlapping
        # keys exists in the solver_options
        for key, val in config.solver_options.items():
            config.writer_config.gams_commands.append(f"option {key}={val};")

        if not config.writer_config.put_results_format:
            config.writer_config.put_results_format = (
                'gdx' if gdxcc_available else 'dat'
            )

        # Copy resLim last so that time_limit overrides any limit set in
        # solver_options (or writer_config.gams_commands)
        if config.time_limit is not None:
            config.writer_config.gams_commands.append(
                f"option resLim={config.time_limit};"
            )

        # local variable to hold the working directory name and flags
        dname = None
        lst = "output.lst"
        model_name = "model"
        output_filename = None
        with TempfileManager.new_context() as tempfile:
            # IMPORTANT - only delete the whole tmpdir if the solver was the one
            # that made the directory. Otherwise, just delete the files the solver
            # made, if not keepfiles. That way the user can select a directory
            # they already have, like the current directory, without having to
            # worry about the rest of the contents of that directory being deleted.
            if not config.working_dir:
                dname = tempfile.mkdtemp()
            else:
                dname = config.working_dir
            # makedirs (rather than mkdir) so that a nested, not-yet-existing
            # working directory can be created in one step.
            os.makedirs(dname, exist_ok=True)

            basename = os.path.join(dname, model_name)
            output_filename = basename + '.gms'
            lst_filename = os.path.join(dname, lst)

            timer.start('write_gms_file')
            with open(output_filename, 'w', newline='\n', encoding='utf-8') as gms_file:
                gms_info = GAMSWriter().write(
                    model, gms_file, config=config.writer_config
                )
            # NOTE: omit InfeasibleConstraintException for now
            timer.stop('write_gms_file')

            put_results = config.writer_config.put_results
            if config.writer_config.put_results_format == 'gdx':
                results_filename = os.path.join(dname, "GAMS_MODEL_p.gdx")
                statresults_filename = os.path.join(dname, "%s_s.gdx" % (put_results,))
            else:
                results_filename = os.path.join(dname, "%s.dat" % (put_results,))
                statresults_filename = os.path.join(
                    dname, "%sstat.dat" % (put_results,)
                )

            ####################################################################
            # Apply solver
            ####################################################################
            exe_path = config.executable.path()
            command = [exe_path, output_filename, "o=" + lst, "curdir=" + dname]

            # handled tee and logfile based on the length of list and
            # string respectively
            command.append(self._log_levels[(bool(config.tee), bool(config.logfile))])
            if config.logfile:
                # Without an explicit "lf=" GAMS writes the log to its own
                # default file name, so the requested logfile would never
                # be produced.
                command.append(f"lf={config.logfile}")

            ostreams = [StringIO()]
            if config.tee:
                ostreams.append(sys.stdout)
            with TeeStream(*ostreams) as t:
                timer.start('subprocess')
                subprocess_result = subprocess.run(
                    command, stdout=t.STDOUT, stderr=t.STDERR, cwd=dname
                )
                timer.stop('subprocess')
            # Read the captured output only after the TeeStream has closed.
            rc = subprocess_result.returncode
            txt = ostreams[0].getvalue()

            if config.working_dir:
                logger.info("\nGAMS WORKING DIRECTORY: %s\n", config.working_dir)

            if rc:
                # If nothing was raised, or for all other cases, raise this
                error_message = f"GAMS process encountered an error (returncode={rc})."
                if rc == 3:
                    # Execution Error
                    # Run check_expr_evaluation, which errors if necessary
                    error_message += (
                        "\nError rc=3 (GAMS execution error), to be determined later."
                    )
                error_message += "\nCheck listing file for details.\n"
                logger.error(error_message)
                logger.error(txt.strip())
                if os.path.exists(lst_filename):
                    with open(lst_filename, 'r') as FILE:
                        logger.error(
                            "\nGAMS Listing file:\n\n%s", FILE.read().strip()
                        )
                raise RuntimeError(error_message)

            timer.start('parse_results')
            if config.writer_config.put_results_format == 'gdx':
                model_soln, stat_vars = self._parse_gdx_results(
                    config, results_filename, statresults_filename
                )
            else:
                model_soln, stat_vars = self._parse_dat_results(
                    config, results_filename, statresults_filename
                )
            timer.stop('parse_results')

            ####################################################################
            # Postsolve (WIP)
            results = self._postsolve(
                model, timer, config, model_soln, stat_vars, gms_info
            )

            results.solver_config = config
            results.solver_log = txt
            tock = time.perf_counter()
            results.timing_info.start_timestamp = start_timestamp
            results.timing_info.wall_time = tock - tick
            results.timing_info.timer = timer
            return results

    def _postsolve(self, model, timer, config, model_soln, stat_vars, gms_info):
        """Translate the parsed GAMS output into a ``Results`` object.

        Parameters
        ----------
        model
            The model that was solved.
        timer
            Accepted for interface compatibility; not used here.
        config
            The effective solver configuration for this solve.
        model_soln : dict
            Symbol name -> ``(level, dual)`` as produced by the parsers.
        stat_vars : dict
            GAMS status values; ``SOLVESTAT``, ``MODELSTAT`` and (when a
            solution is loaded) ``OBJVAL`` are read.
        gms_info
            The value returned by the GAMS writer.

        Returns
        -------
        Results

        Raises
        ------
        NoOptimalSolutionError
            If ``config.raise_exception_on_nonoptimal_result`` is set and
            the solution status is not optimal.
        """
        results = Results()
        results.solver_name = "GAMS "
        results.solver_version = self.version()

        # --- Process GAMS SOLVESTAT ---
        solvestat = stat_vars["SOLVESTAT"]
        solver_term = GAMS._SOLVER_STATUS_LOOKUP.get(
            solvestat, TerminationCondition.unknown
        )
        # specific message for status 4
        if solvestat == 4:
            results.message = "Solver quit with a problem (see LST file)"

        # --- Process GAMS MODELSTAT ---
        modelstat = stat_vars["MODELSTAT"]
        solution_status, model_term = GAMS._MODEL_STATUS_LOOKUP.get(
            modelstat, (SolutionStatus.noSolution, TerminationCondition.unknown)
        )

        # --- Populate 'results'
        results.solution_status = solution_status
        # replaced below, if solution should be loaded
        results.solution_loader = GMSSolutionLoader(None, None)

        # SOLVESTAT 1 means the solver finished normally, so the model
        # status decides the termination condition; otherwise the solver
        # status does.
        if solvestat == 1:
            results.termination_condition = model_term
        else:
            results.termination_condition = solver_term

        # extra_info (save GAMS status codes)
        results.extra_info.gams_solvestat = solvestat
        results.extra_info.gams_modelstat = modelstat

        # Taken from ipopt.py
        if (
            config.raise_exception_on_nonoptimal_result
            and results.solution_status != SolutionStatus.optimal
        ):
            raise NoOptimalSolutionError()

        obj = list(model.component_data_objects(Objective, active=True))

        if results.solution_status in {SolutionStatus.feasible, SolutionStatus.optimal}:
            results.solution_loader = GMSSolutionLoader(
                gdx_data=model_soln, gms_info=gms_info
            )

            if config.load_solutions:
                results.solution_loader.load_vars()
                if len(obj) == 1:
                    results.incumbent_objective = stat_vars["OBJVAL"]
                else:
                    results.incumbent_objective = None
                if (
                    hasattr(model, 'dual')
                    and isinstance(model.dual, Suffix)
                    and model.dual.import_enabled()
                ):
                    model.dual.update(results.solution_loader.get_duals())
                if (
                    hasattr(model, 'rc')
                    and isinstance(model.rc, Suffix)
                    and model.rc.import_enabled()
                ):
                    model.rc.update(results.solution_loader.get_reduced_costs())
            elif len(obj) == 1:
                # The solution is not loaded into the model, so evaluate the
                # objective with the solver's primal values substituted in.
                results.incumbent_objective = value(
                    replace_expressions(
                        obj[0].expr,
                        substitution_map={
                            id(v): val
                            for v, val in results.solution_loader.get_primals().items()
                        },
                        descend_into_named_expressions=True,
                        remove_named_expressions=True,
                    )
                )
            else:
                # Same rule as the load_solutions branch: without exactly
                # one active objective there is no incumbent to report
                # (and indexing obj[0] would fail on an empty list).
                results.incumbent_objective = None

        return results

    @staticmethod
    def _set_gdx_special_values(pgdx):
        """Map the GDX special values of the open file onto Python floats.

        EPS becomes the smallest positive normalized float, UNDEF becomes
        NaN, +/-INF become the float infinities, and NA becomes the double
        with bit pattern ``0xfffffffffffffffe``.
        """
        specVals = gdxcc.doubleArray(gdxcc.GMS_SVIDX_MAX)
        gdxcc.gdxGetSpecialValues(pgdx, specVals)
        specVals[gdxcc.GMS_SVIDX_EPS] = sys.float_info.min
        specVals[gdxcc.GMS_SVIDX_UNDEF] = float("nan")
        specVals[gdxcc.GMS_SVIDX_PINF] = float("inf")
        specVals[gdxcc.GMS_SVIDX_MINF] = float("-inf")
        specVals[gdxcc.GMS_SVIDX_NA] = struct.unpack(
            ">d", bytes.fromhex("fffffffffffffffe")
        )[0]
        gdxcc.gdxSetSpecialValues(pgdx, specVals)

    def _parse_gdx_results(self, config, results_filename, statresults_filename):
        """Read the solution and status values from GDX files.

        Either file is skipped silently when it does not exist.

        Parameters
        ----------
        config
            Solver configuration; the directory of the configured
            executable is passed to ``gdxCreateD``.
        results_filename : str
            GDX file holding one symbol per model variable/equation.
        statresults_filename : str
            GDX file holding the status scalars.

        Returns
        -------
        tuple
            ``(model_soln, stat_vars)``.  ``model_soln`` maps symbol name
            to ``(level, dual)``.  ``stat_vars`` has a fixed key set; keys
            not found in the file stay ``None``.

        Raises
        ------
        RuntimeError
            On any failure reported by the GDX library.
        """
        model_soln = dict()
        stat_vars = dict.fromkeys(
            [
                'MODELSTAT',
                'SOLVESTAT',
                'OBJEST',
                'OBJVAL',
                'NUMVAR',
                'NUMEQU',
                'NUMDVAR',
                'NUMNZ',
                'ETSOLVE',
            ]
        )

        pgdx = gdxcc.new_gdxHandle_tp()
        ret = gdxcc.gdxCreateD(pgdx, os.path.dirname(config.executable.path()), 128)
        if not ret[0]:
            raise RuntimeError("GAMS GDX failure (gdxCreate): %s." % ret[1])

        # From here on the handle exists: make sure it is released (and an
        # opened file closed) even when one of the reads below raises.
        try:
            if os.path.exists(statresults_filename):
                ret = gdxcc.gdxOpenRead(pgdx, statresults_filename)
                if not ret[0]:
                    raise RuntimeError("GAMS GDX failure (gdxOpenRead): %d." % ret[1])
                try:
                    self._set_gdx_special_values(pgdx)

                    i = 0
                    while True:
                        i += 1
                        ret = gdxcc.gdxDataReadRawStart(pgdx, i)
                        if not ret[0]:
                            break

                        ret = gdxcc.gdxSymbolInfo(pgdx, i)
                        if not ret[0]:
                            break
                        if len(ret) < 2:
                            raise RuntimeError("GAMS GDX failure (gdxSymbolInfo).")
                        stat = ret[1]
                        if stat not in stat_vars:
                            continue

                        ret = gdxcc.gdxDataReadRaw(pgdx)
                        if not ret[0] or len(ret[2]) == 0:
                            raise RuntimeError("GAMS GDX failure (gdxDataReadRaw).")

                        # These three are kept as read; everything else is
                        # converted to int.
                        if stat in ('OBJEST', 'OBJVAL', 'ETSOLVE'):
                            stat_vars[stat] = ret[2][0]
                        else:
                            stat_vars[stat] = int(ret[2][0])

                    gdxcc.gdxDataReadDone(pgdx)
                finally:
                    gdxcc.gdxClose(pgdx)

            if os.path.exists(results_filename):
                ret = gdxcc.gdxOpenRead(pgdx, results_filename)
                if not ret[0]:
                    raise RuntimeError("GAMS GDX failure (gdxOpenRead): %d." % ret[1])
                try:
                    self._set_gdx_special_values(pgdx)

                    i = 0
                    while True:
                        i += 1
                        ret = gdxcc.gdxDataReadRawStart(pgdx, i)
                        if not ret[0]:
                            break

                        ret = gdxcc.gdxDataReadRaw(pgdx)
                        if not ret[0] or len(ret[2]) < 2:
                            raise RuntimeError("GAMS GDX failure (gdxDataReadRaw).")
                        level = ret[2][0]
                        dual = ret[2][1]

                        ret = gdxcc.gdxSymbolInfo(pgdx, i)
                        if not ret[0]:
                            break
                        if len(ret) < 2:
                            raise RuntimeError("GAMS GDX failure (gdxSymbolInfo).")
                        model_soln[ret[1]] = (level, dual)

                    gdxcc.gdxDataReadDone(pgdx)
                finally:
                    gdxcc.gdxClose(pgdx)
        finally:
            gdxcc.gdxFree(pgdx)
            gdxcc.gdxLibraryUnload()

        return model_soln, stat_vars

    def _parse_dat_results(self, config, results_filename, statresults_filename):
        """Read the solution and status values from whitespace-separated text files.

        In both files the first line is skipped as a header and blank
        lines are ignored.

        Parameters
        ----------
        config
            Unused; kept for signature parity with ``_parse_gdx_results``.
        results_filename : str
            File whose lines are ``name level dual``.
        statresults_filename : str
            File whose lines are ``name value``.

        Returns
        -------
        tuple
            ``(model_soln, stat_vars)``.  ``model_soln`` maps name to
            ``(level, dual)`` floats.  ``stat_vars`` maps name to a float,
            with NaN for a missing or non-numeric value.
        """
        with open(statresults_filename, 'r') as statresults_file:
            statresults_text = statresults_file.read()

        stat_vars = dict()
        # Skip first line of explanatory text
        for line in statresults_text.splitlines()[1:]:
            items = line.split()
            if not items:
                continue
            try:
                numval = float(items[1])
            except (IndexError, ValueError):
                numval = float("nan")
            stat_vars[items[0]] = numval

        with open(results_filename, 'r') as results_file:
            results_text = results_file.read()

        model_soln = dict()
        # Skip first line of explanatory text
        for line in results_text.splitlines()[1:]:
            items = line.split()
            if not items:
                continue
            model_soln[items[0]] = (float(items[1]), float(items[2]))

        return model_soln, stat_vars