# ____________________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2026 National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and Engineering
# Solutions of Sandia, LLC, the U.S. Government retains certain rights in this
# software. This software is distributed under the 3-clause BSD License.
# ____________________________________________________________________________________
import os.path
import json
from pyomo.common.collections import Bunch
from pyomo.common.dependencies import yaml, yaml_available, yaml_load_args
from pyomo.dataportal.factory import DataManagerFactory
[docs]
def detuplize(d, sort=False):
# print("detuplize %s" % str(d))
if type(d) in (list, tuple, set):
ans = []
for item in d:
if type(item) in (list, tuple, set):
ans.append(list(item))
else:
ans.append(item)
if sort:
return sorted(ans)
return ans
elif None in d:
return d[None]
else:
#
# De-tuplize keys via list of key/value pairs
#
ans = []
for k, v in d.items():
if type(k) is tuple:
ans.append({'index': list(k), 'value': v})
else:
ans.append({'index': k, 'value': v})
if sort:
return sorted(ans, key=lambda x: x['value'])
return ans
[docs]
def tuplize(d):
# print("tuplize %s" % str(d))
if type(d) is list and len(d) > 0 and not type(d[0]) is dict:
ans = []
for val in d:
if type(val) is list:
item = []
for v in val:
try:
item.append(int(v))
except:
try:
item.append(float(v))
except:
item.append(v)
item = tuple(item)
else:
try:
item = int(val)
except:
try:
item = float(val)
except:
item = val
ans.append(item)
return ans
#
elif type(d) is list:
ret = {}
for val in d:
if type(val['index']) is list:
index = tuple(val['index'])
else:
index = val['index']
ret[index] = val['value']
return ret
#
elif type(d) is dict:
return d
else:
return {None: d}
[docs]
@DataManagerFactory.register("json", "JSON file interface")
class JSONDictionary:
[docs]
def __init__(self):
self._info = {}
self.options = Bunch()
def available(self):
return True
def initialize(self, **kwds):
self.filename = kwds.pop('filename')
self.add_options(**kwds)
def add_options(self, **kwds):
self.options.update(kwds)
def open(self):
if self.filename is None:
raise IOError("No filename specified")
def close(self):
pass
[docs]
def read(self):
"""
This function loads data from a JSON file and tuplizes the nested
dictionaries and lists of lists.
"""
if not os.path.exists(self.filename):
raise IOError("Cannot find file '%s'" % self.filename)
INPUT = open(self.filename, 'r')
jdata = json.load(INPUT)
INPUT.close()
if jdata is None or len(jdata) == 0:
raise IOError("Empty JSON data file")
self._info = {}
for k, v in jdata.items():
self._info[k] = tuplize(v)
[docs]
def write(self, data):
"""
This function creates a JSON file for the specified data.
"""
with open(self.filename, 'w') as OUTPUT:
jdata = {}
if self.options.data is None:
for k, v in data.items():
jdata[k] = detuplize(v)
elif type(self.options.data) in (list, tuple):
for k in self.options.data:
jdata[k] = detuplize(data[k], sort=self.options.sort)
else:
k = self.options.data
jdata[k] = detuplize(data[k])
json.dump(jdata, OUTPUT)
[docs]
def process(self, model, data, default):
"""
Set the data for the selected components
"""
if not self.options.namespace in data:
data[self.options.namespace] = {}
#
try:
if self.options.data is None:
for key in self._info:
self._set_data(data, self.options.namespace, key, self._info[key])
elif type(self.options.data) in (list, tuple):
for key in self.options.data:
self._set_data(data, self.options.namespace, key, self._info[key])
else:
key = self.options.data
self._set_data(data, self.options.namespace, key, self._info[key])
except KeyError:
raise IOError(
"Data value for '%s' is not available in JSON file '%s'"
% (key, self.filename)
)
def _set_data(self, data, namespace, name, value):
if type(value) is dict:
data[namespace][name] = value
else:
data[namespace][name] = {None: value}
def clear(self):
self._info = {}
[docs]
@DataManagerFactory.register("yaml", "YAML file interface")
class YamlDictionary:
[docs]
def __init__(self):
self._info = {}
self.options = Bunch()
def available(self):
return yaml_available
def requirements(self):
return "pyyaml"
def initialize(self, **kwds):
self.filename = kwds.pop('filename')
self.add_options(**kwds)
def add_options(self, **kwds):
self.options.update(kwds)
def open(self):
if self.filename is None:
raise IOError("No filename specified")
def close(self):
pass
[docs]
def read(self):
"""
This function loads data from a YAML file and tuplizes the nested
dictionaries and lists of lists.
"""
if not os.path.exists(self.filename):
raise IOError("Cannot find file '%s'" % self.filename)
INPUT = open(self.filename, 'r')
jdata = yaml.load(INPUT, **yaml_load_args)
INPUT.close()
if jdata is None:
raise IOError("Empty YAML file")
self._info = {}
for k, v in jdata.items():
self._info[k] = tuplize(v)
[docs]
def write(self, data):
"""
This function creates a YAML file for the specified data.
"""
with open(self.filename, 'w') as OUTPUT:
jdata = {}
if self.options.data is None:
for k, v in data.items():
jdata[k] = detuplize(v)
elif type(self.options.data) in (list, tuple):
for k in self.options.data:
jdata[k] = detuplize(data[k], sort=self.options.sort)
else:
k = self.options.data
jdata[k] = detuplize(data[k])
yaml.dump(jdata, OUTPUT)
[docs]
def process(self, model, data, default):
"""
Set the data for the selected components
"""
if not self.options.namespace in data:
data[self.options.namespace] = {}
#
try:
if self.options.data is None:
for key in self._info:
self._set_data(data, self.options.namespace, key, self._info[key])
elif type(self.options.data) in (list, tuple):
for key in self.options.data:
self._set_data(data, self.options.namespace, key, self._info[key])
else:
key = self.options.data
self._set_data(data, self.options.namespace, key, self._info[key])
except KeyError:
raise IOError(
"Data value for '%s' is not available in YAML file '%s'"
% (key, self.filename)
)
def _set_data(self, data, namespace, name, value):
if type(value) is dict:
data[namespace][name] = value
else:
data[namespace][name] = {None: value}
def clear(self):
self._info = {}