Source code for deflex.main

# -*- coding: utf-8 -*-

"""Main script.

SPDX-FileCopyrightText: 2016-2019 Uwe Krien <krien@uni-bremen.de>

SPDX-License-Identifier: MIT
"""
__copyright__ = "Uwe Krien <krien@uni-bremen.de>"
__license__ = "MIT"


import logging
import multiprocessing
import os
import traceback
from collections import namedtuple
from datetime import datetime

import pandas as pd

from deflex import config as cfg
from deflex import scenario_tools


[docs]def stopwatch(): """Track the running time.""" if not hasattr(stopwatch, "start"): stopwatch.start = datetime.now() return str(datetime.now() - stopwatch.start)[:-7]
[docs]def load_scenario(path, file_type=None): """ Create a deflex scenario object from file. Parameters ---------- path : str A valid deflex scenario file. file_type : str or None Type of the input data. Valid values are 'csv', 'excel', None. If the input is non the path should end on 'csv', '.xls', '.xlsx' to allow auto-detection. Returns ------- deflex.DeflexScenario Examples -------- >>> fn = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, ... "tests", "data", "deflex_test_scenario.xls") >>> s = load_scenario(fn, file_type="excel") >>> type(s) <class 'deflex.scenario_tools.DeflexScenario'> >>> int(s.table_collection["volatile_source"]["capacity"]["DE02", "wind"]) 517 >>> type(load_scenario(fn)) <class 'deflex.scenario_tools.DeflexScenario'> >>> load_scenario(fn, file_type="csv") # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ... NotADirectoryError: [Errno 20] Not a directory: """ sc = scenario_tools.DeflexScenario() if path is not None: if file_type is None: if "xls" in path[-4:]: file_type = "excel" elif "csv" in path[-4:]: file_type = "csv" else: file_type = None logging.info("Reading file: %s", path) if file_type == "excel": sc.load_excel(path) elif file_type == "csv": sc.load_csv(path) return sc
[docs]def fetch_scenarios_from_dir(path, csv=True, xls=False): """ Search for files with an excel extension or directories ending with '_csv'. By now it is not possible to distinguish between valid deflex scenarios and other excel files or directories ending with 'csv'. Therefore, the given directory should only contain valid scenarios. The function will not search recursively. Parameters ---------- path : str Directory with valid deflex scenarios. csv : bool Search for csv directories. xls : bool Search for xls files. Returns ------- list : Scenarios found in the given directory. Examples -------- >>> test_data = os.path.join(os.path.dirname(__file__), os.pardir, ... os.pardir, "tests", "data") >>> my_csv = fetch_scenarios_from_dir(test_data) >>> len(my_csv) 2 >>> os.path.basename(my_csv[0]) 'deflex_2014_de02_test_csv' >>> my_excel = fetch_scenarios_from_dir(test_data, csv=False, xls=True) >>> len(my_excel) 3 >>> os.path.basename(my_excel[0]) 'deflex_2013_de02_test.xls' >>> len(fetch_scenarios_from_dir(test_data, xls=True)) 5 """ xls_scenarios = [] csv_scenarios = [] for name in os.listdir(path): if (name[-4:] == ".xls" or name[-5:] == "xlsx") and xls is True: xls_scenarios.append(os.path.join(path, name)) if name[-4:] == "_csv" and csv is True: csv_scenarios.append(os.path.join(path, name)) csv_scenarios = sorted(csv_scenarios) xls_scenarios = sorted(xls_scenarios) logging.debug("Found xls(x) scenario: %s", str(xls_scenarios)) logging.debug("Found csv scenario: %s", str(csv_scenarios)) return csv_scenarios + xls_scenarios
[docs]def model_multi_scenarios(scenarios, cpu_fraction=0.2, log_file=None): """ Parameters ---------- scenarios : iterable Multiple scenarios to be modelled in parallel. cpu_fraction : float Fraction of available cpu cores to use for the parallel modelling. A resulting dezimal number of cores will be rounded up to an integer. log_file : str Filename to store the log file. Returns ------- Examples -------- >>> fn1 = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, ... "tests", "data", "deflex_test_scenario.xls") >>> fn2 = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, ... "tests", "data", "deflex_test_scenario_broken.xls") >>> my_log_file = os.path.join(os.path.dirname(__file__), os.pardir, ... os.pardir, "tests", "data", ... "my_log_file.csv") >>> my_scenarios = [fn1, fn2] >>> model_multi_scenarios(my_scenarios, log_file=my_log_file) >>> my_log = pd.read_csv(my_log_file, index_col=[0]) >>> good = my_log.loc["deflex_test_scenario.xls"] >>> rv = good["return_value"] >>> datetime.strptime(rv, "%Y-%m-%d %H:%M:%S.%f").year > 2019 True >>> good["trace"] nan >>> os.path.basename(good["result_file"]) 'deflex_test_scenario_alpha.esys' >>> broken = my_log.loc["deflex_test_scenario_broken.xls"] >>> broken["return_value"].replace("'", "") # doctest: +ELLIPSIS 'ValueError(Missing time series for geothermal (capacity: 31.4) in DE01... >>> broken["trace"] # doctest: +ELLIPSIS 'Traceback (most recent call last)... >>> broken["result_file"] nan >>> os.remove(my_log_file) >>> os.remove(good["result_file"]) """ start = datetime.now() maximal_number_of_cores = int( round(multiprocessing.cpu_count() * cpu_fraction + 0.4999) ) p = multiprocessing.Pool(maximal_number_of_cores) logs = p.starmap( batch_model_scenario, zip(scenarios, [False] * len(scenarios)) ) p.close() p.join() failing = {n: r for n, r, t, f, s in logs if isinstance(r, BaseException)} logger = pd.DataFrame() for log in logs: logger.loc[log[0], "start"] = start if isinstance(log[1], BaseException): logger.loc[log[0], "return_value"] = repr(log[1]) else: logger.loc[log[0], "return_value"] = log[1] logger.loc[log[0], "trace"] = log[2] logger.loc[log[0], "result_file"] = log[3] if log_file is None: log_file = os.path.join( os.path.expanduser("~"), ".deflex", "log_deflex.csv" ) os.makedirs(os.path.dirname(log_file), exist_ok=True) logger.to_csv(log_file) if len(failing) < 1: logging.info("Finished all scenarios without errors") else: logging.info(failing)
[docs]def batch_model_scenario(path, named=True, file_type=None, ignore_errors=True): """ Model a single scenario in batch mode. By default errors will be ignored and returned together with the traceback. Parameters ---------- path : str A valid deflex scenario. file_type : str or None Type of the input data. Valid values are 'csv', 'excel', None. If the input is non the path schould end on 'csv', '.xls', '.xlsx'. named : bool If True a named tuple with the following fields will be returned ignore_errors : bool Set True to stop the script if an error occurs for debugging. By default errors are ignored and returned. Returns ------- namedtuple Examples -------- >>> fn = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, ... "tests", "data", "deflex_test_scenario.xls") >>> r = batch_model_scenario(fn, ignore_errors=False) # doctest: +ELLIPSIS Welcome to the CBC MILP ... >>> r.name 'deflex_test_scenario.xls' >>> result_file = r.result_file >>> os.path.basename(result_file) 'deflex_test_scenario_alpha.esys' >>> r.trace >>> r.return_value.year > 2019 True >>> fn = os.path.join("wrong_file.xls") >>> r = batch_model_scenario(fn) >>> r.name 'wrong_file.xls' >>> repr(r.return_value) "FileNotFoundError(2, 'No such file or directory')" >>> r.result_file >>> r.trace # doctest: +ELLIPSIS 'Traceback (most recent call last):... >>> os.remove(result_file) """ out = namedtuple( "out", ["name", "return_value", "trace", "result_file", "start_time"] ) name = os.path.basename(path) logging.info("Next scenario: %s", name) start_time = datetime.now() if ignore_errors: try: result_file = model_scenario(path, file_type) return_value = datetime.now() trace = None except Exception as e: trace = traceback.format_exc() return_value = e result_file = None else: result_file = model_scenario(path, file_type) return_value = datetime.now() trace = None if not named: return name, return_value, trace, result_file, start_time return out( name=name, return_value=return_value, trace=trace, result_file=result_file, start_time=start_time, )
[docs]def model_scenario( path=None, file_type=None, result_path=None, ): """ Compute a deflex scenario. Parameters ---------- path : str or None File or directory with a valid deflex scenario. If no path is given an energy system (es) has to be passed. file_type : str or None Type of the input data. Valid values are 'csv', 'excel', None. If the input is non the path schould end on 'csv', '.xls', '.xlsx'. result_path : str or None Path to store the output file. If None the results will be stored along with the scenarios. Returns ------- Examples -------- >>> fn = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, ... "tests", "data", "deflex_test_scenario.xls") >>> r = model_scenario(fn, file_type="excel") # doctest: +ELLIPSIS Welcome to the CBC MILP ... >>> rf = os.path.join(os.path.dirname(fn), "results_cbc", ... "deflex_test_scenario_alpha.esys") >>> os.remove(rf) """ stopwatch() meta = { "model_base": "deflex", "solver": cfg.get("general", "solver"), "start_time": datetime.now(), } logging.info("Start modelling: %s", stopwatch()) sc = load_scenario(path, file_type) logging.info("Add nodes to the EnergySystem: %s", stopwatch()) sc.table2es() sc.meta = meta # If a meta table exists in the table collection update meta dict if "meta" in sc.table_collection: meta.update(sc.table_collection["meta"].to_dict()["value"]) # Use name from meta or from filename if "name" in meta: sc.name = meta["name"] else: meta["name"] = ( os.path.basename(path) + "_" + datetime.now().strftime("%Y%d%m_%H%M%S") ) sc.name = meta["name"] if result_path is None: result_path = os.path.join( os.path.dirname(path), "results_{0}".format(cfg.get("general", "solver")), sc.name + ".esys", ) logging.info("Create the concrete model: %s", stopwatch()) sc.create_model() logging.info("Solve the optimisation model: %s", stopwatch()) sc.solve(solver=cfg.get("general", "solver")) logging.info("Solved. Dump results: %s", stopwatch()) os.makedirs(os.path.dirname(result_path), exist_ok=True) logging.info("Dump file to %s", result_path) sc.meta["end_time"] = datetime.now() sc.dump_es(result_path) logging.info( "%s - deflex scenario finished without errors: %s" % (stopwatch(), sc.name) ) return result_path
[docs]def plot_scenario(path, file_type=None, image_file=None): """ Plot the graph of an energy system. If no filename is given the plot will be shown on the screen but not writen to an image file Parameters ---------- path : str A valid deflex scenario file. file_type : str or None Type of the input data. Valid values are 'csv', 'excel', None. If the input is none the path should end on 'csv', '.xls', '.xlsx' to allow auto detection. image_file : str The image file with a valid suffix (e.g. png, pdf, svg). Returns ------- TODO: Keep this test? It does not work without graphviz-dev and python3-dev Examples -------- >>> fn = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, ... "tests", "data", "deflex_test_scenario.xls") >>> fn_img = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, ... "tests", "data", "test_es.graphml") >>> plot_scenario(fn, "excel", fn_img) >>> os.path.isfile(fn_img) True >>> os.remove(fn_img) >>> os.path.isfile(fn_img) False """ sc = load_scenario(path, file_type) sc.table2es() show = image_file is None sc.plot_nodes( filename=image_file, show=show, remove_nodes_with_substrings=["bus_cs"], )
if __name__ == "__main__": pass