Source code for pyomo.repn.standard_repn

# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software.  This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________


import sys
import logging
import itertools

from pyomo.common.numeric_types import native_types, native_numeric_types
from pyomo.core.base import Constraint, Objective, ComponentMap

import pyomo.core.expr as EXPR
from pyomo.core.expr.numvalue import NumericConstant
from pyomo.core.base.objective import ObjectiveData, ScalarObjective
from pyomo.core.base import Expression
from pyomo.core.base.expression import (
    ScalarExpression,
    NamedExpressionData,
    ExpressionData,
)
from pyomo.core.base.var import ScalarVar, Var, VarData, value
from pyomo.core.base.param import ScalarParam, ParamData
from pyomo.core.kernel.expression import expression, noclone
from pyomo.core.kernel.variable import IVariable, variable
from pyomo.core.kernel.objective import objective

from io import StringIO

logger = logging.getLogger('pyomo.core')


#
# This checks if the first argument is a numeric value.  If not
# then this is a Pyomo constant expression, and we can only check if its
# close to 'b' when it is constant.
#
[docs] def isclose_const(a, b, rel_tol=1e-9, abs_tol=0.0): if not a.__class__ in native_numeric_types: if a.is_constant(): a = value(a) else: return False return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
[docs] class StandardRepn: """ This class defines a standard/common representation for Pyomo expressions that provides an efficient interface for writing all models. TODO: define what "efficient" means to us. """ __slots__ = ( 'constant', # The constant term 'linear_coefs', # Linear coefficients 'linear_vars', # Linear variables 'quadratic_coefs', # Quadratic coefficients 'quadratic_vars', # Quadratic variables 'nonlinear_expr', # Nonlinear expression 'nonlinear_vars', ) # Variables that appear in the nonlinear expression
[docs] def __init__(self): self.constant = 0 self.linear_vars = tuple() self.linear_coefs = tuple() self.quadratic_vars = tuple() self.quadratic_coefs = tuple() self.nonlinear_expr = None self.nonlinear_vars = tuple()
def __getstate__(self): """ This method is required because this class uses slots. """ return ( self.constant, self.linear_coefs, self.linear_vars, self.quadratic_coefs, self.quadratic_vars, self.nonlinear_expr, self.nonlinear_vars, ) def __setstate__(self, state): """ This method is required because this class uses slots. """ ( self.constant, self.linear_coefs, self.linear_vars, self.quadratic_coefs, self.quadratic_vars, self.nonlinear_expr, self.nonlinear_vars, ) = state # # Generate a string representation of the expression # def __str__(self): # pragma: nocover output = StringIO() output.write("\n") output.write("constant: " + str(self.constant) + "\n") output.write( "linear vars: " + str([v_.name for v_ in self.linear_vars]) + "\n" ) output.write( "linear var ids: " + str([id(v_) for v_ in self.linear_vars]) + "\n" ) output.write("linear coef: " + str(list(self.linear_coefs)) + "\n") output.write( "quadratic vars: " + str([(v_[0].name, v_[1].name) for v_ in self.quadratic_vars]) + "\n" ) output.write( "quadratic var ids: " + str([(id(v_[0]), id(v_[1])) for v_ in self.quadratic_vars]) + "\n" ) output.write("quadratic coef: " + str(list(self.quadratic_coefs)) + "\n") if self.nonlinear_expr is None: output.write("nonlinear expr: None\n") else: output.write("nonlinear expr:\n") try: output.write(self.nonlinear_expr.to_string()) output.write("\n") except AttributeError: output.write(str(self.nonlinear_expr)) output.write("\n") output.write( "nonlinear vars: " + str([v_.name for v_ in self.nonlinear_vars]) + "\n" ) output.write("\n") ret_str = output.getvalue() output.close() return ret_str def is_fixed(self): if ( len(self.linear_vars) == 0 and len(self.nonlinear_vars) == 0 and len(self.quadratic_vars) == 0 ): return True return False def polynomial_degree(self): if not self.nonlinear_expr is None: return None if len(self.quadratic_coefs) > 0: return 2 if len(self.linear_coefs) > 0: return 1 return 0 def is_constant(self): return ( self.nonlinear_expr is None and len(self.quadratic_coefs) == 0 and len(self.linear_coefs) == 0 ) def is_linear(self): return self.nonlinear_expr is None and len(self.quadratic_coefs) == 0 def is_quadratic(self): return len(self.quadratic_coefs) > 0 and self.nonlinear_expr is None def is_nonlinear(self): return not (self.nonlinear_expr is None and len(self.quadratic_coefs) == 0) def to_expression(self, sort=True): # # When an standard representation is created, the ordering of the # linear and quadratic terms may not be preserved. Hence, the # sorting option ensures that an expression generated from a # standard representation has a consistent order. # # TODO: Should this replace non-mutable parameters with constants? # expr = self.constant lvars = [(i, v) for i, v in enumerate(self.linear_vars)] if sort: lvars = sorted(lvars, key=lambda x: str(x[1])) for i, v in lvars: c = self.linear_coefs[i] if c.__class__ in native_numeric_types: if not c: pass if isclose_const(c, 1.0): expr += v elif isclose_const(c, -1.0): expr -= v elif c < 0.0: expr -= -c * v else: expr += c * v else: expr += c * v qvars = [(i, v) for i, v in enumerate(self.quadratic_vars)] if sort: qvars = sorted(qvars, key=lambda x: (str(x[1][0]), str(x[1][1]))) for i, v in qvars: if id(v[0]) == id(v[1]): term = v[0] ** 2 else: term = v[0] * v[1] c = self.quadratic_coefs[i] if c.__class__ in native_numeric_types: if isclose_const(c, 1.0): expr += term elif isclose_const(c, -1.0): expr -= term else: expr += c * term else: expr += c * term if self.nonlinear_expr is not None: if expr.__class__ in native_numeric_types and expr == 0: # Some "NL" expressions do not support addition # (e.g. relational expressions) return self.nonlinear_expr expr += self.nonlinear_expr return expr
""" Note: This function separates linear terms from nonlinear terms. Along the way, fixed variable and mutable parameter values *may* be replaced with constants. However, that is not guaranteed. Thus, the nonlinear expression may contain subexpressions whose value is constant. This was done to avoid additional work when a subexpression is clearly nonlinear. However, this requires that standard representations be temporary. They should be used to interface to a solver and then be deleted. """ # @profile
[docs] def generate_standard_repn( expr, idMap=None, compute_values=True, verbose=False, quadratic=True, repn=None ): # # Use a custom Results object # global Results if quadratic: Results = ResultsWithQuadratics else: Results = ResultsWithoutQuadratics if True: # # Setup # if idMap is None: idMap = {} idMap.setdefault(None, {}) if repn is None: repn = StandardRepn() # # The expression is a number or a non-variable constant # expression. # if expr.__class__ in native_numeric_types or not expr.is_potentially_variable(): if compute_values: repn.constant = EXPR.evaluate_expression(expr) else: repn.constant = expr return repn # # The expression is a variable # elif expr.is_variable_type(): if expr.fixed: if compute_values: repn.constant = value(expr) else: repn.constant = expr return repn repn.linear_coefs = (1,) repn.linear_vars = (expr,) return repn # # The expression is linear # elif expr.__class__ is EXPR.LinearExpression: linear_coefs = {} linear_vars = {} C_ = 0 if compute_values: for arg in expr.args: if arg.__class__ is EXPR.MonomialTermExpression: c, v = arg.args if c.__class__ not in native_numeric_types: c = EXPR.evaluate_expression(c) if v.fixed: C_ += c * v.value continue id_ = id(v) if id_ in linear_coefs: linear_coefs[id_] += c else: linear_coefs[id_] = c linear_vars[id_] = v elif arg.__class__ in native_numeric_types: C_ += arg elif arg.is_variable_type(): if arg.fixed: C_ += arg.value continue id_ = id(arg) if id_ in linear_coefs: linear_coefs[id_] += 1 else: linear_coefs[id_] = 1 linear_vars[id_] = arg else: C_ += EXPR.evaluate_expression(arg) else: # compute_values == False for arg in expr.args: if arg.__class__ is EXPR.MonomialTermExpression: c, v = arg.args if v.fixed: C_ += c * v continue id_ = id(v) if id_ in linear_coefs: linear_coefs[id_] += c else: linear_coefs[id_] = c linear_vars[id_] = v elif arg.__class__ in native_numeric_types: C_ += arg elif arg.is_variable_type(): if arg.fixed: C_ += arg continue id_ = id(arg) if id_ in linear_coefs: linear_coefs[id_] += 1 else: linear_coefs[id_] = 1 linear_vars[id_] = arg else: C_ += arg vars_ = [] coef_ = [] for id_, coef in linear_coefs.items(): if coef.__class__ in native_numeric_types and not coef: continue if id_ not in idMap[None]: key = len(idMap) - 1 idMap[None][id_] = key idMap[key] = linear_vars[id_] else: key = idMap[None][id_] vars_.append(idMap[key]) coef_.append(coef) repn.linear_vars = tuple(vars_) repn.linear_coefs = tuple(coef_) repn.constant = C_ return repn # # Unknown expression object # elif not expr.is_expression_type(): # pragma: nocover raise ValueError("Unexpected expression type: " + str(expr)) # # WEH - Checking the polynomial degree didn't # turn out to be a win. But I'm leaving this # in as a comment for now, since we're not # done tuning this code. # # degree = expr.polynomial_degree() # if degree == 1: # return _generate_linear_standard_repn(expr, # idMap=idMap, # compute_values=compute_values, # verbose=verbose, # repn=repn) # else: return _generate_standard_repn( expr, idMap=idMap, compute_values=compute_values, verbose=verbose, quadratic=quadratic, repn=repn, )
##----------------------------------------------------------------------- ## ## Logic for _generate_standard_repn ## ##-----------------------------------------------------------------------
[docs] class ResultsWithQuadratics: __slot__ = ('const', 'nonl', 'linear', 'quadratic')
[docs] def __init__(self, constant=0, nonl=0, linear=None, quadratic=None): self.constant = constant self.nonl = nonl self.linear = {} # if linear is None: # self.linear = {} # else: # self.linear = linear self.quadratic = {}
# if quadratic is None: # self.quadratic = {} # else: # self.quadratic = quadratic def __str__(self): # pragma: nocover return "Const:\t%s\nLinear:\t%s\nQuadratic:\t%s\nNonlinear:\t%s" % ( str(self.constant), str(self.linear), str(self.quadratic), str(self.nonl), )
[docs] class ResultsWithoutQuadratics: __slot__ = ('const', 'nonl', 'linear')
[docs] def __init__(self, constant=0, nonl=0, linear=None): self.constant = constant self.nonl = nonl self.linear = {}
# if linear is None: # self.linear = {} # else: # self.linear = linear def __str__(self): # pragma: nocover return "Const:\t%s\nLinear:\t%s\nNonlinear:\t%s" % ( str(self.constant), str(self.linear), str(self.nonl), )
Results = ResultsWithQuadratics # @profile def _collect_sum(exp, multiplier, idMap, compute_values, verbose, quadratic): ans = Results() nonl = [] varkeys = idMap[None] for e_ in itertools.islice(exp._args_, exp.nargs()): if e_.__class__ is EXPR.MonomialTermExpression: lhs, v = e_.args if lhs.__class__ in native_numeric_types: if not lhs: continue elif compute_values: lhs = value(lhs) if not lhs: continue if v.fixed: if compute_values: ans.constant += multiplier * lhs * value(v) else: ans.constant += multiplier * lhs * v else: id_ = id(v) if id_ in varkeys: key = varkeys[id_] else: key = len(idMap) - 1 varkeys[id_] = key idMap[key] = v if key in ans.linear: ans.linear[key] += multiplier * lhs else: ans.linear[key] = multiplier * lhs elif e_.__class__ in native_numeric_types: ans.constant += multiplier * e_ elif e_.is_variable_type(): if e_.fixed: if compute_values: ans.constant += multiplier * e_.value else: ans.constant += multiplier * e_ else: id_ = id(e_) if id_ in varkeys: key = varkeys[id_] else: key = len(idMap) - 1 varkeys[id_] = key idMap[key] = e_ if key in ans.linear: ans.linear[key] += multiplier else: ans.linear[key] = multiplier elif not e_.is_potentially_variable(): if compute_values: ans.constant += multiplier * value(e_) else: ans.constant += multiplier * e_ else: res_ = _collect_standard_repn( e_, multiplier, idMap, compute_values, verbose, quadratic ) # # Add returned from recursion # ans.constant += res_.constant if not (res_.nonl.__class__ in native_numeric_types and res_.nonl == 0): nonl.append(res_.nonl) for i, v in res_.linear.items(): ans.linear[i] = ans.linear.get(i, 0) + v if quadratic: for i in res_.quadratic: ans.quadratic[i] = ans.quadratic.get(i, 0) + res_.quadratic[i] if len(nonl) > 0: if len(nonl) == 1: ans.nonl = nonl[0] else: ans.nonl = EXPR.SumExpression(nonl) zero_coef = [ k for k, coef in ans.linear.items() if coef.__class__ in native_numeric_types and not coef ] for k in zero_coef: ans.linear.pop(k) return ans # @profile def _collect_term(exp, multiplier, idMap, compute_values, verbose, quadratic): # # LHS is a numeric value # if exp._args_[0].__class__ in native_numeric_types: if exp._args_[0] == 0: # TODO: coverage? return Results() return _collect_standard_repn( exp._args_[1], multiplier * exp._args_[0], idMap, compute_values, verbose, quadratic, ) # # LHS is a non-variable expression # else: if compute_values: val = value(exp._args_[0]) if val == 0: return Results() return _collect_standard_repn( exp._args_[1], multiplier * val, idMap, compute_values, verbose, quadratic, ) else: return _collect_standard_repn( exp._args_[1], multiplier * exp._args_[0], idMap, compute_values, verbose, quadratic, ) def _collect_prod(exp, multiplier, idMap, compute_values, verbose, quadratic): # # LHS is a numeric value # if exp._args_[0].__class__ in native_numeric_types: if exp._args_[0] == 0: # TODO: coverage? return Results() return _collect_standard_repn( exp._args_[1], multiplier * exp._args_[0], idMap, compute_values, verbose, quadratic, ) # # RHS is a numeric value # if exp._args_[1].__class__ in native_numeric_types: if exp._args_[1] == 0: # TODO: coverage? return Results() return _collect_standard_repn( exp._args_[0], multiplier * exp._args_[1], idMap, compute_values, verbose, quadratic, ) # # LHS is a non-variable expression # elif not exp._args_[0].is_potentially_variable(): if compute_values: val = value(exp._args_[0]) if val == 0: return Results() return _collect_standard_repn( exp._args_[1], multiplier * val, idMap, compute_values, verbose, quadratic, ) else: return _collect_standard_repn( exp._args_[1], multiplier * exp._args_[0], idMap, compute_values, verbose, quadratic, ) # # RHS is a non-variable expression # elif not exp._args_[1].is_potentially_variable(): if compute_values: val = value(exp._args_[1]) if val == 0: return Results() return _collect_standard_repn( exp._args_[0], multiplier * val, idMap, compute_values, verbose, quadratic, ) else: return _collect_standard_repn( exp._args_[0], multiplier * exp._args_[1], idMap, compute_values, verbose, quadratic, ) # # Both the LHS and RHS are potentially variable ... # # Collect LHS # lhs = _collect_standard_repn( exp._args_[0], 1, idMap, compute_values, verbose, quadratic ) lhs_nonl_None = lhs.nonl.__class__ in native_numeric_types and not lhs.nonl # # LHS is potentially variable, but it turns out to be a constant # because the variables were fixed. # if ( lhs_nonl_None and len(lhs.linear) == 0 and (not quadratic or len(lhs.quadratic) == 0) ): if lhs.constant.__class__ in native_numeric_types and lhs.constant == 0: return Results() if compute_values: val = value(lhs.constant) if val == 0: # TODO: coverage? return Results() return _collect_standard_repn( exp._args_[1], multiplier * val, idMap, compute_values, verbose, quadratic, ) else: return _collect_standard_repn( exp._args_[1], multiplier * lhs.constant, idMap, compute_values, verbose, quadratic, ) # # Collect RHS # rhs = _collect_standard_repn( exp._args_[1], 1, idMap, compute_values, verbose, quadratic ) rhs_nonl_None = rhs.nonl.__class__ in native_numeric_types and not rhs.nonl # # If RHS is zero, then return an empty results # if ( rhs_nonl_None and len(rhs.linear) == 0 and (not quadratic or len(rhs.quadratic) == 0) and rhs.constant.__class__ in native_numeric_types and rhs.constant == 0 ): return Results() # # If either the LHS or RHS are nonlinear, then simply return the nonlinear expression # if not lhs_nonl_None or not rhs_nonl_None: return Results(nonl=multiplier * exp) # If the resulting expression has a polynomial degree greater than 2 # (1 if quadratic is False), then simply return this as a general # nonlinear expression # if max(1 if lhs.linear else 0, 2 if quadratic and lhs.quadratic else 0) + max( 1 if rhs.linear else 0, 2 if quadratic and rhs.quadratic else 0 ) > (2 if quadratic else 1): return Results(nonl=multiplier * exp) ans = Results() ans.constant = multiplier * lhs.constant * rhs.constant if not (lhs.constant.__class__ in native_numeric_types and lhs.constant == 0): for key, coef in rhs.linear.items(): ans.linear[key] = multiplier * coef * lhs.constant if not (rhs.constant.__class__ in native_numeric_types and rhs.constant == 0): for key, coef in lhs.linear.items(): if key in ans.linear: ans.linear[key] += multiplier * coef * rhs.constant else: ans.linear[key] = multiplier * coef * rhs.constant if quadratic: if not (lhs.constant.__class__ in native_numeric_types and lhs.constant == 0): for key, coef in rhs.quadratic.items(): ans.quadratic[key] = multiplier * coef * lhs.constant if not (rhs.constant.__class__ in native_numeric_types and rhs.constant == 0): for key, coef in lhs.quadratic.items(): if key in ans.quadratic: ans.quadratic[key] += multiplier * coef * rhs.constant else: ans.quadratic[key] = multiplier * coef * rhs.constant for lkey, lcoef in lhs.linear.items(): for rkey, rcoef in rhs.linear.items(): ndx = (lkey, rkey) if lkey <= rkey else (rkey, lkey) if ndx in ans.quadratic: ans.quadratic[ndx] += multiplier * lcoef * rcoef else: ans.quadratic[ndx] = multiplier * lcoef * rcoef # TODO - Use quicksum here? el_linear = multiplier * sum( coef * idMap[key] for key, coef in lhs.linear.items() if coef.__class__ not in native_numeric_types or coef ) er_linear = multiplier * sum( coef * idMap[key] for key, coef in rhs.linear.items() if coef.__class__ not in native_numeric_types or coef ) el_quadratic = multiplier * sum( coef * idMap[key[0]] * idMap[key[1]] for key, coef in lhs.quadratic.items() if coef.__class__ not in native_numeric_types or coef ) er_quadratic = multiplier * sum( coef * idMap[key[0]] * idMap[key[1]] for key, coef in rhs.quadratic.items() if coef.__class__ not in native_numeric_types or coef ) if (el_linear.__class__ not in native_numeric_types or el_linear) and ( er_quadratic.__class__ not in native_numeric_types or er_quadratic ): ans.nonl += el_linear * er_quadratic if (er_linear.__class__ not in native_numeric_types or er_linear) and ( el_quadratic.__class__ not in native_numeric_types or el_quadratic ): ans.nonl += er_linear * el_quadratic return ans # @profile def _collect_var(exp, multiplier, idMap, compute_values, verbose, quadratic): ans = Results() if exp.fixed: if compute_values: ans.constant += multiplier * value(exp) else: ans.constant += multiplier * exp else: id_ = id(exp) if id_ in idMap[None]: key = idMap[None][id_] else: key = len(idMap) - 1 idMap[None][id_] = key idMap[key] = exp ans.linear[key] = multiplier return ans def _collect_pow(exp, multiplier, idMap, compute_values, verbose, quadratic): # # Exponent is a numeric value # if exp._args_[1].__class__ in native_numeric_types: exponent = exp._args_[1] # # Exponent is not potentially variable # # Compute its value if compute_values==True # elif not exp._args_[1].is_potentially_variable(): if compute_values: exponent = value(exp._args_[1]) else: exponent = exp._args_[1] # # Otherwise collect a standard repn # else: res = _collect_standard_repn( exp._args_[1], 1, idMap, compute_values, verbose, quadratic ) # # If the expression is variable, then return a nonlinear expression # if ( not (res.nonl.__class__ in native_numeric_types and res.nonl == 0) or len(res.linear) > 0 or (quadratic and len(res.quadratic) > 0) ): return Results(nonl=multiplier * exp) exponent = res.constant if exponent.__class__ in native_numeric_types: # # #**0 = 1 # if exponent == 0: return Results(constant=multiplier) # # #**1 = # # # Return the standard repn for arg(0) # elif exponent == 1: return _collect_standard_repn( exp._args_[0], multiplier, idMap, compute_values, verbose, quadratic ) # # Ignore #**2 unless quadratic==True # elif exponent == 2 and quadratic: res = _collect_standard_repn( exp._args_[0], 1, idMap, compute_values, verbose, quadratic ) # # If arg(0) is nonlinear, then this is a nonlinear repn # if ( not (res.nonl.__class__ in native_numeric_types and res.nonl == 0) or len(res.quadratic) > 0 ): return Results(nonl=multiplier * exp) # # If computing values and no linear terms, then the return a constant repn # elif compute_values and len(res.linear) == 0: return Results(constant=multiplier * res.constant**exponent) # # If the base is linear, then we compute the quadratic expression for it. # else: ans = Results() has_constant = ( res.constant.__class__ not in native_numeric_types or res.constant != 0 ) if has_constant: ans.constant = multiplier * res.constant * res.constant # this is reversed since we want to pop off the end for efficiency # and the quadratic terms have a convention that the indexing tuple # of key1, key2 is such that key1 <= key2 keys = sorted(res.linear.keys(), reverse=True) while len(keys) > 0: key1 = keys.pop() coef1 = res.linear[key1] if has_constant: ans.linear[key1] = 2 * multiplier * coef1 * res.constant ans.quadratic[key1, key1] = multiplier * coef1 * coef1 for key2 in keys: coef2 = res.linear[key2] ans.quadratic[key1, key2] = 2 * multiplier * coef1 * coef2 return ans # # If args(0) is a numeric value or it is fixed, then we have a constant value # if exp._args_[0].__class__ in native_numeric_types or exp._args_[0].is_fixed(): if compute_values: return Results(constant=multiplier * value(exp._args_[0]) ** exponent) else: return Results(constant=multiplier * exp) # # Return a nonlinear expression here # return Results(nonl=multiplier * exp) def _collect_division(exp, multiplier, idMap, compute_values, verbose, quadratic): if ( exp._args_[1].__class__ in native_numeric_types or not exp._args_[1].is_potentially_variable() ): # TODO: coverage? # Denominator is trivially constant if compute_values: denom = 1.0 * value(exp._args_[1]) else: denom = 1.0 * exp._args_[1] else: res = _collect_standard_repn( exp._args_[1], 1, idMap, compute_values, verbose, quadratic ) if ( not (res.nonl.__class__ in native_numeric_types and res.nonl == 0) or len(res.linear) > 0 or (quadratic and len(res.quadratic) > 0) ): # Denominator is variable, give up: this is nonlinear return Results(nonl=multiplier * exp) else: # Denominaor ended up evaluating to a constant denom = 1.0 * res.constant if denom.__class__ in native_numeric_types and denom == 0: raise ZeroDivisionError if ( exp._args_[0].__class__ in native_numeric_types or not exp._args_[0].is_potentially_variable() ): num = exp._args_[0] if compute_values: num = value(num) return Results(constant=multiplier * num / denom) return _collect_standard_repn( exp._args_[0], multiplier / denom, idMap, compute_values, verbose, quadratic ) def _collect_branching_expr(exp, multiplier, idMap, compute_values, verbose, quadratic): _if, _then, _else = exp.args if _if.__class__ in native_types: if_val = _if elif not _if.is_potentially_variable(): if compute_values: if_val = value(_if) else: return Results(nonl=multiplier * exp) else: res = _collect_standard_repn(_if, 1, idMap, compute_values, verbose, quadratic) if ( not (res.nonl.__class__ in native_numeric_types and res.nonl == 0) or len(res.linear) > 0 or (quadratic and len(res.quadratic) > 0) ): return Results(nonl=multiplier * exp) elif res.constant.__class__ in native_numeric_types: if_val = res.constant else: return Results(constant=multiplier * exp) if if_val: if _then.__class__ in native_numeric_types: return Results(constant=multiplier * _then) return _collect_standard_repn( _then, multiplier, idMap, compute_values, verbose, quadratic ) else: if _else.__class__ in native_numeric_types: return Results(constant=multiplier * _else) return _collect_standard_repn( _else, multiplier, idMap, compute_values, verbose, quadratic ) def _collect_nonl(exp, multiplier, idMap, compute_values, verbose, quadratic): res = _collect_standard_repn( exp._args_[0], 1, idMap, compute_values, verbose, quadratic ) if ( not (res.nonl.__class__ in native_numeric_types and res.nonl == 0) or len(res.linear) > 0 or (quadratic and len(res.quadratic) > 0) ): return Results(nonl=multiplier * exp) if compute_values: return Results(constant=multiplier * exp._apply_operation([res.constant])) else: return Results(constant=multiplier * exp) def _collect_negation(exp, multiplier, idMap, compute_values, verbose, quadratic): return _collect_standard_repn( exp._args_[0], -1 * multiplier, idMap, compute_values, verbose, quadratic ) # # TODO - Verify if code is used # def _collect_const(exp, multiplier, idMap, compute_values, verbose, quadratic): if compute_values: return Results(constant=multiplier * value(exp)) else: return Results(constant=multiplier * exp) def _collect_identity(exp, multiplier, idMap, compute_values, verbose, quadratic): if exp._args_[0].__class__ in native_numeric_types: return Results(constant=multiplier * exp._args_[0]) if not exp._args_[0].is_potentially_variable(): if compute_values: return Results(constant=multiplier * value(exp._args_[0])) else: return Results(constant=multiplier * exp._args_[0]) return _collect_standard_repn( exp.expr, multiplier, idMap, compute_values, verbose, quadratic ) def _collect_linear(exp, multiplier, idMap, compute_values, verbose, quadratic): ans = Results() if compute_values: ans.constant = multiplier * value(exp.constant) else: ans.constant = multiplier * exp.constant linear = {} linear_vars = {} for c, v in zip(exp.linear_coefs, exp.linear_vars): if v.fixed: if compute_values: ans.constant += multiplier * value(c) * value(v) else: ans.constant += multiplier * c * v else: key = id(v) if compute_values: if key in linear: linear[key] += multiplier * value(c) else: linear[key] = multiplier * value(c) linear_vars[key] = v else: if key in linear: linear[key] += multiplier * c else: linear[key] = multiplier * c linear_vars[key] = v for id_, coef in linear.items(): if coef.__class__ in native_numeric_types and not coef: continue if id_ in idMap[None]: key = idMap[None][id_] else: key = len(idMap) - 1 idMap[None][id_] = key idMap[key] = linear_vars[id_] ans.linear[key] = coef return ans def _collect_comparison(exp, multiplier, idMap, compute_values, verbose, quadratic): if multiplier != 1: # this *will* generate an exception with the new relational expressions exp = multiplier * exp return Results(nonl=exp) def _collect_external_fn(exp, multiplier, idMap, compute_values, verbose, quadratic): if compute_values and exp.is_fixed(): return Results(constant=multiplier * value(exp)) return Results(nonl=multiplier * exp) _repn_collectors = { EXPR.SumExpression: _collect_sum, EXPR.ProductExpression: _collect_prod, EXPR.MonomialTermExpression: _collect_term, EXPR.PowExpression: _collect_pow, EXPR.DivisionExpression: _collect_division, EXPR.Expr_ifExpression: _collect_branching_expr, EXPR.UnaryFunctionExpression: _collect_nonl, EXPR.AbsExpression: _collect_nonl, EXPR.NegationExpression: _collect_negation, EXPR.LinearExpression: _collect_linear, EXPR.InequalityExpression: _collect_comparison, EXPR.RangedExpression: _collect_comparison, EXPR.EqualityExpression: _collect_comparison, EXPR.ExternalFunctionExpression: _collect_external_fn, # ConnectorData : _collect_linear_connector, # ScalarConnector : _collect_linear_connector, ParamData: _collect_const, ScalarParam: _collect_const, # param.Param : _collect_linear_const, # parameter : _collect_linear_const, NumericConstant: _collect_const, VarData: _collect_var, ScalarVar: _collect_var, Var: _collect_var, variable: _collect_var, IVariable: _collect_var, ExpressionData: _collect_identity, ScalarExpression: _collect_identity, expression: _collect_identity, noclone: _collect_identity, NamedExpressionData: _collect_identity, Expression: _collect_identity, ObjectiveData: _collect_identity, ScalarObjective: _collect_identity, objective: _collect_identity, } def _collect_standard_repn(exp, multiplier, idMap, compute_values, verbose, quadratic): fn = _repn_collectors.get(exp.__class__, None) if fn is not None: return fn(exp, multiplier, idMap, compute_values, verbose, quadratic) # # Catch any known numeric constants # if exp.__class__ in native_numeric_types or not exp.is_potentially_variable(): return _collect_const( exp, multiplier, idMap, compute_values, verbose, quadratic ) # # These are types that might be extended using duck typing. # try: if exp.is_variable_type(): fn = _collect_var if exp.is_named_expression_type(): fn = _collect_identity except AttributeError: # TODO: coverage? pass if fn is not None: _repn_collectors[exp.__class__] = fn return fn(exp, multiplier, idMap, compute_values, verbose, quadratic) raise ValueError( "Unexpected expression (type %s)" % type(exp).__name__ ) # TODO: coverage? def _generate_standard_repn( expr, idMap=None, compute_values=True, verbose=False, quadratic=True, repn=None ): if expr.__class__ is EXPR.SumExpression: # # This is the common case, so start collecting the sum # ans = _collect_sum(expr, 1, idMap, compute_values, verbose, quadratic) else: # # Call generic recursive logic # ans = _collect_standard_repn(expr, 1, idMap, compute_values, verbose, quadratic) # # Create the final object here from 'ans' # repn.constant = ans.constant # # Create a list (tuple) of the variables and coefficients # v = [] c = [] for key, val in ans.linear.items(): if val.__class__ in native_numeric_types: if not val: continue elif val.is_constant(): # TODO: coverage? if value(val) == 0: continue v.append(idMap[key]) c.append(val) repn.linear_vars = tuple(v) repn.linear_coefs = tuple(c) if quadratic: repn.quadratic_vars = [] repn.quadratic_coefs = [] for key in ans.quadratic: val = ans.quadratic[key] if val.__class__ in native_numeric_types: if val == 0: # TODO: coverage? continue elif val.is_constant(): # TODO: coverage? if value(val) == 0: continue repn.quadratic_vars.append((idMap[key[0]], idMap[key[1]])) repn.quadratic_coefs.append(val) repn.quadratic_vars = tuple(repn.quadratic_vars) repn.quadratic_coefs = tuple(repn.quadratic_coefs) v = [] c = [] for key in ans.quadratic: v.append((idMap[key[0]], idMap[key[1]])) c.append(ans.quadratic[key]) repn.quadratic_vars = tuple(v) repn.quadratic_coefs = tuple(c) if ans.nonl is not None and not isclose_const(ans.nonl, 0): repn.nonlinear_expr = ans.nonl repn.nonlinear_vars = [] for v_ in EXPR.identify_variables(repn.nonlinear_expr, include_fixed=False): repn.nonlinear_vars.append(v_) # # Update idMap in case we skipped nonlinear sub-expressions # # Q: Should we skip nonlinear sub-expressions? # id_ = id(v_) if id_ in idMap[None]: key = idMap[None][id_] else: key = len(idMap) - 1 idMap[None][id_] = key idMap[key] = v_ repn.nonlinear_vars = tuple(repn.nonlinear_vars) return repn """ WEH - This code assumes the expression is linear and fills in a dictionary. This avoids creating temporary Results objects, but in practice that's not a big win. Hence, this is deprecated. ##----------------------------------------------------------------------- ## ## Logic for _generate_linear_standard_repn ## ##----------------------------------------------------------------------- def _linear_collect_sum(exp, multiplier, idMap, compute_values, verbose, coef): varkeys = idMap[None] for e_ in itertools.islice(exp._args_, exp.nargs()): if e_.__class__ in native_numeric_types: coef[None] += multiplier*e_ elif e_.is_variable_type(): if e_.fixed: if compute_values: coef[None] += multiplier*e_.value else: coef[None] += multiplier*e_ else: id_ = id(e_) if id_ in varkeys: key = varkeys[id_] else: key = len(idMap) - 1 varkeys[id_] = key idMap[key] = e_ if key in coef: coef[key] += multiplier else: coef[key] = multiplier elif not e_.is_potentially_variable(): if compute_values: coef[None] += multiplier * value(e_) else: coef[None] += multiplier * e_ elif e_.__class__ is EXPR.NegationExpression: arg = e_._args_[0] if arg.is_variable_type(): if arg.fixed: if compute_values: coef[None] -= multiplier*arg.value else: coef[None] -= multiplier*arg else: id_ = id(arg) if id_ in varkeys: key = varkeys[id_] else: key = len(idMap) - 1 varkeys[id_] = key idMap[key] = arg if key in coef: coef[key] -= multiplier else: coef[key] = -1 * multiplier else: _collect_linear_standard_repn(arg, -1*multiplier, idMap, compute_values, verbose, coef) elif e_.__class__ is EXPR.MonomialTermExpression: if compute_values: lhs = value(e_._args_[0]) else: lhs = e_._args_[0] if e_._args_[1].fixed: if compute_values: coef[None] += multiplier*lhs*value(e_._args_[1]) else: coef[None] += multiplier*lhs*e_._args_[1] else: id_ = id(e_._args_[1]) if id_ in varkeys: key = varkeys[id_] else: key = len(idMap) - 1 varkeys[id_] = key idMap[key] = e_._args_[1] if key in coef: coef[key] += multiplier*lhs else: coef[key] = multiplier*lhs else: _collect_linear_standard_repn(e_, multiplier, idMap, compute_values, verbose, coef) def _linear_collect_linear(exp, multiplier, idMap, compute_values, verbose, coef): varkeys = idMap[None] if compute_values: coef[None] += multiplier*value(exp.constant) else: coef[None] += multiplier*exp.constant for c,v in zip(exp.linear_coefs, exp.linear_vars): if v.fixed: if compute_values: coef[None] += multiplier*v.value else: coef[None] += multiplier*v else: id_ = id(v) if id_ in varkeys: key = varkeys[id_] else: key = len(idMap) - 1 varkeys[id_] = key idMap[key] = v if compute_values: if key in coef: coef[key] += multiplier*value(c) else: coef[key] = multiplier*value(c) else: if key in coef: coef[key] += multiplier*c else: coef[key] = multiplier*c def _linear_collect_term(exp, multiplier, idMap, compute_values, verbose, coef): # # LHS is a numeric value # if exp._args_[0].__class__ in native_numeric_types: if isclose_default(exp._args_[0],0): return _collect_linear_standard_repn(exp._args_[1], multiplier * exp._args_[0], idMap, compute_values, verbose, coef) # # LHS is a non-variable expression # else: if compute_values: val = value(exp._args_[0]) if isclose_default(val,0): return _collect_linear_standard_repn(exp._args_[1], multiplier * val, idMap, compute_values, verbose, coef) else: _collect_linear_standard_repn(exp._args_[1], multiplier*exp._args_[0], idMap, compute_values, verbose, coef) def _linear_collect_prod(exp, multiplier, idMap, compute_values, verbose, coef): # # LHS is a numeric value # if exp._args_[0].__class__ in native_numeric_types: if isclose_default(exp._args_[0],0): return _collect_linear_standard_repn(exp._args_[1], multiplier * exp._args_[0], idMap, compute_values, verbose, coef) # # LHS is a non-variable expression # elif not exp._args_[0].is_potentially_variable(): if compute_values: val = value(exp._args_[0]) if isclose_default(val,0): return _collect_linear_standard_repn(exp._args_[1], multiplier * val, idMap, compute_values, verbose, coef) else: _collect_linear_standard_repn(exp._args_[1], multiplier*exp._args_[0], idMap, compute_values, verbose, coef) # # The LHS should never be variable # elif exp._args_[0].is_fixed(): if compute_values: val = value(exp._args_[0]) if isclose_default(val,0): return _collect_linear_standard_repn(exp._args_[1], multiplier * val, idMap, compute_values, verbose, coef) else: _collect_linear_standard_repn(exp._args_[1], multiplier*exp._args_[0], idMap, compute_values, verbose, coef) else: if compute_values: val = value(exp._args_[1]) if isclose_default(val,0): return _collect_linear_standard_repn(exp._args_[0], multiplier * val, idMap, compute_values, verbose, coef) else: _collect_linear_standard_repn(exp._args_[0], multiplier*exp._args_[1], idMap, compute_values, verbose, coef) def _linear_collect_var(exp, multiplier, idMap, compute_values, verbose, coef): if exp.fixed: if compute_values: coef[None] += multiplier*value(exp) else: coef[None] += multiplier*exp else: id_ = id(exp) if id_ in idMap[None]: key = idMap[None][id_] else: key = len(idMap) - 1 idMap[None][id_] = key idMap[key] = exp if key in coef: coef[key] += multiplier else: coef[key] = multiplier def _linear_collect_negation(exp, multiplier, idMap, compute_values, verbose, coef): _collect_linear_standard_repn(exp._args_[0], -1*multiplier, idMap, compute_values, verbose, coef) def _linear_collect_identity(exp, multiplier, idMap, compute_values, verbose, coef): arg = exp._args_[0] if arg.__class__ in native_numeric_types: coef[None] += arg elif not arg.is_potentially_variable(): if compute_values: coef[None] += value(arg) else: coef[None] += arg else: _collect_linear_standard_repn(exp.expr, multiplier, idMap, compute_values, verbose, coef) def _linear_collect_branching_expr(exp, multiplier, idMap, compute_values, verbose, coef): if exp._if.__class__ in native_numeric_types: if_val = exp._if else: # If this value is not constant, then the expression is nonlinear. if_val = value(exp._if) if if_val: _collect_linear_standard_repn(exp._then, multiplier, idMap, compute_values, verbose, coef) else: _collect_linear_standard_repn(exp._else, multiplier, idMap, compute_values, verbose, coef) def _linear_collect_pow(exp, multiplier, idMap, compute_values, verbose, coef): if exp._args_[1].__class__ in native_numeric_types: exponent = exp._args_[1] else: # If this value is not constant, then the expression is nonlinear. exponent = value(exp._args_[1]) if exponent == 0: coef[None] += multiplier else: #exponent == 1 _collect_linear_standard_repn(exp._args_[0], multiplier, idMap, compute_values, verbose, coef) _linear_repn_collectors = { EXPR.SumExpression : _linear_collect_sum, EXPR.ProductExpression : _linear_collect_prod, EXPR.MonomialTermExpression : _linear_collect_term, EXPR.PowExpression : _linear_collect_pow, #EXPR.DivisionExpression : _linear_collect_division, EXPR.Expr_ifExpression : _linear_collect_branching_expr, #EXPR.UnaryFunctionExpression : _linear_collect_nonl, #EXPR.AbsExpression : _linear_collect_nonl, EXPR.NegationExpression : _linear_collect_negation, EXPR.LinearExpression : _linear_collect_linear, #EXPR.InequalityExpression : _linear_collect_comparison, #EXPR.RangedExpression : _linear_collect_comparison, #EXPR.EqualityExpression : _linear_collect_comparison, #EXPR.ExternalFunctionExpression : _linear_collect_external_fn, ##EXPR.LinearSumExpression : _collect_linear_sum, ##ConnectorData : _collect_linear_connector, ##ScalarConnector : _collect_linear_connector, ##param.ParamData : _collect_linear_const, ##param.ScalarParam : _collect_linear_const, ##param.Param : _collect_linear_const, ##parameter : _collect_linear_const, VarData : _linear_collect_var, ScalarVar : _linear_collect_var, Var : _linear_collect_var, variable : _linear_collect_var, IVariable : _linear_collect_var, ExpressionData : _linear_collect_identity, ScalarExpression : _linear_collect_identity, expression : _linear_collect_identity, noclone : _linear_collect_identity, NamedExpressionData : _linear_collect_identity, Expression : _linear_collect_identity, ObjectiveData : _linear_collect_identity, ScalarObjective : _linear_collect_identity, objective : _linear_collect_identity, } def _collect_linear_standard_repn(exp, multiplier, idMap, compute_values, verbose, coefs): fn = _linear_repn_collectors.get(exp.__class__, None) if fn is not None: return fn(exp, multiplier, idMap, compute_values, verbose, coefs) # # These are types that might be extended using duck typing. # try: if exp.is_variable_type(): fn = _linear_collect_var if exp.is_named_expression_type(): fn = _linear_collect_identity except AttributeError: pass if fn is not None: return fn(exp, multiplier, idMap, compute_values, verbose, coefs) raise ValueError( "Unexpected expression (type %s)" % type(exp).__name__) def _generate_linear_standard_repn(expr, idMap=None, compute_values=True, verbose=False, repn=None): coef = {None:0} # # Call recursive logic # ans = _collect_linear_standard_repn(expr, 1, idMap, compute_values, verbose, coef) # # Create the final object here from 'ans' # repn.constant = coef[None] del coef[None] # # Create a list (tuple) of the variables and coefficients # # If we compute the values of constants, then we can skip terms with zero # coefficients # if compute_values: keys = list(key for key in coef if not isclose(coef[key],0)) else: keys = list(coef.keys()) repn.linear_vars = tuple(idMap[key] for key in keys) repn.linear_coefs = tuple(coef[key] for key in keys) return repn """ ##----------------------------------------------------------------------- ## ## Functions to preprocess blocks ## ##-----------------------------------------------------------------------
[docs] def preprocess_block_objectives(block, idMap=None): # Get/Create the ComponentMap for the repn if not hasattr(block, '_repn'): block._repn = ComponentMap() block_repn = block._repn for objective_data in block.component_data_objects( Objective, active=True, descend_into=False ): if objective_data.expr is None: raise ValueError( "No expression has been defined for objective %s" % (objective_data.name) ) try: repn = generate_standard_repn(objective_data.expr, idMap=idMap) except Exception: err = sys.exc_info()[1] logging.getLogger('pyomo.core').error( "exception generating a standard representation for objective %s: %s" % (objective_data.name, str(err)) ) raise block_repn[objective_data] = repn
[docs] def preprocess_block_constraints(block, idMap=None): # Get/Create the ComponentMap for the repn if not hasattr(block, '_repn'): block._repn = ComponentMap() block_repn = block._repn for constraint in block.component_objects( Constraint, active=True, descend_into=False ): preprocess_constraint(block, constraint, idMap=idMap, block_repn=block_repn)
[docs] def preprocess_constraint(block, constraint, idMap=None, block_repn=None): from pyomo.repn.beta.matrix import MatrixConstraint if isinstance(constraint, MatrixConstraint): return # Get/Create the ComponentMap for the repn if not hasattr(block, '_repn'): block._repn = ComponentMap() block_repn = block._repn for index, constraint_data in constraint.items(): if not constraint_data.active: continue if constraint_data.body is None: raise ValueError( "No expression has been defined for the body " "of constraint %s" % (constraint_data.name) ) try: repn = generate_standard_repn(constraint_data.body, idMap=idMap) except Exception: err = sys.exc_info()[1] logging.getLogger('pyomo.core').error( "exception generating a standard representation for " "constraint %s: %s" % (constraint_data.name, str(err)) ) raise block_repn[constraint_data] = repn
[docs] def preprocess_constraint_data(block, constraint_data, idMap=None, block_repn=None): # Get/Create the ComponentMap for the repn if not hasattr(block, '_repn'): block._repn = ComponentMap() block_repn = block._repn if constraint_data.body is None: raise ValueError( "No expression has been defined for the body " "of constraint %s" % (constraint_data.name) ) try: repn = generate_standard_repn(constraint_data.body, idMap=idMap) except Exception: err = sys.exc_info()[1] logging.getLogger('pyomo.core').error( "exception generating a standard representation for " "constraint %s: %s" % (constraint_data.name, str(err)) ) raise block_repn[constraint_data] = repn