# -*- coding: utf-8 -*-
"""Work with the scenario data.
SPDX-FileCopyrightText: 2016-2021 Uwe Krien <krien@uni-bremen.de>
SPDX-License-Identifier: MIT
"""
__copyright__ = "Uwe Krien <krien@uni-bremen.de>"
__license__ = "MIT"
import datetime
import logging
import os
import shutil
import sys
import warnings
import dill as pickle
import pandas as pd
from oemof import solph
from oemof.network import graph
from deflex import config as cfg
from deflex.scenario_tools.nodes import create_solph_nodes_from_data
# Make sure the interpreter allows at least this recursion depth. An already
# higher limit is left untouched.
# NOTE(review): presumably needed for (un)pickling deeply nested energy
# system objects with dill -- confirm.
_MIN_RECURSION_LIMIT = 3000
if not sys.getrecursionlimit() >= _MIN_RECURSION_LIMIT:
    sys.setrecursionlimit(_MIN_RECURSION_LIMIT)
class NodeDict(dict):
    """
    A dictionary where existing key-value-pairs cannot be overwritten.

    A NodeDict can collect values with unique keys. Therefore a duplicate key
    will raise a ``KeyError`` instead of overwriting the existing key
    silently.

    Only item assignment (``d[key] = value``) is guarded by this class.
    """

    def __setitem__(self, key, item):
        """
        Store `item` under `key` if the key is not present yet.

        Raises
        ------
        KeyError
            If `key` already exists, regardless of the stored value.
        """
        # Test for membership instead of comparing the stored value with
        # None, otherwise a key that holds None could be overwritten.
        if key in self:
            msg = (
                "Key '{0}' already exists. ".format(key)
                + "Duplicate keys are not allowed in a node dictionary."
            )
            raise KeyError(msg)
        super().__setitem__(key, item)
