import concurrent
import math
import random
import warnings
from collections import defaultdict
from time import time
from typing import Dict, List, Tuple, Type
import numpy as np
import seaborn as sns
from bw2data.backends.peewee import Activity
from IPython.display import display
from ipywidgets import interact
from matplotlib import pyplot as plt
from matplotlib.lines import Line2D
from SALib.analyze import sobol as analyse_sobol
from SALib.sample import sobol, sobol_sequence
from sympy import (
Abs,
Add,
AtomicExpr,
Eq,
Expr,
Float,
Mul,
Number,
Piecewise,
Sum,
Symbol,
simplify,
symbols,
)
from sympy.core.operations import AssocOp
from .base_utils import (
ValueOrExpression,
_display_tabs,
displayWithExportButton,
r_squared,
)
from .database import DbContext, with_db_context
from .lca import (
LambdaWithParamNames,
_expanded_names_to_names,
_filter_param_values,
_postMultiLCAAlgebric,
_preMultiLCAAlgebric,
_replace_fixed_params,
compute_impacts,
lambdify,
pd,
)
from .log import warn
from .methods import method_name, method_unit
from .params import (
FixedParamMode,
NameType,
ParamDef,
ParamType,
_complete_and_expand_params,
_param_name,
_param_registry,
_variable_params,
)
PARALLEL = False
DEFAULT_N = 1024
def _parallel_map(f, items):
if PARALLEL:
with concurrent.futures.ThreadPoolExecutor() as exec:
return exec.map(f, items)
else:
return map(f, items)
def _heatmap(df, title, vmax, ints=False):
"""Produce heatmap of a dataframe"""
fig, ax = plt.subplots(figsize=(17, 17))
sns.heatmap(
df.transpose(),
cmap="gist_heat_r",
vmax=vmax,
annot=True,
fmt=".0f" if ints else ".2f",
square=True,
)
plt.title(title, fontsize=20)
plt.yticks(rotation=0)
ax.tick_params(axis="x", labelsize=18)
ax.tick_params(axis="y", labelsize=18)
def _extract_var_params(lambdas):
required_param_names = set()
for lamb in lambdas:
required_param_names.update(_expanded_names_to_names(lamb.expanded_params))
var_params = _variable_params(required_param_names)
return sorted(var_params.values(), key=lambda p: (p.group if p.group else "", p.name))
[docs]
@with_db_context(arg="model")
def oat_matrix(
model: Activity,
impacts,
functional_unit: ValueOrExpression = 1,
n=10,
title="Impact variability (% of mean)",
name_type=NameType.LABEL,
):
"""
This function generates a heat map of relative the variance of impacts
for each parameter varying along its min/max values.
Parameters
----------
model:
Root activity of the inventory
impacts:
Impact variabilityList of impact methods keys (tuples)
functional_unit:
Float value of expression by which to divide each impact.
"""
# Compile model into lambda functions for fast LCA
lambdas = _preMultiLCAAlgebric(model, impacts, alpha=1 / functional_unit)
# Sort params by category
sorted_params = _extract_var_params(lambdas)
change = np.zeros((len(sorted_params), len(impacts)))
for iparam, param in enumerate(sorted_params):
params = {param.name: param.default for param in sorted_params}
# Compute range of values for given param
params[param.name] = param.range(n)
# Compute LCA
df = _postMultiLCAAlgebric(impacts, lambdas, **params)
# Compute change
change[iparam] = (df.max() - df.min()) / df.median() * 100
# Build final heatmap
change = pd.DataFrame(
change,
index=[_param_name(param, name_type) for param in sorted_params],
columns=[method_name(imp) for imp in impacts],
)
_heatmap(change.transpose(), title, 100, ints=True)
return change
def _oat_dasboard(
model,
impacts,
varying_param: ParamDef = None,
n=10,
all_param_names=None,
figsize=(15, 15),
figspace=(0.5, 0.5),
sharex=True,
cols=3,
func_unit_name="kWh",
):
if all_param_names is None:
all_param_names = _param_registry().keys()
params = {name: _param_registry()[name].default for name in all_param_names}
# Compute range of values for given param
params[varying_param.name] = varying_param.range(n)
# print("Params: ", params)
if isinstance(model, Activity):
with DbContext(model):
df = compute_impacts(model, impacts, **params)
else:
df = _postMultiLCAAlgebric(impacts, model, **params)
# Add X values in the table
pname = varying_param.name
if varying_param.unit:
pname = "%s [%s]" % (pname, varying_param.unit)
df.insert(0, pname, varying_param.range(n))
df = df.set_index(pname)
def table():
displayWithExportButton(df)
def graph():
with warnings.catch_warnings():
warnings.simplefilter("ignore")
nb_rows = int(math.ceil(len(impacts) / cols))
fig, axes = plt.subplots(figsize=figsize)
plt.subplots_adjust(None, None, None, None, figspace[0], figspace[1])
axes = df.plot(
ax=axes,
sharex=sharex,
subplots=True,
layout=(nb_rows, cols),
# legend=None,
kind="line" if varying_param.type == ParamType.FLOAT else "bar",
)
axes = axes.flatten()
for ax, impact in zip(axes, impacts):
ax.set_ylim(ymin=0)
unit = method_unit(impact) + " / " + func_unit_name
ax.set_ylabel(unit, fontsize=15)
ax.set_xlabel(pname, fontsize=15)
plt.show(fig)
def change():
ch = (df.max() - df.min()) / df.median() * 100
fig, ax = plt.subplots(figsize=(9, 6))
plt.title("Relative change for %s" % df.index.name)
ch.plot(kind="barh", rot=30)
ax.set_xlabel("Relative change of the median value (%)")
plt.tight_layout()
plt.show(fig)
_display_tabs({"Graphs": graph, "Data": table, "Variation": change})
[docs]
@with_db_context(arg="model")
def oat_dashboard(model, impacts, functional_unit: ValueOrExpression = 1, func_unit_name="kWh", **kwparams):
"""
This function runs a "one at a time" analysis on the selected param.
It makes each parameter vary on its min:max range, while keeping other parameters at their default values.
It returns an interactive dashboard.
Parameters
----------
model :
Root activity of the inventory
methods :
List of methods keys (tuple)
functional_unit:
Value of sympy expression by which to divide the impacts
func_unit_name:
Name of the physical unit of the functional unit, for display
figsize:
Size of figure fro graphs : (15, 15 by default)
figspace:
Space between figures for graphs : (0.5, 0.5) by default
sharex:
Shared X axes ? True by default
Returns
-------
An interactive dashboard with selection of a parameter and graphs of impacts.
"""
lambdas = _preMultiLCAAlgebric(model, impacts, alpha=1 / functional_unit)
def process_func(param):
with DbContext(model):
_oat_dasboard(
model=lambdas, impacts=impacts, varying_param=_param_registry()[param], func_unit_name=func_unit_name, **kwparams
)
param_list = _expanded_names_to_names(lambdas[0].expanded_params)
param_list = list(_variable_params(param_list).keys())
interact(process_func, param=param_list)
class StochasticMethod:
SALTELLI = "saltelli"
RAND = "rand"
SOBOL = "sobol"
def _stochastics(
modelOrLambdas,
methods,
n=DEFAULT_N,
var_params=None,
sample_method=StochasticMethod.SALTELLI,
functional_unit=1,
**extra_fixed_params,
):
params, problem = _generate_random_params(n, sample_method, var_params)
# Fix other params
if extra_fixed_params:
params.update(extra_fixed_params)
Y = _compute_stochastics(modelOrLambdas, methods, params=params, functional_unit=functional_unit)
return problem, params, Y
def _compute_stochastics(modelOrLambdas, methods, functional_unit=1, params=None):
if params is None:
params = {}
if isinstance(modelOrLambdas, Activity):
Y = compute_impacts(modelOrLambdas, methods, functional_unit=functional_unit, **params)
else:
Y = _postMultiLCAAlgebric(methods, modelOrLambdas, **params)
return Y
def _generate_random_params(n, sample_method=StochasticMethod.SALTELLI, var_params=None, seed=None):
"""Compute stochastic impacts for later analysis of incertitude"""
if var_params is None:
var_params = _variable_params().values()
if seed is None:
seed = int(time() * 1000)
random.seed(seed)
# Extract variable names
var_param_names = list([param if isinstance(param, str) else param.name for param in var_params])
problem = {
"num_vars": len(var_param_names),
"names": var_param_names,
"bounds": [[0, 1]] * len(var_param_names),
}
print("Generating samples ...")
if sample_method == StochasticMethod.SALTELLI:
X = sobol.sample(problem, n, calc_second_order=True)
elif sample_method == StochasticMethod.RAND:
X = np.random.rand(n, len(var_param_names))
elif sample_method == StochasticMethod.SOBOL:
X = sobol_sequence.sample(n * (len(var_param_names) * 2 + 2), len(var_param_names))
# elif sample_method == StochasticMethod.LATIN :
# X = latin.sample(problem, n)
else:
raise Exception("Unkown rand method " + sample_method)
# Map normalized 0-1 random values into real values
print("Transforming samples ...")
params = dict()
for i, param_name in enumerate(var_param_names):
param = _param_registry()[param_name]
params[param_name] = param.rand(X[:, i]).tolist()
# Add fixed parameters
for param in _param_registry().values():
if param.name not in var_param_names:
params[param.name] = param.default
return params, problem
class SobolResults:
def __init__(self, s1, s2, st, s1_conf=None, s2_conf=None, st_conf=None):
self.s1 = s1
self.s2 = s2
self.st = st
self.s1_conf = s1_conf
self.s2_conf = s2_conf
self.st_conf = st_conf
def _sobols(methods, problem, Y) -> SobolResults:
"""Computes sobols indices"""
s1 = np.zeros((len(problem["names"]), len(methods)))
s1_conf = np.zeros((len(problem["names"]), len(methods)))
s2 = np.zeros((len(problem["names"]), len(problem["names"]), len(methods)))
s2_conf = np.zeros((len(problem["names"]), len(problem["names"]), len(methods)))
st = np.zeros((len(problem["names"]), len(methods)))
st_conf = np.zeros((len(problem["names"]), len(methods)))
def process(args):
imethod, method = args
print("Processing sobol for " + str(method))
y = Y[Y.columns[imethod]]
res = analyse_sobol.analyze(problem, y.to_numpy(), calc_second_order=True)
return imethod, res
for imethod, res in _parallel_map(process, enumerate(methods)):
try:
s1[:, imethod] = res["S1"]
s1_conf[:, imethod] = res["S1_conf"]
s2_ = np.nan_to_num(res["S2"])
s2_conf_ = np.nan_to_num(res["S2_conf"])
s2[:, :, imethod] = s2_ + np.transpose(s2_)
s2_conf[:, :, imethod] = s2_conf_ + np.transpose(s2_conf_)
st[:, imethod] = res["ST"]
st_conf[:, imethod] = res["ST_conf"]
except Exception as e:
warn("Sobol failed on %s" % imethod[2], e)
return SobolResults(s1, s2, st, s1_conf, s2_conf, st_conf)
def _incer_stochastic_matrix(methods, param_names, Y, sob, name_type=NameType.LABEL):
"""Internal method computing matrix of parameter importance"""
def draw(indice, mode):
"""
Mode comes as ('s1' || 'st', 'raw' || 'percent')
"""
sx = sob.s1 if indice == "s1" else sob.st
if mode == "raw":
data = sx
else:
# If percent, express result as percentage of standard deviation / mean
data = np.zeros((len(param_names), len(methods)))
for i, method in enumerate(methods):
# Total variance
var = np.var(Y[Y.columns[i]])
mean = np.mean(Y[Y.columns[i]])
if mean != 0:
data[:, i] = np.sqrt((sx[:, i] * var)) / mean * 100
param_labels = [_param_name(_param_registry()[name], name_type) for name in param_names]
df = pd.DataFrame(
data,
index=param_labels,
columns=[method_name(method) for method in methods],
)
_heatmap(
df.transpose(),
title="Relative deviation of impacts (%)" if mode == "percent" else "Sobol indices (part of variability)",
vmax=100 if mode == "percent" else 1,
ints=mode == "percent",
)
interact(
draw,
indice=["s1", "st"],
mode=[("Raw indices", "raw"), ("Relative to mean (%)", "percent")],
)
@with_db_context(arg="model")
def incer_stochastic_matrix(model, methods, functional_unit=1, n=DEFAULT_N, name_type=NameType.LABEL):
"""
Method computing matrix of parameter importance
parameters
----------
var_params: Optional list of parameters to vary.
By default use all the parameters with distribution not FIXED
"""
lambdas = _preMultiLCAAlgebric(model, methods, alpha=1 / functional_unit)
var_params = _extract_var_params(lambdas)
problem, _, Y = _stochastics(lambdas, methods, n, var_params)
print("Processing Sobol indices ...")
sob = _sobols(methods, problem, Y)
_incer_stochastic_matrix(methods, problem["names"], Y, sob, name_type=name_type)
return sob
def _incer_stochastic_violin(methods, Y, figsize=(15, 15), figspace=(0.5, 0.5), sharex=True, nb_cols=3):
"""Internal method for computing violin graph of impacts
Parameters
----------
methods: list of impact methods
Y : output
figsize: Size of figure for graphs : (15, 15 by default)
figspace: Space between figures for graphs : (0.5, 0.5) by default
sharex: Shared X axes ? True by default
nb_cols: Number of colums. 3 by default
"""
nb_rows = math.ceil(len(methods) / nb_cols)
fig, axes = plt.subplots(nb_rows, nb_cols, figsize=figsize, sharex=sharex)
plt.subplots_adjust(None, None, None, None, figspace[0], figspace[1])
for imethod, method, ax in zip(range(len(methods)), methods, axes.flatten()):
data = Y[Y.columns[imethod]]
median = np.median(data)
std = np.std(data)
mean = np.mean(data)
ax.violinplot(data, showmedians=True)
ax.title.set_text(method_name(method))
ax.set_ylim(ymin=0)
ax.set_ylabel(method_unit(method))
ax.set_xticklabels([])
# Add text
textstr = "\n".join(
(
r"$\mu=%.3g$" % (mean,),
r"$\mathrm{median}=%.3g$" % (median,),
r"$\sigma=%.3g$" % (std,),
)
)
props = dict(boxstyle="round", facecolor="wheat", alpha=0.5)
ax.text(
0.05,
0.95,
textstr,
transform=ax.transAxes,
fontsize=10,
verticalalignment="top",
bbox=props,
)
plt.tick_params(axis="x", which="both", bottom=False, top=False, labelbottom=False)
plt.show(fig)
@with_db_context(arg="modelOrLambdas")
def incer_stochastic_violin(modelOrLambdas, methods, functional_unit=1, n=DEFAULT_N, var_params=None, **kwparams):
"""
Method for computing violin graph of impacts
parameters
----------
var_params: Optional list of parameters to vary.
By default use all the parameters with distribution not FIXED
"""
_, _, Y = _stochastics(
modelOrLambdas,
methods,
n=n,
var_params=var_params,
functional_unit=functional_unit,
)
_incer_stochastic_violin(methods, Y, **kwparams)
_percentiles = [10, 90, 25, 50, 75]
def _incer_stochastic_variations(methods, param_names, Y, sob1):
"""Method for computing violin graph of impacts"""
method_names = [method_name(method) for method in methods]
std = np.std(Y)
mean = np.mean(Y)
fig = plt.figure(figsize=(12, 6), dpi=80, facecolor="w", edgecolor="k")
ax = plt.gca()
tab20b = plt.get_cmap("tab20b")
tab20c = plt.get_cmap("tab20c")
ax.set_prop_cycle("color", [tab20b(k) if k < 1 else tab20c(k - 1) for k in np.linspace(0, 2, 40)])
relative_variance_pct = std * std / (mean * mean) * 100
totplt = plt.bar(np.arange(len(method_names)), relative_variance_pct, 0.8)
sum = np.zeros(len(methods))
plots = [totplt[0]]
for i_param, param_name in enumerate(param_names):
s1 = sob1[i_param, :]
curr_bar = s1 * relative_variance_pct
curr_plt = plt.bar(np.arange(len(method_names)), curr_bar, 0.8, bottom=sum)
sum += curr_bar
plots.append(curr_plt[0])
plt.legend(plots, ["Higher order"] + param_names, loc=(1, 0))
plt.xticks(np.arange(len(method_names)), method_names, rotation=90)
plt.title("variance / mean² (%)")
plt.show(fig)
def _incer_stochastic_data(methods, param_names, Y, sob1, sobt):
"""Show full stochastic output with sobol indices"""
data = np.zeros((len(param_names) * 2 + len(_percentiles) + 2, len(methods)))
data[0, :] = np.mean(Y, 0)
data[1, :] = np.std(Y, 0)
for i, percentile in enumerate(_percentiles):
data[2 + i, :] = np.percentile(Y, percentile, axis=0)
for i_param, param_name in enumerate(param_names):
s1 = sob1[i_param, :]
data[i_param + 2 + len(_percentiles), :] = s1
data[i_param + 2 + len(_percentiles) + len(param_names), :] = sobt[i_param, :]
rows = (
["mean", "std"]
+ ["p%d" % p for p in _percentiles]
+ ["Sobol 1(%s)" % param for param in param_names]
+ ["Sobol T(%s)" % param for param in param_names]
)
df = pd.DataFrame(data, index=rows, columns=[method_name(method) for method in methods])
displayWithExportButton(df)
[docs]
@with_db_context(arg="model")
def incer_stochastic_dashboard(
model: Activity, methods, n=DEFAULT_N, var_params=None, functional_unit: ValueOrExpression = 1, **kwparams
):
"""
This function runs a monte carlo & Sobol analysis (GSA) on a parametric model and displays a dashboard with results.
Parameters
----------
model:
The root activity of your inventory
methods:
List of impact methods keys (tuples)
var_params:
Optional list of parameters to vary.
By default, all the parameters that are not marked as *FIXED* will be varyed.
functional_unit:
Float value or Sympy expression by which to divide the impacts
figsize:
Size of figure for violin plots : (15, 15) by default
figspace:
Space between violin graphs (0.5, 0.5) by default
sharex:
Share X axe for violin graph : True by default
Returns
-------
An interactive dashboard with 4 tabs :
* Violin graphs with distributions of impacts
* Bar graph with relative variance of impacts
* A heatmap amtrix of Sobol indices for each paramter x impact method
* A detailed table with all values
"""
problem, _, Y = _stochastics(model, methods, n, var_params=var_params, functional_unit=functional_unit)
param_names = problem["names"]
print("Processing Sobol indices ...")
sob = _sobols(methods, problem, Y)
def violin():
_incer_stochastic_violin(methods, Y, **kwparams)
def variation():
_incer_stochastic_variations(methods, param_names, Y, sob.s1)
def matrix():
_incer_stochastic_matrix(methods, problem["names"], Y, sob)
def data():
_incer_stochastic_data(methods, problem["names"], Y, sob.s1, sob.st)
_display_tabs(
{
"Violin graphs": violin,
"Impact variations": variation,
"Sobol matrix": matrix,
"Data": data,
}
)
def _round_expr(expr, num_digits):
"""Round all number in sympy expression with n digits"""
return expr.xreplace({n: Float(n, num_digits) if isinstance(n, Float) else n for n in expr.atoms(Number)})
def _snake2camel(val):
return "".join(word.title() for word in val.split("_"))
def _enum_to_piecewize(exp):
def checkEnumSymbol(term):
if not isinstance(term, Symbol):
return (None, None)
if "_" not in term.name:
return (None, None)
enum_name, enum_val = term.name.rsplit("_", 1)
if enum_name not in _param_registry():
return (None, None)
return enum_name, enum_val
def checkEnumProduct(term):
"""If term is enumVal * X return (param, value, X) else return null"""
if not isinstance(term, Mul):
return (None, None, None)
if len(term.args) != 2:
return (None, None, None)
a, b = term.args
name, value = checkEnumSymbol(a)
if name is not None:
return (name, value, b)
name, value = checkEnumSymbol(b)
if name is not None:
return (name, value, a)
return (None, None, None)
def _replace_enums(expr):
# Dict of enum_name -> { enum_value -> ratio }
enums = defaultdict(lambda: dict())
res_terms = []
for term in expr.args:
name, value, ratio = checkEnumProduct(term)
if name is not None:
# This is a enum value !
enums[name][value] = ratio
else:
# Other term
res_terms.append(term)
if len(enums) == 0:
# No change
return expr
for enum_name, ratio_dict in enums.items():
choices = [(ratio, Eq(symbols(enum_name), symbols(enum_value))) for enum_value, ratio in ratio_dict.items()]
if len(choices) < len(_param_registry()[enum_name].values):
# Not all choices covered ? => Add default
choices.append((0, True))
res_terms.append(Piecewise(*choices))
return Add(*res_terms)
return exp.replace(lambda x: isinstance(x, Sum) or isinstance(x, Add), _replace_enums)
def prettify(exp):
"""
Prettify expression for publication :
> change snake_symbols to SnakeSymbols (avoiding lowerscript in Latex)"""
res = _enum_to_piecewize(exp)
res = res.replace(lambda x: isinstance(x, Symbol), lambda x: Symbol(_snake2camel(str(x))))
# Replace absolute values for positive parameters
res, nb_match = _replace_abs(res)
if nb_match > 0:
# It changed => simplify again
res = simplify(res)
return res
def _replace_abs(exp):
"""Replace |X| by X if X is float param"""
nb_match = 0
def replaceAbs(absExp: Abs):
nonlocal nb_match
if len(absExp.args) != 1:
return absExp
arg = absExp.args[0]
if not isinstance(arg, Symbol):
return absExp
params = _param_registry()
if arg.name not in params:
return absExp
param = params[arg.name]
if param.type == ParamType.FLOAT and param.min >= 0:
nb_match += 1
return arg
else:
return absExp
res = exp.replace(lambda x: isinstance(x, Abs), lambda x: replaceAbs(x))
return res, nb_match
[docs]
@with_db_context(arg="model")
def sobol_simplify_model(
model: Activity,
methods,
min_ratio=0.8,
functional_unit=1,
n=DEFAULT_N * 2,
var_params=None,
fixed_mode=FixedParamMode.MEDIAN,
num_digits=3,
simple_sums=True,
simple_products=True,
) -> List[LambdaWithParamNames]:
"""
Computes Sobol indices and selects main parameters for explaining sensibility of at least 'min_ratio',
Then generates simplified models for those parameters.
The other parameters are replaced by their mean or median values.
Also the term contributing to less than 1% of variation in sums and products are removed.
Decimal numbers are rounded to 3 digits.
Parameters
----------
model:
Root activity of the inventory
methods:
List of impact methods to consider
min_ratio:
[0, 1] minimum amount of first order variation (sum of S1) to explain; 0.8 (80%) by default.
var_params:
Optional list of parameters to vary. If not provided, all parameters vary.
fixed_mode :
What to replace minor parameters with : MEDIAN by default
simple_sums:
If true (default) remove terms in sums that are lower than 1%
simple_products:
If true (default) remove terms in products that contribute to less than 1% to variation
num_digits:
Number of decimal places to round decimal number to (default 3)
Returns
-------
List of *LambdaWithParamNames*, one per impact.
The class *LambdaWithParamNames* wraps the simplified expression together with the
list of required parameters and compiled lambda functions for fast evaluation.
The core simplified expresion may be access with **res[i].expr**
Examples
--------
>>> res = sobol_simplify_model(
>>> model=total_inventory,
>>> methods=[climate_change],
>>> functional_unit=total_energy) # Holds a sympy expression computing the total energy
>>> print(res[0].expr) # Dispalt the simplified expression
"""
# Default var param names
if var_params is None:
var_params = _variable_params().values()
var_param_names = list([param.name for param in var_params])
problem, params, Y = _stochastics(model, methods, n, var_params=var_params, functional_unit=functional_unit)
sob = _sobols(methods, problem, Y)
s1, s2 = sob.s1, sob.s2
res = []
# Generate simplified model
lambdas = _preMultiLCAAlgebric(model, methods, alpha=1 / functional_unit)
exprs = [lambd.expr for lambd in lambdas]
for imethod, method in enumerate(methods):
print("> Method : ", method_name(method))
s1_sum = np.sum(s1[:, imethod])
s2_sum = np.sum(s2[:, :, imethod]) / 2
print("S1: ", s1_sum)
print("S2: ", s2_sum)
print("ST: ", np.sum(sob.st[:, imethod]))
sum = 0
sorted_param_indices = list(range(0, len(var_param_names)))
sorted_param_indices = sorted(sorted_param_indices, key=lambda i: s1[i, imethod], reverse=True)
selected_params = []
sobols = dict()
for iparam, param in enumerate(sorted_param_indices):
# S1
sum += s1[param, imethod]
param_name = var_param_names[param]
selected_params.append(param_name)
sobols[param_name] = s1[param, imethod]
# S2
# for iparam2 in range(0, iparam) :
# param2 = sorted_param_indices[iparam2]
# sum += s2[param, param2, imethod]
if sum > min_ratio:
break
print("Selected params : ", selected_params, "explains: ", sum)
expr = exprs[imethod]
# Replace non selected params by their value
fixed_params = [param for param in _param_registry().values() if param.name not in selected_params]
expr = _replace_fixed_params(expr, fixed_params, fixed_mode=fixed_mode)
# Sympy simplification
expr = simplify(expr)
# Round numerical values to 3 digits
expr = _round_expr(expr, num_digits)
# Lambdify the expression
lambd = LambdaWithParamNames(expr, params=selected_params, sobols=sobols)
# Compute list of parameter values (monte carlo)
expanded_params = _complete_and_expand_params(params, lambd.params, asSymbols=False)
expanded_params = _filter_param_values(expanded_params, lambd.expanded_params)
# Extra step of simplification : simplify sums with neligeable terms
if simple_sums:
expr = _simplify_sums(expr, expanded_params)
if simple_products:
expr = _simplify_products(expr, expanded_params)
expr = simplify(expr)
display(prettify(expr))
res.append(LambdaWithParamNames(expr, params=selected_params, sobols=sobols))
return res
TERM_MIN_LEVEL = 0.01
def _simplify_sums(expr, param_values):
def replace_term(term, minv, maxv, max_max):
abs_max = max(abs(minv), abs(maxv))
if abs_max < (TERM_MIN_LEVEL * max_max):
return None
else:
return term
return _simplify_terms(expr, param_values, Add, replace_term)
def _simplify_products(expr, param_values):
def replace_term(term, minv, maxv, max_max):
# Close to 1 or -1 ?
for factor in [-1, 1]:
if abs(minv - factor) < TERM_MIN_LEVEL and abs(maxv - factor) < TERM_MIN_LEVEL:
if factor == -1:
return -1
else:
# * 1.0 : remove term
return None
return term
return _simplify_terms(expr, param_values, Mul, replace_term)
def _simplify_terms(expr, expanded_param_values, op: Type[AssocOp], replace):
"""Generic simplification of sum or product"""
# Determine max normalized value of this term, for all param values (monte carlo)
min_max_cache: Dict[str, Tuple[float, float]] = dict()
def min_max(term):
# In cache ?
key = str(term)
if key in min_max_cache:
return min_max_cache[key]
# Non varying ?
if len(term.free_symbols) == 0:
values = [term.evalf()]
else:
lambd_term = lambdify(expanded_param_values.keys(), term)
values = lambd_term(**expanded_param_values)
minv = np.min(values)
maxv = np.max(values)
min_max_cache[key] = (minv, maxv)
return (minv, maxv)
# Cleanup :keep only most relevant terms
def cleanup(exp):
if (not isinstance(exp, Expr)) or issubclass(exp.func, AtomicExpr):
return exp
# For Op, only select terms than are relevant
if exp.func == op:
# Compute max of max
def abs_max(minv, maxv):
return max(abs(minv), abs(maxv))
max_max = max([abs_max(*min_max(arg)) for arg in exp.args])
# Only keep term above level
args = [replace(arg, *min_max(arg), max_max) for arg in exp.args]
else:
args = exp.args
args = [cleanup(arg) for arg in args if arg is not None]
return exp.func(*args)
return cleanup(expr)
def _hline(x1, x2, y, linewidth=1, linestyle="solid"):
ymin, ymax = plt.ylim()
xmin, xmax = plt.xlim()
minx = (x1 - xmin) / (xmax - xmin)
maxx = (x2 - xmin) / (xmax - xmin)
plt.axhline(
ymax * y,
color="k",
xmin=minx,
xmax=maxx,
linewidth=linewidth,
linestyle=linestyle,
)
def _vline(x, ymin, ymax, linewidth=1, linestyle="solid"):
plt.axvline(x, color="k", ymin=ymin, ymax=ymax, linewidth=linewidth, linestyle=linestyle)
def _graph(
data,
unit,
title,
ax,
alpha=1,
textboxtop=0.95,
textboxright=0.95,
color=None,
limit_xrange=False,
percentiles=[5, 95],
fontsize=12,
):
if ax is not None:
plt.sca(ax)
else:
ax = plt.gca()
median = np.median(data)
std = np.std(data)
mean = np.mean(data)
xmin = np.min(data)
p9995 = np.percentile(data, 99.95)
pvals = [np.percentile(data, perc) for perc in percentiles]
variability = std / mean
args = dict()
if color:
args["color"] = color
if limit_xrange:
plt.xlim(xmin, p9995)
plt.hist(data, 200, alpha=alpha, **args)
perc_strs = [r"$p%d=%.3g$" % (p, pval) for p, pval in zip(percentiles, pvals)]
textstr = "\n".join(
[
r"$\mu=%.3g$" % (mean,),
r"$\mathrm{median}=%.3g$" % (median,),
r"$\sigma=%.3g$" % (std,),
r"$\sigma/\mu=%.3g$" % (variability,),
]
+ perc_strs
)
props = dict(boxstyle="round", facecolor="wheat" if not color else color, alpha=0.5)
ax.text(
textboxright,
textboxtop,
textstr,
transform=ax.transAxes,
fontsize=fontsize,
verticalalignment="top",
ha="right",
bbox=props,
)
# Axes
ax.set_xlabel(unit, dict(fontsize=14))
ax.set_yticks([])
ax.set_title(title, dict(fontsize=16))
return dict(median=median, std=std, p=pvals, mean=mean, var=variability)
[docs]
def distrib(
model,
methods,
functional_unit=1,
func_unit_name="",
Y=None,
nb_cols=1,
axes=None,
title=None,
invert=None,
scales=None, # Dict of method => scale
unit_overrides=None,
height=10,
width=15,
**kwargs,
):
"""
Show distributions together with statistical outcomes
Synonym of #graphs()
parameters
----------
model:
Root activity of the onventory
methods:
List of impact methods keys (tuples)
functional_unit:
Value of expression by which to divide the impacts
func_unit_name:
Physical unit name of the fonctional unit
nb_cols :
number of colons to display graphs on
invert :
list of methods for which result should be inverted (1/X). None by default
scales :
Dict of method => scale, for multiplying results. To be used with unit overrides
unit_overrides :
Dict of method => string for overriding unit, in respect to custom scales
height:
Height of graph : 10 inches be default
width :
Width of graphs : 15 inches by default
"""
if Y is None:
_, _, Y = _stochastics(model, methods, n=DEFAULT_N * 16, functional_unit=functional_unit)
if axes is None:
nb_rows = math.ceil(len(methods) / nb_cols)
fig, axes = plt.subplots(nb_rows, nb_cols, figsize=(width, height * nb_rows))
if isinstance(axes, np.ndarray):
axes = axes.flatten()
else:
axes = [axes]
plt.subplots_adjust(hspace=0.4)
res = dict()
for i, method, ax in zip(range(len(methods)), methods, axes):
data = Y[Y.columns[i]]
if invert and method in invert:
data = 1 / data
if scales and method in scales:
data = data * scales[method]
graph_title = title if title else method_name(method)
if unit_overrides and method in unit_overrides:
unit = unit_overrides[method]
else:
unit = method_unit(method)
unit += " / " + func_unit_name
stats = _graph(data, unit, graph_title, ax=ax, **kwargs)
res[graph_title + (" [%s]" % unit)] = stats
for i in range(0, -len(methods) % nb_cols):
ax = axes.flatten()[-(i + 1)]
ax.axis("off")
return pd.DataFrame(res)
[docs]
@with_db_context(arg="model")
def compare_simplified(
model: Activity,
methods,
simpl_lambdas,
functional_unit=1,
func_unit_name="",
scales=None, # Dict of method => scale
unit_overrides=None,
nb_cols=2,
height=10,
width=15,
textboxright=0.6,
r2_height=0.65,
residuals=False,
**kwargs,
):
"""
Compare distribution of simplified model with full model.
This functions runs the same monte caarlo random sample as input and feed it to oboth simplified models and full
model. It displays graphs and metrics comparing the two.
Parameters
----------
model:
Root activity of the inventory
methods :
List of impact methods
functional_unit:
Float value or sympy expression by whicxh to devide the impacts
func_unit_name:
Name of the physical functional unit
simpl_lambdas :
Simplified lambdas, as returned by **sobol_simplify_model(...)**
residuals :
If true, draw heat map of residuals, rather than distributions
nb_cols:
number of columns for displaying graphs
percentiles:
List of percentiles to compute [5, 95] by default
"""
# Raw model
lambdas = _preMultiLCAAlgebric(model, methods, alpha=1 / functional_unit)
nb_rows = math.ceil(len(methods) / nb_cols)
fig, axes = plt.subplots(nb_rows, nb_cols, figsize=(width, height * nb_rows))
if not isinstance(axes, np.ndarray):
axes = np.array([axes])
plt.tight_layout()
plt.subplots_adjust(hspace=0.3)
colors = plt.rcParams["axes.prop_cycle"].by_key()["color"]
for i, lambd, simpl_lambd, method, ax in zip(range(len(methods)), lambdas, simpl_lambdas, methods, axes.flatten()):
params, _ = _generate_random_params(100000, sample_method=StochasticMethod.RAND)
# Run Monte Carlo on full model
Y1 = _compute_stochastics([lambd], [method], functional_unit=functional_unit, params=params)
d1 = Y1[Y1.columns[0]]
# Run monte carlo of simplified model
Y2 = _compute_stochastics([simpl_lambd], [method], functional_unit=functional_unit, params=params)
d2 = Y2[Y2.columns[0]]
r_value = r_squared(Y1, Y2)
title = method_name(method)
if scales and method in scales:
d1 = d1 * scales[method]
d2 = d2 * scales[method]
if unit_overrides and method in unit_overrides:
unit = unit_overrides[method]
else:
unit = method_unit(method)
unit += " / " + func_unit_name
if residuals:
hb = ax.hexbin(d1, d2, gridsize=(200, 200), mincnt=1)
cb = fig.colorbar(hb, ax=ax)
xmin, xmax = ax.get_xlim()
ymin, ymax = ax.get_ylim()
vmax = max(ymax, xmax)
line = Line2D([0, vmax], [0, vmax], color="grey", linestyle="dashed")
ax.add_line(line)
ax.set_xlim(0, vmax)
ax.set_ylim(0, vmax)
cb.set_label("Counts")
ax.set_xlabel("Reference model [%s]" % (unit), dict(fontsize=14))
ax.set_ylabel("Simplified model [%s]" % (unit), dict(fontsize=14))
ax.set_title(title, dict(fontsize=16))
else:
_graph(d1, unit, title, ax=ax, alpha=0.6, color=colors[0], **kwargs)
_graph(
d2,
unit,
title,
ax=ax,
alpha=0.6,
textboxright=textboxright,
color=colors[1],
**kwargs,
)
ax.text(
0.9,
r2_height,
"R² : %0.3g" % r_value,
transform=ax.transAxes,
fontsize=14,
verticalalignment="top",
ha="right",
)
# Hide missing graphs
for i in range(0, -len(methods) % nb_cols):
ax = axes.flatten()[-(i + 1)]