# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
import os
from .dependencies import ctypes, multiprocessing
def _as_bytes(val):
"""Helper function to coerce a string to a bytes() object"""
if isinstance(val, bytes):
return val
elif val is not None:
return val.encode('utf-8')
return None
def _as_unicode(val):
"""Helper function to coerce a string to a unicode() object"""
if isinstance(val, str):
return val
elif val is not None:
return val.decode()
return None
def _attempt_ctypes_cdll(name):
"""Load a CDLL library, returning bool indicating success"""
try:
dll = ctypes.CDLL(name)
return True
except OSError:
return False
def _load_dll(name, timeout=10):
"""Load a DLL with a timeout
On some platforms and some DLLs (notably Windows GitHub Actions with
Python 3.5, 3.6, and 3.7 and the msvcr90.dll) we have observed
behavior where the ctypes.CDLL() call hangs indefinitely. This uses
multiprocessing to attempt the import in a subprocess (with a
timeout) and then only calls the import in the main process if the
subprocess succeeded.
Performance note: CtypesEnviron only ever attempts to load a DLL
once (the DLL reference is then held in a class attribute), and this
interface only spawns the subprocess if ctypes.util.find_library
actually locates the target library. This will have a measurable
impact on Windows (where the DLLs exist), but not on other platforms.
The default timeout of 10 is arbitrary. For simple situations, 1
seems adequate. However, more complex examples have been observed
that needed timeout==5. Using a default of 10 is simply doubling
that observed case.
"""
if not ctypes.util.find_library(name):
return False, None
if _load_dll.pool is None:
# Resolving the deferred multiprocessing import could change the
# local "multiprocessing" variable (replacing it with the
# imported module). This can result in an UnboundLocalError.
# By explicitly declaring it "global" we can avoid the error.
global multiprocessing
try:
_load_dll.pool = multiprocessing.Pool(1)
except AssertionError:
# multiprocessing will fail with an assertion error if this
# Python process is a daemonic process (e.g., it was
# launched within a dask server). Fall back on a serial
# process (and live with the risk that the import hangs).
import multiprocessing.dummy
_load_dll.pool = multiprocessing.dummy.Pool(1)
job = _load_dll.pool.apply_async(_attempt_ctypes_cdll, (name,))
try:
result = job.get(timeout)
except multiprocessing.TimeoutError:
result = False
# If there was a timeout, then the subprocess is likely hung and
# cannot be reused. Terminate it here (subsequent calls will
# create a new Pool)
_load_dll.pool.terminate()
_load_dll.pool = None
if result:
return result, ctypes.CDLL(name)
else:
return result, None
# For efficiency, cache the multiprocessing Pool between calls to _load_dll
_load_dll.pool = None
class _RestorableEnvironInterface:
"""Interface to track environment changes and restore state"""
def __init__(self, dll):
assert dll.available()
self.dll = dll
self._original_state = {}
# Transfer over the current os.environ
for key, val in list(os.environ.items()):
if val != self[key]:
self[key] = val
# If we can get a dictionary of the current environment (not
# always possible), then remove any keys that are not in
# os.environ
origEnv = self.dll.get_env_dict()
if origEnv is not None:
for key in origEnv:
if key not in os.environ:
del self[key]
def restore(self):
for key, val in self._original_state.items():
if not val:
if self[key] is not None:
del self[key]
else:
self[key] = val
self._original_state = {}
def __getitem__(self, key):
if isinstance(key, str):
return self.dll.wgetenv(key)
else:
return self.dll.getenv(key)
def __setitem__(self, key, val):
if key not in self._original_state:
self._original_state[key] = self[key]
if isinstance(key, str):
if isinstance(val, str):
self.dll.wputenv_s(key, val)
else:
self.dll.wputenv_s(key, _as_unicode(val))
elif isinstance(val, str):
self.dll.wputenv_s(_as_unicode(key), val)
else:
self.dll.putenv_s(key, val)
def __delitem__(self, key):
if key not in self._original_state:
self._original_state[key] = self[key]
if isinstance(key, str):
self.dll.wputenv_s(key, u'')
else:
self.dll.putenv_s(key, b'')
class _OSEnviron:
"""Helper class to proxy a "DLL-like" interface to os.environ"""
_libname = 'os.environ'
def available(self):
return True
def get_env_dict(self):
return dict(os.environ)
def getenv(self, key):
# environb is not always present and depends on how the
# interpreter was compiled. Fall back on casting environ if
# it is not available.
try:
return os.environb.get(key, None)
except AttributeError:
return _as_bytes(os.environ.get(_as_unicode(key), None))
def wgetenv(self, key):
# PY2 doesn't distinguish, and PY3's environ is nominally
# unicode. We will coerce the result to unicode to guarantee
# the result type.
return _as_unicode(os.environ.get(key, None))
def putenv_s(self, key, val):
# Win32 convention deletes environ entries when the string is empty
if not val:
if key in os.environ:
del os.environ[key]
return
os.environb[key] = val
def wputenv_s(self, key, val):
# Win32 convention deletes environ entries when the string is empty
if not val:
if key in os.environ:
del os.environ[key]
return
os.environ[key] = val
class _MsvcrtDLL:
"""Helper class to manage the interface with the MSVCRT runtime"""
def __init__(self, name):
self._libname = name
if name is None:
self._loaded = False
else:
self._loaded = None
self.dll = None
def available(self):
if self._loaded is not None:
return self._loaded
self._loaded, self.dll = _load_dll(self._libname)
if not self._loaded:
return self._loaded
self.putenv_s = self.dll._putenv_s
self.putenv_s.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
self.putenv_s.restype = ctypes.c_int
self.wputenv_s = self.dll._wputenv_s
self.wputenv_s.argtypes = [ctypes.c_wchar_p, ctypes.c_wchar_p]
self.wputenv_s.restype = ctypes.c_int
self.getenv = self.dll.getenv
self.getenv.argtypes = [ctypes.c_char_p]
self.getenv.restype = ctypes.c_char_p
self.wgetenv = self.dll._wgetenv
self.wgetenv.argtypes = [ctypes.c_wchar_p]
self.wgetenv.restype = ctypes.c_wchar_p
return self._loaded
def get_env_dict(self):
if not self.available():
return None
try:
envp = ctypes.POINTER(ctypes.c_wchar_p).in_dll(self.dll, '_wenviron')
if not envp.contents:
envp = None
except ValueError:
envp = None
if envp is None:
try:
envp = ctypes.POINTER(ctypes.c_char_p).in_dll(self.dll, '_environ')
if not envp.contents:
return None
except ValueError:
return None
ans = {}
size = 0
for line in envp:
if not line:
break
size += len(line)
if len(line) == 0:
raise ValueError(
"Error processing MSVCRT _environ: 0-length string encountered"
)
if size > 32767:
raise ValueError(
"Error processing MSVCRT _environ: "
"exceeded max environment block size (32767)"
)
key, val = line.split('=', 1)
ans[key] = val
return ans
class _Win32DLL:
"""Helper class to manage the interface with the Win32 runtime"""
def __init__(self, name):
self._libname = name
if name is None:
self._loaded = False
else:
self._loaded = None
self.dll = None
def available(self):
if self._loaded is not None:
return self._loaded
self._loaded, self.dll = _load_dll(self._libname)
if not self._loaded:
return self._loaded
self.putenv_s = self.dll.SetEnvironmentVariableA
self.putenv_s.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
self.putenv_s.restype = ctypes.c_bool
self.wputenv_s = self.dll.SetEnvironmentVariableW
self.wputenv_s.argtypes = [ctypes.c_wchar_p, ctypes.c_wchar_p]
self.wputenv_s.restype = ctypes.c_bool
# Note DWORD == c_ulong
self._getenv_dll = self.dll.GetEnvironmentVariableA
self._getenv_dll.argtypes = [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_ulong]
self._getenv_dll.restype = ctypes.c_ulong
self._wgetenv_dll = self.dll.GetEnvironmentVariableW
self._wgetenv_dll.argtypes = [
ctypes.c_wchar_p,
ctypes.c_wchar_p,
ctypes.c_ulong,
]
self._wgetenv_dll.restype = ctypes.c_ulong
# We (arbitrarily) choose to return the unicode environ
self._envstr = self.dll.GetEnvironmentStringsW
self._envstr.argtypes = []
self._envstr.restype = ctypes.POINTER(ctypes.c_wchar)
self._free_envstr = self.dll.FreeEnvironmentStringsW
self._free_envstr.argtypes = [ctypes.POINTER(ctypes.c_wchar)]
self._free_envstr.restype = ctypes.c_bool
return self._loaded
def getenv(self, key):
size = self._getenv_dll(key, None, 0)
if not size:
return None
buf = ctypes.create_string_buffer(b'\0' * size)
self._getenv_dll(key, buf, size)
return buf.value or None
def wgetenv(self, key):
size = self._wgetenv_dll(key, None, 0)
if not size:
return None
buf = ctypes.create_unicode_buffer(u'\0' * size)
self._wgetenv_dll(key, buf, size)
return buf.value or None
def get_env_dict(self):
ans = {}
_str_buf = self._envstr()
# I am sure there is an easier way to parse this.
# GetEnvironmentStringsW returns a single const char* that
# points to a block of memory that contains the environment
# strings. Each environment string is NULL terminated, and an
# empty string (effectively two consecutive NULLs) indicates the
# end of the block. The following just parses that data
# character by character to reconstitute the original
# environment strings.
_null = {u'\0', b'\0'}
i = 0
while _str_buf[i] not in _null:
_str = ''
while _str_buf[i] not in _null:
_str += _str_buf[i]
i += len(_str_buf[i])
if len(_str_buf[i]) == 0:
raise ValueError(
"Error processing Win32 GetEnvironmentStringsW: "
"0-length character encountered"
)
if i > 32767: # max var length
raise ValueError(
"Error processing Win32 GetEnvironmentStringsW: "
"exceeded max environment block size (32767)"
)
key, val = _str.split('=', 1)
ans[key] = val
i += len(_str_buf[i]) # Skip the NULL
self._free_envstr(_str_buf)
return ans
[docs]
class CtypesEnviron:
"""A context manager for managing environment variables
This class provides a simplified interface for consistently setting
and restoring environment variables, with special handling to ensure
consistency with the C Runtime Library environment on Windows
platforms.
`os.environ` reflects the current python environment variables, and
will be passed to subprocesses. However, it does not reflect the C
Runtime Library (MSVCRT) environment on Windows platforms. This can
be problemmatic as DLLs loaded through the CTYPES interface will see
the MSVCRT environment and not os.environ. This class provides a
way to manage environment variables and pass changes to both
os.environ and the MSVCRT runtime.
This class implements a context manager API, so that clients can
temporarily change - and then subsequently restore - the
environment.
.. testcode::
:hide:
import os
orig_env_val = os.environ.get('TEMP_ENV_VAR', None)
.. doctest::
>>> os.environ['TEMP_ENV_VAR'] = 'original value'
>>> print(os.environ['TEMP_ENV_VAR'])
original value
>>> with CtypesEnviron(TEMP_ENV_VAR='temporary value'):
... print(os.environ['TEMP_ENV_VAR'])
temporary value
>>> print(os.environ['TEMP_ENV_VAR'])
original value
.. testcode::
:hide:
if orig_env_val is None:
del os.environ['TEMP_ENV_VAR']
else:
os.environ['TEMP_ENV_VAR'] = orig_env_val
"""
# Windows has a number of C Runtime Libraries, each of which can
# hold its own independent copy of the system environment
#
# Note that this order is important: kernel32 appears to track
# os.envion (and it is possible that it could be omitted). It is
# important to deal with it before the msvcrt libraries.
DLLs = [
_Win32DLL('kernel32'),
_MsvcrtDLL(getattr(ctypes.util, 'find_msvcrt', lambda: None)()),
_MsvcrtDLL('api-ms-win-crt-environment-l1-1-0'),
_MsvcrtDLL('msvcrt'),
_MsvcrtDLL('msvcr120'),
_MsvcrtDLL('msvcr110'),
_MsvcrtDLL('msvcr100'),
_MsvcrtDLL('msvcr90'),
_MsvcrtDLL('msvcr80'),
_MsvcrtDLL('msvcr71'),
_MsvcrtDLL('msvcr70'),
]
[docs]
def __init__(self, **kwds):
self.interfaces = [_RestorableEnvironInterface(_OSEnviron())]
self.interfaces.extend(
_RestorableEnvironInterface(dll) for dll in self.DLLs if dll.available()
)
# If this is the first time a CtypesEnviron was created, the
# calls to dll.activate() may have spawned a multiprocessing
# pool, which we should clean up.
if _load_dll.pool is not None:
_load_dll.pool.terminate()
_load_dll.pool = None
# Set the incoming env strings on all interfaces...
for k, v in kwds.items():
self[k] = v
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.restore()
[docs]
def restore(self):
"""Restore the environment to the original state
This restores all environment variables modified through this
object to the state they were in before this instance made any
changes. Note that any changes made directly to os.environ
outside this instance will not be detected / undone.
"""
# It is possible that os.environ and the MSVCRT did not start
# off in sync; e.g., if someone had set a value in os.environ
# directly. We will be especially careful and restore each
# environ back to its original state. Note that there are cases
# where some of the libs refer to the same DLL space, so we will
# restore them in the opposite order that we set them, so that
# cases where multiple libs point to the same environment things
# are all restored correctly.
for lib in reversed(self.interfaces):
lib.restore()
def __getitem__(self, key):
"""Return the current environment variable value from os.environ"""
return os.environ[key]
def __contains__(self, key):
"""Return True if the key is in os.environ"""
return key in os.environ
def __setitem__(self, key, val):
"""Set an environment variable in all known runtime environments"""
for lib in self.interfaces:
lib[key] = val
def __delitem__(self, key):
"""Remove an environment variable from all known runtime environments"""
for lib in self.interfaces:
del lib[key]