import re
from collections import defaultdict
from dataclasses import dataclass
from types import FunctionType
from typing import Dict, List, Optional, Tuple, Union
import brightway2 as bw
import numpy as np
import pandas as pd
import sympy
from bw2data.backends.peewee import Activity
from pandas import DataFrame
from peewee import DoesNotExist
from pint import Quantity, Unit
from sympy import Add, Basic, Expr, ImmutableMatrix, Mul, lambdify, parse_expr
from sympy.printing.numpy import NumPyPrinter
from typing_extensions import deprecated
from . import Settings, newActivity
from .activity import ActivityExtended
from .axis_dict import AxisDict
from .base_utils import (
MethodKey,
TabbedDataframe,
ValueOrExpression,
_actName,
_getDb,
_isnumber,
_isOutputExch,
_user_functions,
)
from .cache import ExprCache, LCIACache
from .database import BIOSPHERE_PREFIX, DbContext, _isForeground, _setMeta
from .log import debug, info, logger, warn
from .matrix import ActMatrix, invert
from .methods import method_name, method_unit
from .params import (
FixedParamMode,
_complete_params,
_compute_param_length,
_expand_param_names,
_expand_params,
_expanded_names_to_names,
_fixed_params,
_getAmountOrFormula,
_param_registry,
_toSymbolDict,
all_params,
freezeParams,
)
from .settings import PROXY_DB_FLAG, temp_settings
def register_user_function(sym, func):
"""Register a custom function with is python implementation
Parameters
----------
sym : the sympy function expression
func : the implementation of the function
Examples
--------
>>> def func_add(*args):
returm sum(*args)
>>>
>>> func_add_sym = register_user_function(sympy.Function('func_add', real=True, imaginary=False), func_add)
>>> e = sympy.Symbol('a') * func_add_sym(sympy.Symbol('b'), sympy.Symbol('c'))
>>> sympy.srepr(e)
"Mul(Symbol('a'), Function('func_add')(Symbol('b'), Symbol('c')))"
"""
global _user_functions
_user_functions[sym.name] = (sym, func)
return sym
[docs]
def user_function(real=True, imaginary=False):
"""Function decorator to register custom Sympy user function.
Beware that the implementation of the custom function should be compatible with numpy and work on vectors of values.
Usage
-----
>>> @user_function()
>>> def func_add(a, b):
>>> return a +b
>>>
>>> e = sympy.Symbol('a') * func_add(sympy.Symbol('b'), sympy.Symbol('c'))
>>> sympy.srepr(e)
"Mul(Symbol('a'), Function('func_add')(Symbol('b'), Symbol('c')))"
"""
def lamb_f(func):
sym_func = sympy.Function(func.__name__, real=real, imaginary=imaginary)
return register_user_function(sym_func, func)
return lamb_f
def _multiLCA(activities, methods):
"""Simple wrapper around brightway API"""
bw.calculation_setups["process"] = {"inv": activities, "ia": methods}
lca = bw.MultiLCA("process")
cols = [_actName(act) for act_amount in activities for act, amount in act_amount.items()]
return pd.DataFrame(lca.results.T, index=[method_name(method) for method in methods], columns=cols)
def multiLCA(models, methods, **params):
"""Compute LCA for a single activity and a set of methods,
after settings the parameters and updating exchange amounts.
This function does not use algebraic factorization and just calls the Brightway2 vanilla code.
Parameters
----------
model : Single activity (root model) or list of activities
methods : Impact methods to consider
params : Other parameters of the model
"""
if not isinstance(models, list):
models = [models]
# Freeze params
dbs = set(model[0] for model in models)
for db in dbs:
if _isForeground(db):
freezeParams(db, **params)
activities = [{act: 1} for act in models]
return _multiLCA(activities, methods).transpose()
""" Compute LCA and return (act, method) => value """
def _group_acts_by_db(acts: list[ActivityExtended]):
res = defaultdict(list)
for act in acts:
res[act["database"]].append(act)
return res
def _multiLCAWithCache(all_acts, methods) -> Dict[Tuple[ActivityExtended, MethodKey], float]:
res = dict()
# Split activities by db_name
for db_name, acts in _group_acts_by_db(all_acts).items():
# Create proxys for bio activities
proxy_acts = _createTechProxysForBio(acts)
with LCIACache(db_name) as cache:
# List activities with at least one missing value
remaining_acts = list(
proxy_act
for proxy_act in proxy_acts.values()
if any(method for method in methods if (proxy_act, method) not in cache.data)
)
# list methods with at least one missing value
remaining_methods = list(
method
for method in methods
if any(proxy_act for proxy_act in proxy_acts.values() if (proxy_act, method) not in cache.data)
)
if len(remaining_acts) > 0 and len(remaining_methods) > 0:
info(f"Computing LCA for {len(remaining_acts)} background acts on methods {remaining_methods}")
lca = _multiLCA([{act: 1} for act in remaining_acts], remaining_methods)
# Set output from dataframe
for imethod, method in enumerate(remaining_methods):
for iact, act in enumerate(remaining_acts):
cache.data[(act, method)] = lca.iloc[imethod, iact]
# Update res with a copy of the cache for selected impacts and activities
res.update({(act, method): cache.data[(proxy_acts[act], method)] for act in acts for method in methods})
return res
def _actToExpressionDict(model: Activity, axis=None) -> Dict[Activity, ValueOrExpression]:
"""
For a given activity return a dict of bg _activity => expression
"""
db_name = model["database"]
if not _isForeground(db_name):
# Already Bg activity ?
return {model: 1}
# Try to load from cache
with ExprCache(db_name) as cache:
key = (axis, "factorized") if Settings.factorize_static_bg else (axis)
if key not in cache.data:
logger.debug(f"{db_name} was not in expression cache, computing...")
res_by_act = _computeDbExpressions(db_name=db_name, axis=axis)
cache.data[key] = res_by_act
else:
logger.debug(f"{model} found in expression cache")
res_by_act = cache.data[key]
return res_by_act[model]
@dataclass
class ValueContext:
"""Represents a result value, with all parameters values used in context"""
value: float
context: Dict[str, float]
def lambdify_expr(expr):
return LambdaWithParamNames(expr, params=[param.name for param in _param_registry().values()])
class LambdaWithParamNames:
"""
This class represents a compiled (lambdified) expression together
with the list of requirement parameters and the source expression
"""
_use_sympy_cse = False
def __init__(self, expr: Expr, expanded_params=None, params=None, sobols=None):
"""Computes a lamdda function from expression and list of expected parameters.
you can provide either the list pf expanded parameters (full vars for enums) for the 'user' param names
"""
if isinstance(expr, dict):
# Come from JSON serialization
obj = expr
# LIst of required params for this lambda
self.params: List[str] = obj["params"]
# Full names
self.expanded_params = _expand_param_names(self.params)
local_dict = {x[0].name: x[0] for x in _user_functions.values()}
self.expr = parse_expr(obj["expr"], local_dict=local_dict)
self.lambd = _lambdify(self.expr, self.expanded_params)
self.sobols = obj["sobols"]
else:
self.expr = expr
self.params = params
if expanded_params is None:
if params is None:
expanded_params = _free_symbols(expr)
params = _expanded_names_to_names(expanded_params)
self.params = params
# We expand again the parameters
# If we expect an enum param name, we also expect the other ones :
# enumparam_val1 => enumparam_val1, enumparam_val2, ...
expanded_params = _expand_param_names(params)
elif self.params is None:
self.params = _expanded_names_to_names(expanded_params)
self.lambd = _lambdify(expr, expanded_params)
self.expanded_params = expanded_params
self.sobols = sobols
@property
def has_axis(self):
return isinstance(self.expr, AxisDict)
@property
def axis_keys(self):
if self.has_axis:
return self.expr.str_keys()
else:
return None
def compute(self, **params) -> ValueContext:
"""Compute result value based of input parameters"""
# Add default or computed values
completed_params = _complete_params(params, self.params)
# Expand enums
expanded_params = _expand_params(completed_params)
# Remove parameters that are not required
expanded_params = _filter_param_values(expanded_params, self.expanded_params)
value = self.lambd(**expanded_params)
return ValueContext(value=value, context=completed_params)
def serialize(self):
expr = str(self.expr)
return dict(params=self.params, expr=expr, sobols=self.sobols)
@staticmethod
def use_sympy_cse(b=True):
LambdaWithParamNames._use_sympy_cse = b
def __repr__(self):
return repr(self.expr)
def _repr_latex_(self):
return self.expr._repr_latex_()
def __getstate__(self):
"""For pickling/unpicling"""
state = self.__dict__.copy()
# Retirer les attributs temporaires
if "lambd" in state:
del state["lambd"]
return state
def __setstate__(self, state):
"""For pickling/unpicling"""
self.__dict__.update(state)
self.lambd = _lambdify(self.expr, self.expanded_params)
def _preMultiLCAAlgebric(
model: ActivityExtended, methods: MethodKey, alpha: ValueOrExpression = 1, axis=None
) -> list[LambdaWithParamNames]:
"""
This method transforms an activity into a set of functions ready to compute LCA very fast on a set on methods.
You may use is and pass the result to postMultiLCAAlgebric for fast computation on a model that does not change.
This method is used by multiLCAAlgebric
"""
with DbContext(model):
if isinstance(alpha, Quantity):
alpha = alpha.magnitude
def _key(method):
return (model, axis, method, alpha)
with ExprCache(model["database"]) as cache:
missing_methods = [method for method in methods if not _key(method) in cache.data]
if len(missing_methods) > 0:
exprs = _modelToExpr(model, methods=missing_methods, axis=axis)
for method, expr in zip(missing_methods, exprs):
cache.data[_key(method)] = LambdaWithParamNames(expr * alpha)
# At this point, everything is in cache
# REturn the list in order
return list(cache.data[_key(method)] for method in methods)
def _modelToExpr(model: Activity, methods: List[MethodKey], axis=None):
"""
Compute expressions corresponding to a model for each impact, replacing activities by the value of its impact
Return
------
<list of expressions (one per impact)>, <list of required param names>
"""
expr_by_bg_act = _actToExpressionDict(model, axis=axis)
# Keep them in same order
bg_acts = list(expr_by_bg_act.keys())
# Compute LCA for background activities
impacts = _multiLCAWithCache(bg_acts, methods)
# Create numpy matrix of impact values
# Rows are methods, columns are bg acts
impact_matrix = np.array([[impacts[bg_act, method] for bg_act in bg_acts] for method in methods])
# Create immutable sympy vector of bg expression, in the same order :
bg_expr_vector = ImmutableMatrix([[expr_by_bg_act[bg_act]] for bg_act in bg_acts])
# Multiply the two => returns a vector of impact expression
impacts_vector = impact_matrix * bg_expr_vector
def _get_expr(i):
if len(bg_acts) == 0:
return 0.0
else:
return impacts_vector[i]
# For each method, compute an algebric expression with activities replaced by their values
return [_get_expr(i) for i, method in enumerate(methods)]
def _filter_param_values(params, expanded_param_names):
return {key: val for key, val in params.items() if key in expanded_param_names}
def _free_symbols(expr: Basic):
if isinstance(expr, Basic):
return set([str(symb) for symb in expr.free_symbols])
else:
# Static value
return set()
def _lambdify(expr: Basic, expanded_params):
"""Lambdify, handling manually the case of SymDict (for impacts by axis)"""
printer = NumPyPrinter(
{"fully_qualified_modules": False, "inline": True, "allow_unknown_functions": True, "user_functions": dict()}
)
modules = [{x[0].name: x[1] for x in _user_functions.values()}, "numpy"]
if isinstance(expr, Basic):
lambd = lambdify(expanded_params, expr, modules, printer=printer, cse=LambdaWithParamNames._use_sympy_cse)
def func(*arg, **kwargs):
res = lambd(*arg, **kwargs)
if isinstance(res, dict):
# Transform key symbols into Str
return {str(k): v for k, v in res.items()}
else:
return res
return func
else:
# Not an expression : return static func
def static_func(*args, **kargs):
return expr
return static_func
def _slugify(str):
return re.sub("[^0-9a-zA-Z]+", "_", str)
@dataclass
class ResultsWithParams:
"""Holds bith the result with context parameters"""
dataframe: pd.DataFrame
params: Dict
def _postMultiLCAAlgebric(methods, lambdas: List[LambdaWithParamNames], with_params=False, unit: Unit = None, **params):
"""
Compute LCA for a given set of parameters and pre-compiled lambda functions.
This function is used by **multiLCAAlgebric**
Parameters
----------
methodAndLambdas : Output of preMultiLCAAlgebric
**params : Parameters of the model
"""
param_length = _compute_param_length(params)
# lambda are SymDict ?
# If use them as number of params
if lambdas[0].has_axis:
if param_length > 1:
raise Exception("Multi params cannot be used together with 'axis'")
param_length = len(lambdas[0].axis_keys)
# Init output
res = np.zeros((len(methods), param_length), float)
# All params
context_params = dict()
# Compute result on whole vectors of parameter samples at a time : lambdas use numpy for vector computation
def process(lamba):
nonlocal context_params
value_context = lambd.compute(**params)
# Update the param values used
context_params.update(value_context.context)
value = value_context.value
# Expand axis values as a list, to fit into the result numpy array
if isinstance(value, dict):
# Ensure the values are in the same order as the value
# XXX We use the order of the first lambda as each one might have different order
axes = lambdas[0].axis_keys
value = list(float(value[axis]) if axis in value else 0.0 for axis in axes)
return value
# Use multithread for that
for imethod, lambd in enumerate(lambdas):
value = process(lambd)
res[imethod, :] = value
result = pd.DataFrame(
res,
index=[method_name(method) + "[%s]" % method_unit(method, fu_unit=unit) for method in methods],
).transpose()
if with_params:
return ResultsWithParams(dataframe=result, params=context_params)
else:
return result
# Add default values for issing parameters or warn about extra params
def _filter_params(params, expected_names, model):
res = params.copy()
expected_params_names = _expanded_names_to_names(expected_names)
for expected_name in expected_params_names:
if expected_name not in params:
default = _param_registry()[expected_name].default
res[expected_name] = default
warn("Missing parameter %s, replaced by default value %s" % (expected_name, default))
for key, value in params.items():
if key not in expected_params_names:
del res[key]
if model:
warn("Param %s not required for model %s" % (key, model))
return res
def compute_value(formula, **params):
"""Compute actual value for a given formula, with possible parameters (or default ones)"""
if isinstance(formula, float) or isinstance(formula, int):
return formula
lambd = LambdaWithParamNames(formula)
value_context = lambd.compute(**params)
return value_context.value
@deprecated("multiLCAAlgebric is deprecated, use compute_impacts instead")
def multiLCAAlgebric(*args, **kwargs):
"""deprecated. `compute_impacts()` instead"""
warn("multiLCAAlgebric is deprecated, use compute_impacts instead")
return compute_impacts(*args, **kwargs)
def _params_dataframe(param_values: Dict[str, float]):
"""Create a DataFrame, ordered by group, showing param values"""
params_by_name = all_params()
records = []
plen = _compute_param_length(param_values)
for param_name, value in param_values.items():
param = params_by_name[param_name]
record = {
"group": param.group if param.group is not None else "",
"name": param.name,
"min": param.min,
"max": param.max,
"default": param.default,
}
if plen == 1:
record["value"] = value
else:
if isinstance(value, (list, np.ndarray)):
record.update({f"value_{i}": value for i, value in enumerate(value, 1)})
else:
# Repeat single value
record.update({f"value_{i}": value for i in range(1, plen + 1)})
records.append(record)
df = pd.DataFrame.from_records(records).set_index(["group", "name"]).sort_index()
return df
SingleOrMultipleFloat = Union[float, List[float], np.ndarray]
[docs]
def compute_inventory(
model: ActivityExtended,
functional_unit=1,
as_dict=False,
impact_method=None,
fields=["database", "name", "location", "unit"],
**params,
):
"""
This method computes the inventory of background activities for a given scenario / values of parameters.
Parameters
----------
model:
Root activity
functional_unit:
Quantitity to divide the inventory. 1 by default
as_dict:
If true, returns a dict of act => value. If false (default) returns a dataframe
fields:
List of fields to be added in the ouput dataframe
impact_method:
If provided, return the impact for each activity, rather that its quantity
params:
All other attributes are treated as values of lca_algebraic parameters.
If not specified, each parameters takes its default value.
Returns
-------
Dataframe or Dict of act => value
"""
with temp_settings(factorize_static_bg=False):
exprs_by_bg_act = _actToExpressionDict(model)
# Transform to dict of act => value
val_by_act = dict()
for bg_act, expr in exprs_by_bg_act.items():
val_by_act[bg_act] = compute_value(expr, **params) / functional_unit
if impact_method is not None:
# Compute LCA of background activities
impact_by_act = _multiLCAWithCache(val_by_act.keys(), [impact_method])
val_by_act: {act: value * impact_by_act[(act, impact_method)] for act, value in val_by_act.items()}
if as_dict:
return val_by_act
# Transform to dataframe
items = []
for act, value in val_by_act.items():
item = dict()
for field in fields:
if field in act:
item[field] = act[field]
item["value"] = value
items.append(item)
return DataFrame(items)
[docs]
def compute_impacts(
models,
methods,
axis=None,
functional_unit=1,
return_params=False,
description=None,
**params,
):
"""
Main parametric LCIA method :
Computes LCA by expressing the foreground model as symbolic expression of background activities and parameters.
Then, compute 'static' inventory of the referenced background activities.
This enables a very fast recomputation of LCA with different parameters, \
useful for stochastic evaluation of parametrized model
Parameters
----------
models :
Single model or
List of model or
List of (model, alpha)
or Dict of model:amount
In case of several models, you cannot use list of parameters
methods :
List of methods / impacts to consider
axis:
Designates the name of a custom attribute of foreground activities.
You may set this attribute using the method `myActivity.updateMeta(your_custom_attr="some_value")`
The impacts will be ventilated by this attribute.
This is useful to get impact by phase or sub-modules.
params:
Any other argument passed to this function is considered as a value of a parameter of the model :
Values can be either single float values, list or ndarray of values.
In the later case, all parameters should have the same number of values.
Paremeters that are not provided will have their default value set.
functional_unit:
Quantity (static or Sympy formula) by which to divide impacts. Optional, 1 by default.
return_params:
If true, also returns the value of all parameters in as tabbed DataFrame
description:
Optional description/metadata to be added in output when using "return params" Dataframe
Returns
-------
A dataframe with the results. If *return_params* is true, it returns `TabbedDataframe`,
including all parameters values, that can be saved as a multi sheet excel file.
Examples
--------
>>> compute_impacts(
>>> mainAct1, # The root activity of the foreground model
>>> [climate_change], # climate_change is the key (tuple) of the impact method
>>> functional_unit=energy_expression, # energy expression is a Sympy expression computing the energy in kWh
>>> axis="phase", # Split results by phase
>>> return_params=True, # Return all parameter values
>>>
>>> # Parameter values
>>> p1=2.0,
>>> p2=3.0)
"""
dfs = dict()
if isinstance(models, list):
def to_tuple(item):
if isinstance(item, tuple):
return item
else:
return (item, 1)
models = dict(to_tuple(item) for item in models)
elif not isinstance(models, dict):
models = {models: 1}
# Gather all param values (even default and computed)
params_all = dict()
# Single method provided ?
if isinstance(methods, tuple):
methods = [methods]
for model, alpha in models.items():
if type(model) is tuple:
model, alpha = model
alpha = float(alpha)
dbname = model.key[0]
with DbContext(dbname):
# Check no params are passed for FixedParams
for key in params:
if key in _fixed_params():
warn("Param '%s' is marked as FIXED, but passed in parameters : ignored" % key)
if functional_unit != 1:
alpha = alpha / functional_unit
lambdas = _preMultiLCAAlgebric(model, methods, alpha=alpha, axis=axis)
unit: Optional[Unit] = functional_unit.units if isinstance(functional_unit, Quantity) else None
res = _postMultiLCAAlgebric(methods, lambdas, with_params=return_params, unit=unit, **params)
if return_params:
df = res.dataframe
params_all.update(res.params)
else:
df = res
model_name = _actName(model)
while model_name in dfs:
model_name += "'"
# param with several values
list_params = {k: vals for k, vals in params.items() if isinstance(vals, list)}
# Shapes the output / index according to the axis or multi param entry
if axis:
df[axis] = lambdas[0].axis_keys
df = df.set_index(axis)
df.index.set_names([axis])
# Filter out line with zero output
df = df.loc[
df.apply(
lambda row: not (row.name is None and row.values[0] == 0.0),
axis=1,
)
]
# Rename "None" to others
df = df.rename(index={None: "_other_"})
# Sort index
df.sort_index(inplace=True)
# Add "total" line
df.loc["*sum*"] = df.sum(numeric_only=True)
elif len(list_params) > 0:
for k, vals in list_params.items():
df[k] = vals
df = df.set_index(list(list_params.keys()))
else:
# Single output ? => give the single row the name of the model activity
df = df.rename(index={0: model_name})
dfs[model_name] = df
if len(dfs) == 1:
df = list(dfs.values())[0]
else:
# Concat several dataframes for several models
df = pd.concat(list(dfs.values()))
if return_params:
metadata = {"Models": str(models), "Functional unit": functional_unit}
if description:
metadata["Description"] = description
return TabbedDataframe(metadata=metadata, Results=df, Parameters=_params_dataframe(params_all))
else:
return df
def _isBioAct(act: ActivityExtended):
db_name = act["database"]
return (BIOSPHERE_PREFIX in db_name) or ("type" in act and act["type"] in ["emission", "natural resource"])
def _getOrCreateProxyDb(db_name):
"""Init proxy db to biosphere if not done yet"""
proxyname = db_name + "-proxy"
if proxyname not in bw.databases:
db = bw.Database(proxyname)
db.write(dict())
_setMeta(proxyname, PROXY_DB_FLAG, True)
return proxyname
def _getOrCreateProxy(act: ActivityExtended, exchanges: Dict[ActivityExtended, float]):
proxy_db = _getOrCreateProxyDb(act["database"])
proxy_code = act["code"] + "#proxy"
with temp_settings(strict_mode=False):
try:
proxy = _getDb(proxy_db).get(proxy_code)
# Check exchanges
existing_exchanges = {ex[1]: ex[2] for ex in proxy.listExchanges()}
if existing_exchanges != exchanges:
info(f"Proxy exchanges differ in {proxy['name']}, updating them")
if len(existing_exchanges) > 0:
proxy.deleteExchanges()
proxy.addExchanges(exchanges)
except DoesNotExist:
name = act["name"] + " # proxy"
# Create biosphere proxy in User Db
proxy = newActivity(
db_name=proxy_db,
name=name,
code=proxy_code,
switchActivity=True,
isProxy=True,
unit=act["unit"],
exchanges=exchanges,
)
return proxy
def _createTechProxysForBio(acts: List[ActivityExtended]) -> Dict[ActivityExtended, ActivityExtended]:
"""
Potentially create tech proxys for bio activity (brightway cannot proces LCIA on them
Returns a dict of [OriginalAct -> OriginalOrProxyAct]
"""
res = dict()
for act in acts:
res[act] = act if not _isBioAct(act) else _getOrCreateProxy(act, {act: 1})
return res
def _replace_fixed_params(expr, fixed_params, fixed_mode=FixedParamMode.DEFAULT):
"""Replace fixed params with their value."""
if not isinstance(expr, Basic):
return expr
sub = {key: val for param in fixed_params for key, val in param.expandParams(param.stat_value(fixed_mode)).items()}
sub = _toSymbolDict(sub)
return expr.xreplace(sub)
def _get_axis(act, axis_name: str):
"""Safe"""
tag = act.get(axis_name, None)
if tag is None:
return None
if tag.isalnum():
return tag
else:
return re.sub("[^0-9a-zA-Z]+", "_", tag)
def _force_reduce(expr):
"""Force reduction of sum and multiplication : usefull for AxisDict"""
if isinstance(expr, AxisDict):
return AxisDict({key: _force_reduce(val) for key, val in expr._dict.items()})
if isinstance(expr, dict):
return _force_reduce(AxisDict(expr))
if isinstance(expr, Add):
res = 0.0
for arg in expr.args:
res += _force_reduce(arg)
return res
if isinstance(expr, Mul):
res = 1.0
for arg in expr.args:
res *= _force_reduce(arg)
return res
return expr
def _clean_expr(expr):
expr = _force_reduce(expr)
return _replace_fixed_params(expr, _fixed_params().values())
def _solve_expression(
fg_matrix: ActMatrix, bg_matrix: ActMatrix
) -> Dict[ActivityExtended, Dict[ActivityExtended, ValueOrExpression]]:
"""solve the foreground inventory. Retrurn dict o"""
A = fg_matrix.to_sympy()
B = bg_matrix.to_sympy()
debug(f"FG matrix : {A}")
# BG
if len(bg_matrix.cols_acts()) == 0:
# Case of empty matrix
res_mat = ImmutableMatrix([[]])
else:
# Use fast inversion of sparse matrices
inv_A = invert(A)
res_mat = inv_A * B
# Transform to dict of dict
res = dict()
for i_fg, fg_act in enumerate(fg_matrix.cols_acts()):
row = dict()
res[fg_act] = row
for i_bg, bg_act in enumerate(bg_matrix.cols_acts()):
val = res_mat[i_fg, i_bg]
if val == 0:
continue
row[bg_act] = _clean_expr(val)
return res
def _computeDbExpressions(db_name, axis=None) -> Dict[Activity, Dict[Activity, ValueOrExpression]]:
"""
Compute all expressions for a given DB
Returns Dict[FgAct => Dict[BGAct => Expressions]]
"""
if not _isForeground(db_name):
raise ValueError(f"Can only compute expression on foreground activities. {db_name} is background")
# Square technospere matrix dict of <act1, act2> => float
fg_matrix = ActMatrix()
# Rectangle matrix
bg_matrix = ActMatrix()
# Fill the matrices
def fill_matrices_rec(act: Activity, axis_tag=None):
# Update axis values
if axis is not None:
new_axis_val = _get_axis(act, axis)
if new_axis_val is not None:
axis_tag = new_axis_val
if not _isForeground(act["database"]):
# We reached a background DB ? => stop developping and create reference to activity
return
if act in fg_matrix.row_acts():
# Already explored
return
# Add current act to axes matrices, to keep correct shape
fg_matrix.add_row(act)
fg_matrix.add_col(act)
bg_matrix.add_row(act)
static_bg_amounts = dict()
for exch in act.exchanges():
amount = _getAmountOrFormula(exch)
if isinstance(amount, FunctionType):
# Some amounts in EIDB are functions ... we ignore them
continue
# Fetch activity referenced by the exchange
input_db, input_code = exch["input"]
sub_act = _getDb(input_db).get(input_code)
# Fill the appropriate matrix
if _isForeground(input_db):
# Production exchange
if not _isOutputExch(exch):
amount = -amount
fg_matrix[act, sub_act] += amount
# Recursively explore the rest
fill_matrices_rec(sub_act, axis_tag)
else:
# Only tag bg values
if axis_tag is not None:
amount = AxisDict({axis_tag: amount})
if Settings.factorize_static_bg and _isnumber(amount):
static_bg_amounts[sub_act] = amount
else:
bg_matrix[act, sub_act] += amount
# Static bg amount not empty ? create and reference it
if len(static_bg_amounts) > 0:
proxy_act = _getOrCreateProxy(act, static_bg_amounts)
bg_matrix[act, proxy_act] = 1.0
# Fill the matrices exploring everything
for act in bw.Database(db_name):
fill_matrices_rec(act)
# solve the linear equation algebically
return _solve_expression(fg_matrix, bg_matrix)
def _reverse_dict(dic):
return {v: k for k, v in dic.items()}