class Scenario:
    """
    Basic scenario class.

    Attributes
    ----------
    input_data : dict
        The input data is organised in a dictionary of pandas.DataFrame/
        pandas.Series. The keys are the data names (string) and the values are
        the data tables.
    results : dict
        There are different sub-sections of the results. The dictionary has
        got the following keys:

        * main -- Results of all variables
          (result dictionary from oemof.solph)
        * param -- Input parameter
        * meta -- Meta information and tags of the scenario
        * problem -- Information about the linear problem such as
          `lower bound`, `upper bound` etc.
        * solver -- Solver results
        * solution -- Information about the found solution and the objective
          value

        The model results are stored in the `main` section. It contains
        another dictionary with tuples as keys and the
        results of the variables as values (nested dictionary with
        pandas.DataFrame). The tuples contain the node object in the following
        form: (from_node, to_node) for flows and (node, None) for components.

        See the `solph documentation
        <https://oemof-solph.readthedocs.io/en/latest/usage.html>`_
        for more details.
    meta : dict
        Meta information that can be used to search for in stored scenarios.
        The dictionary keys can be used like tags or categories.
    es : oemof.solph.EnergySystem
        This attribute will hold the oemof.solph.EnergySystem.

    """

    def __init__(self, meta=None, input_data=None, es=None, results=None):
        # Fresh dictionaries are created per instance to avoid sharing
        # mutable state between scenarios.
        self.meta = {} if meta is None else meta
        self.input_data = {} if input_data is None else input_data
        self.es = es
        self.results = results

    def initialise_energy_system(self):
        """
        Create a solph.EnergySystem and store it in the es attribute.

        The input_data attribute has to contain the input data to use this
        method. The year and the number of time steps are taken from the
        "general" table. Every table with "series" in its name must have
        exactly as many rows as there are time steps.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If there is no input data or if the length of a series table does
            not match the number of time steps.
        """
        if not self.input_data:
            raise ValueError(
                "There is no input data in the scenario. You cannot "
                "initialise an energy system without a year and the number of "
                "time steps."
            )
        general = self.input_data["general"]
        year = int(general["year"])
        time_steps = int(general["number of time steps"])

        # Check series tables
        for key, table in self.input_data.items():
            if "series" in key and time_steps != len(table):
                msg = (
                    "Number of time steps is {0} but the length of the {1}"
                    " table is {2}."
                ).format(time_steps, key, len(table))
                raise ValueError(msg)

        # Create an hourly datetime index starting on the first of January.
        # The lower-case alias "h" is used because the upper-case "H" is
        # deprecated in recent pandas versions.
        date_time_index = pd.date_range(
            "1/1/{0}".format(year), periods=time_steps, freq="h"
        )
        self.es = solph.EnergySystem(timeindex=date_time_index)
        return self

    @staticmethod
    def _table_layout(name):
        """
        Return the pandas reader keywords `index_col` and `header` for a table.

        The first element of the "table_index_header" configuration entry of
        the table is used as the number of index columns, the second one as
        the number of header rows.
        """
        layout = cfg.get_list("table_index_header", name)
        return {
            "index_col": list(range(int(layout[0]))),
            "header": list(range(int(layout[1]))),
        }

    def read_xlsx(self, filename):
        """
        Load scenario data from an xlsx file. The full path has to be passed.

        Each sheet becomes one table of the input data. Tables without
        "series" in their name are squeezed along the columns. Afterwards the
        data is checked for NaN values and the meta data is updated.

        Returns
        -------
        self

        Examples
        --------
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat.xlsx")
        >>> sc = dflx.DeflexScenario()
        >>> len(sc.input_data)
        0
        >>> sc = sc.read_xlsx(fn)
        >>> len(sc.input_data)
        11
        """
        # The context manager closes the workbook even if parsing fails.
        with pd.ExcelFile(filename) as xlsx:
            for sheet in xlsx.sheet_names:
                table = xlsx.parse(sheet, **self._table_layout(sheet))
                if "series" not in sheet:
                    table = table.squeeze("columns")
                self.input_data[sheet] = table
        self.check_input_data()
        self._add_meta_data()
        return self

    def read_csv(self, path):
        """
        Load scenario from a csv-collection. The path of the directory has
        to be passed.

        Every file with the suffix ".csv" in the directory becomes one table
        named after the file (without suffix). Tables without "series" in
        their name are squeezed along the columns. Afterwards the data is
        checked for NaN values and the meta data is updated.

        Returns
        -------
        self

        Examples
        --------
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat_csv")
        >>> sc = dflx.DeflexScenario()
        >>> len(sc.input_data)
        0
        >>> sc = sc.read_csv(fn)
        >>> len(sc.input_data)
        11
        """
        for csv_file in os.listdir(path):
            if not csv_file.endswith(".csv"):
                continue
            name = csv_file[:-4]
            table = pd.read_csv(
                os.path.join(path, csv_file), **self._table_layout(name)
            )
            if "series" not in name:
                table = table.squeeze("columns")
            self.input_data[name] = table
        self.check_input_data()
        self._add_meta_data()
        return self

    def _add_meta_data(self):
        """Copy the "info" (if present) and "general" tables to the meta."""
        if "info" in self.input_data:
            self.meta.update(self.input_data["info"].to_dict())
        self.meta.update(self.input_data["general"].to_dict())

    def check_input_data(self):
        """
        Check the input data for NaN values.

        A warning is raised for each table that contains NaN values, naming
        the affected columns (DataFrame) or index entries (Series). This
        gives a good overview over all corrupt tables. Rows with NaN values
        are removed from DataFrame tables. After all tables have been
        checked a ValueError is raised if any NaN value was found.

        Additionally the "volatile plants" table is converted to a
        DataFrame if it is present as a Series.

        Raises
        ------
        ValueError
            If at least one table contains NaN values.

        Examples
        --------
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat_csv")
        >>> sc = dflx.create_scenario(fn, "csv")
        >>> sc.input_data["electricity demand series"].iloc[15] = float("nan")
        >>> sc.input_data["volatile series"].iloc[11] = float("nan")
        >>> sc.check_input_data()  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
         ...
        ValueError: NaN values found in the following tables: electricity...
        """
        msg_template = (
            "NaN values found in table:'{0}', column(s): {1}.\n"
            "Empty cells are not allowed in a scenario to avoid "
            "unwanted behaviour.\nRemove the whole column/row if "
            "a parameter is not needed (optional). Consider that 0, 'inf' "
            "or 1 might be neutral values to replace NaN values."
        )
        has_warning = []
        for sheet, table in self.input_data.items():
            if isinstance(table, pd.DataFrame):
                nan_columns = table.isnull().any()
                if nan_columns.any():
                    columns = tuple(table.loc[:, nan_columns].columns)
                    warnings.warn(
                        msg_template.format(sheet, columns), UserWarning
                    )
                    has_warning.append(sheet)
                    # Keep only rows without any NaN value. Replacing the
                    # value of an existing key is safe while iterating.
                    self.input_data[sheet] = table.dropna(
                        thresh=(len(table.columns))
                    )
            else:
                if table.isnull().any():
                    value = table.loc[table.isnull()].index
                    warnings.warn(
                        msg_template.format(sheet, value), UserWarning
                    )
                    has_warning.append(sheet)

        # A single column table is squeezed to a Series by the readers.
        # The table is optional here, so a missing table is skipped instead
        # of raising a KeyError.
        volatile_plants = self.input_data.get("volatile plants")
        if isinstance(volatile_plants, pd.Series):
            self.input_data["volatile plants"] = pd.DataFrame(
                volatile_plants,
                columns=[volatile_plants.name],
            )

        if len(has_warning) > 0:
            msg = (
                "NaN values found in the following tables: {0}\n"
                "See the warning above for more information"
            )
            raise ValueError(msg.format(", ".join(has_warning)))

    def to_xlsx(self, filename):
        """
        Store the input data into an xlsx-file.

        The suffix ".xlsx" is added to the filename if it is missing and the
        directory of the file is created if it does not exist. The tables are
        written as sheets in alphabetical order.

        Parameters
        ----------
        filename : str
            Full path to the filename.

        Examples
        --------
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat_csv")
        >>> sc = dflx.DeflexScenario()
        >>> # read scenario from csv-collection
        >>> sc = sc.read_csv(fn)
        >>> # store scenario as xlsx-file.
        >>> sc.to_xlsx(fn.replace("_csv", ".xlsx"))
        """
        if not filename.endswith(".xlsx"):
            filename = filename + ".xlsx"

        # create path if it does not exist; a bare file name has no directory
        # part and os.makedirs("") would fail.
        directory = os.path.dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)

        # The context manager saves and closes the file, also on errors.
        with pd.ExcelWriter(filename) as writer:
            for name, df in sorted(self.input_data.items()):
                df.to_excel(writer, sheet_name=name)
        logging.info("Scenario saved as excel file to %s", filename)

    def to_csv(self, path):
        """
        Store the input data as a csv-collection.

        An existing directory of the same name is removed with all its
        content before the tables are written, one csv-file per table.

        Parameters
        ----------
        path : str
            Full path of the directory of the csv-collection.

        Examples
        --------
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat.xlsx")
        >>> sc = dflx.DeflexScenario()
        >>> # read scenario from xlsx-file
        >>> sc = sc.read_xlsx(fn)
        >>> # store scenario as csv collection.
        >>> sc.to_csv(fn.replace(".xlsx", "_csv"))
        """
        # Start with an empty directory so that no outdated tables remain.
        if os.path.isdir(path):
            shutil.rmtree(path)
        os.makedirs(path)

        for name, df in self.input_data.items():
            df.to_csv(os.path.join(path, name + ".csv"))
        logging.info("Scenario saved as csv-collection to %s", path)

    def create_nodes(self):
        """
        This method is a placeholder for the child classes.

        Child classes have to return a dictionary of nodes because
        :py:meth:`table2es` adds the values of the returned object to the
        energy system.
        """

    def compute(self, solver="cbc", with_duals=True, **kwargs):
        """
        Create a solph.Model from the input data and optimise it using an
        external solver. Afterwards the results are stored in the results
        attribute.

        Parameters
        ----------
        solver : str
            The name of the solver as used in the Pyomo package like cbc, glpk,
            gurobi, cplex... (default: cbc).
        with_duals : bool
            Receive the dual variables of all buses in the results (default:
            True).

        Other Parameters
        ----------------
        **kwargs
            All other keyword arguments are passed to :py:meth:`solve`.

        Examples
        --------
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat_csv")
        >>> sc = dflx.create_scenario(fn, "csv")
        >>> sc.results is None
        True
        >>> sc.compute()  # doctest: +ELLIPSIS
        Welcome to the CBC MILP ...
        >>> sc.results.keys()
        ['Problem', 'Solver', 'Solution', 'Main', 'Param', 'Meta']
        """
        self.table2es()
        model = self.create_model()
        self.solve(model, solver=solver, with_duals=with_duals, **kwargs)

    def add_nodes_to_es(self, nodes):
        """
        Add nodes to an existing solph.EnergySystem. If the EnergySystem does
        not exist an Error is raised. This method is included in the
        :py:meth:`~deflex.DeflexScenario.compute()` method and is only
        needed for advanced usage.

        Parameters
        ----------
        nodes : dict
            Dictionary with a unique key and values of type oemof.network.Node.

        Returns
        -------
        self

        """
        self.es.add(*nodes.values())
        return self

    def table2es(self):
        """
        Create a populated solph.EnergySystem from the input data.

        The EnergySystem object will be stored in the ``es`` attribute of the
        :py:class:`~deflex.DeflexScenario`. An existing EnergySystem is
        reused, otherwise a new one is initialised first.

        This method is included in the
        :py:meth:`~deflex.DeflexScenario.compute()`
        method and is only needed for advanced usage.

        Examples
        --------
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat_csv")
        >>> sc = dflx.create_scenario(fn, "csv")
        >>> sc.es is None
        True
        >>> sc.table2es()
        >>> type(sc.es)
        <class 'oemof.solph.network.energy_system.EnergySystem'>
        """
        if self.es is None:
            logging.info("Initialise a solph energy system.")
            self.initialise_energy_system()
        logging.info("Creating nodes...")
        nodes = self.create_nodes()
        self.es.add(*nodes.values())
        logging.info("Done. Nodes added to the energy system.")

    def create_model(self):
        """
        Create a solph model from an EnergySystem object.

        This method is included in the :py:meth:`compute()` method and is only
        needed for advanced usage.

        Returns
        -------
        oemof.solph.Model

        Examples
        --------
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat_csv")
        >>> sc = dflx.create_scenario(fn, "csv")
        >>> sc.table2es()
        >>> type(sc.create_model())
        <class 'oemof.solph.models.Model'>
        """
        logging.info("Creating the model this may take a while...")
        model = solph.Model(self.es)
        logging.info("...Done.")
        return model

    def dump(self, filename):
        """
        Store a solved scenario class into the binary pickle format.

        The file will be stored with the suffix `.dflx`. If the given filename
        does not contain the suffix, it will be added to the filename. The
        directory of the file is created if it does not exist.

        The meta dictionary is written first, followed by the attribute
        dictionary of the scenario.

        It is possible to restore the dump but it is not possible to compute
        a restored dump. Unsolved scenarios should be stored in the xlsx or
        csv format.

        Parameters
        ----------
        filename : str
            Full path to the filename.

        Examples
        --------
        >>> import os
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat_csv")
        >>> sc = dflx.create_scenario(fn, "csv")
        >>> sc.results is None
        True
        >>> sc.compute()  # doctest: +ELLIPSIS
        Welcome to the CBC MILP ...
        >>> fn_dump = fn.replace("_csv", ".dflx")
        >>> os.path.basename(fn_dump)
        'de02_no-heat.dflx'
        >>> sc.dump(fn_dump)
        >>> os.path.isfile(fn_dump)
        True
        >>> sc2 = dflx.restore_scenario(fn_dump)
        >>> type(sc2)
        <class 'deflex.scenario.DeflexScenario'>
        >>> sc2.results.keys()
        ['Problem', 'Solver', 'Solution', 'Main', 'Param', 'Meta']
        >>> os.remove(fn_dump)
        """
        if not filename.endswith(".dflx"):
            filename = filename + ".dflx"

        # A bare file name has no directory part and os.makedirs("") would
        # fail, so the directory is only created if there is one.
        directory = os.path.dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)

        # The context manager closes the file even if pickling fails.
        with open(filename, "wb") as f:
            pickle.dump(self.meta, f)
            pickle.dump(self.__dict__, f)
        logging.info("Results dumped to %s.", filename)

    def solve(self, model, solver="cbc", with_duals=True, **solver_kwargs):
        """
        Solve the solph.Model. This method is included in the
        :py:meth:`~deflex.DeflexScenario.compute()` method and is only
        needed for advanced usage.

        The solph version, the solver name and the start and end time of the
        solver are stored in the meta attribute. The results are stored in
        the energy system and in the results attribute.

        Parameters
        ----------
        model : oemof.solph.Model
        solver : str
        with_duals : bool

        Other Parameters
        ----------------
        tee : bool
            Set to `False` to suppress the solver output (default: True).
        logfile : str
            Define the path where to store the log file of the solver.

        Examples
        --------
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat_csv")
        >>> sc = dflx.create_scenario(fn, "csv")
        >>> sc.table2es()
        >>> my_model = sc.create_model()
        >>> sc.solve(my_model, with_duals=False)  # doctest: +ELLIPSIS
        Welcome to the CBC MILP ...
        >>> sc.results.keys()
        ['Problem', 'Solver', 'Solution', 'Main', 'Param', 'Meta']
        """
        logging.info("Optimising using %s.", solver)

        # Show the solver output unless the caller decides otherwise.
        solver_kwargs.setdefault("tee", True)

        self.meta["solph_version"] = solph.__version__
        self.meta["solver_name"] = solver
        self.meta["solver_start"] = datetime.datetime.now()

        if with_duals:
            model.receive_duals()

        model.solve(solver=solver, solve_kwargs=solver_kwargs)

        self.meta["solver_end"] = datetime.datetime.now()

        self.es.results["main"] = solph.processing.results(model)
        self.meta.update(solph.processing.meta_results(model))
        self.es.results["param"] = solph.processing.parameter_as_dict(self.es)
        self.es.results["meta"] = self.meta
        self.results = self.es.results

    def store_graph(self, filename, **kwargs):
        """
        Store the EnergySystem graph into a `.graphml` file.

        The kwargs are passed to the oemof.network function
        `create_nx_graph()
        <https://github.com/oemof/oemof.network/blob/dev/src/oemof/network/graph.py#L15>`_.

        Parameters
        ----------
        filename : str
            Full path of the graphml-file.

        Examples
        --------
        >>> import os
        >>> import deflex as dflx
        >>> fn = dflx.fetch_test_files("de02_no-heat_csv")
        >>> sc = dflx.create_scenario(fn, "csv")
        >>> sc.table2es()
        >>> fn_graph = fn.replace("_csv", ".graphml")
        >>> os.path.basename(fn_graph)
        'de02_no-heat.graphml'
        >>> sc.store_graph(fn_graph)
        >>> os.path.isfile(fn_graph)
        True
        >>> os.remove(fn_graph)
        """
        graph.create_nx_graph(self.es, filename=filename, **kwargs)
class DeflexScenario(Scenario):
    """
    The Deflex Scenario is the center of a deflex energy model. It can store
    the needed input data and the results after a successful optimisation.
    It inherits from the Scenario class and extends the
    Scenario class with valid nodes creation. Additionally one can define
    an extra_regions attribute to create an extra commodity source for these
    regions. This makes it possible to create a source balance for these
    regions.

    Parameters
    ----------
    meta : dict
        Meta information of the DeflexScenario (optional).
    input_data : dict
        A dictionary of tables in the deflex scenario style (optional).
    es : oemof.solph.EnergySystem
        An Energy system (optional).
    results : dict
        A valid Deflex results dictionary (optional).

    """

    # Append the docstring of the parent class without its first two lines
    # (the opening line and the summary). Docstrings are None if Python runs
    # with the -OO option, so the step is skipped in that case.
    if __doc__ and Scenario.__doc__:
        __doc__ += "\n".join(Scenario.__doc__.split("\n")[2:])

    def __init__(self, meta=None, input_data=None, es=None, results=None):
        super().__init__(
            meta=meta, input_data=input_data, es=es, results=results
        )

    def create_nodes(self):
        """
        Creates solph components and buses from the input data and store them
        in a dictionary with unique IDs as keys.

        Returns
        -------
        dict

        """
        # Create a special dictionary that will raise an error if an existing
        # key is assigned again. This prevents nodes from being overwritten
        # silently by a node with a duplicate key.
        nodes = NodeDict()
        return create_solph_nodes_from_data(self.input_data, nodes)