Source code for openpnm.core._models

import logging
import inspect
import openpnm as op
import numpy as np
from copy import deepcopy
from openpnm.utils import (
    PrintableDict,
    Workspace,
    is_valid_propname,
)


logger = logging.getLogger(__name__)
ws = Workspace()


__all__ = [
    'ModelsDict',
    'ModelWrapper',
    'ModelsMixin2',
]


[docs] class ModelsDict(PrintableDict): """ This subclassed dictionary is assigned to the ``models`` attribute of all objects that inherit from the ``ModelsMixin`` class. Each dictionary entry corresponds to an entry in the target object's dictionary, and contains the models and associated parameters for generating the model. The main features of this subclass are three methods the help resolve the order in which models should be called: ``dependency_list``, ``dependency_graph``, and ``dependency_map``. """ def _find_target(self): """ Finds and returns the target object to which this ModelsDict is associated. """ for proj in ws.values(): for obj in proj: if hasattr(obj, "models"): if obj.models is self: return obj raise Exception("No target object found!")
[docs] def dependency_list(self): r""" Returns a list of dependencies in the order with which they should be called to ensure data is calculated by one model before it's asked for by another. Notes ----- This raises an exception if the graph has cycles which means the dependencies are unresolvable (i.e. there is no order which the models can be called that will work). In this case it is possible to visually inspect the graph using ``dependency_graph``. See Also -------- dependency_graph dependency_map """ import networkx as nx dtree = self.dependency_graph() cycles = list(nx.simple_cycles(dtree)) if cycles: msg = 'Cyclic dependency: ' + ' -> '.join(cycles[0] + [cycles[0][0]]) raise Exception(msg) d = nx.algorithms.dag.lexicographical_topological_sort(dtree, sorted) return list(d)
[docs] def dependency_graph(self, deep=False): """ Returns a NetworkX graph object of the dependencies Parameters ---------- deep : bool, optional Defines whether intra- or inter-object dependency graph is desired. Default is False, i.e. only returns dependencies within the object. See Also -------- dependency_list dependency_map """ import networkx as nx dtree = nx.DiGraph() models = list(self.keys()) for model in models: propname = model.split("@")[0] dtree.add_node(propname) # Filter pore/throat props only args = op.utils.flat_list(self[model].values()) dependencies = [arg for arg in args if is_valid_propname(arg)] # Add dependency from model's parameters for d in dependencies: dtree.add_edge(d, propname) return dtree
[docs] def dependency_map(self, ax=None, figsize=None, deep=False, style='shell'): # pragma: no cover """ Create a graph of the dependency graph in a decent format Parameters ---------- ax : matplotlib.axis, optional Matplotlib axis object on which dependency map is to be drawn. figsize : tuple, optional Tuple containing frame size. See Also -------- dependency_graph dependency_list """ import networkx as nx import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots() if figsize is not None: fig.set_size_inches(figsize) labels = {} node_shapes = {} dtree = self.dependency_graph(deep=deep) for node in dtree.nodes: labels[node] = node.split(".")[1] node_shapes[node] = "o" if node.startswith("pore") else "s" nx.set_node_attributes(dtree, node_shapes, "node_shape") layout = getattr(nx, f"{style}_layout") pos = layout(dtree) Pprops = [prop for prop in dtree.nodes if prop.startswith("pore")] Tprops = [prop for prop in dtree.nodes if prop.startswith("throat")] colors = ["yellowgreen", "coral"] shapes = ["o", "s"] for props, color, shape in zip([Pprops, Tprops], colors, shapes): nx.draw( dtree, pos=pos, nodelist=props, node_shape=shape, labels=labels, with_labels=True, edge_color='lightgrey', node_color=color, font_size=12, width=2.0 ) ax = plt.gca() ax.margins(x=0.2, y=0.05) return ax
@property def _info(self): # Pragma: no cover r""" Prints a nicely formatted list of model names and the domain to which they apply. Notes ----- This is a hidden function for now, but could be exposed if useful. """ names = {} for item in self: name, _, domain = item.partition('@') if name not in names.keys(): names[name] = [] names[name].append(domain) D = PrintableDict(names, key='Model', value='Domain') print(D) def __str__(self): # pragma: no cover horizontal_rule = '―' * 85 lines = [horizontal_rule] strg = '{0:<3s} {1:<35s} {2:<25s} {3}' lines.append(strg.format('#', 'Property Name', 'Parameter', 'Value')) lines.append(horizontal_rule) for i, item in enumerate(self.keys()): temp = self[item].copy() regen_mode = temp.pop('regen_mode', None) model = str(temp.pop('model')).split(' ')[1] lines.append(strg.format(str(i+1), item, 'model:', model)) for param in temp.keys(): lines.append(strg.format('', '', param+':', temp[param])) lines.append(strg.format('', '', 'regeneration mode:', regen_mode)) lines.append(horizontal_rule) return '\n'.join(lines) def __delitem__(self, key): if '@' in key: super().__delitem__(key) else: # Delete all models with the same prefix for item in list(self.keys()): if item.startswith(key): super().__delitem__(item) def __getitem__(self, key): try: return super().__getitem__(key) except KeyError: d = PrintableDict(key='Model', value='Args') for k, v in self.items(): if k.startswith(key+'@'): d[k] = v if len(d) > 0: return d else: raise KeyError(key)
[docs] def update(self, d, domain=None): # Catch un-run function if hasattr(d, '__call__'): raise Exception('Received dict argument is a function, try running it') parent = self._find_target() for k, v in d.items(): parent.add_model(propname=k, domain=domain, **v)
[docs] class ModelWrapper(dict): """ This class is used to hold individual models and provide some extra functionality, such as pretty-printing and the ability to run itself. """
[docs] def __call__(self): model = self['model'] kwargs = {} for k, v in self.items(): if k not in ['model', 'regen_mode']: kwargs[k] = v return model(self.target, **kwargs)
@property def name(self): for proj in ws.values(): for obj in proj: if hasattr(obj, 'models'): for key, mod in obj.models.items(): if mod is self: return key @property def propname(self): return self.name.split('@')[0] @property def domain(self): element, prop = self.name.split('.', 1) prop, domain = prop.split('@') return element + '.' + domain def __str__(self): # pragma: no cover horizontal_rule = '―' * 78 lines = [horizontal_rule] strg = '{0:<25s} {1:<25s} {2}' lines.append(strg.format('Property Name', 'Parameter', 'Value')) lines.append(horizontal_rule) temp = self.copy() regen_mode = temp.pop('regen_mode', None) model = str(temp.pop('model')).split(' ')[1] lines.append(strg.format(self.propname, 'model:', model)) for param in temp.keys(): lines.append(strg.format('', param+':', temp[param])) lines.append(strg.format('', 'regeneration mode:', regen_mode)) lines.append(horizontal_rule) return '\n'.join(lines) @property def target(self): """ Finds and returns the object to which this model is assigned """ for proj in ws.values(): for obj in proj: if hasattr(obj, "models"): for mod in obj.models.values(): if mod is self: return obj raise Exception("No target object found!")
[docs] class ModelsMixin2: r""" This class is added to ``Network`` and ``Phase`` objects under the ``models`` attribute. It provides the functionality for storing and running pore-scale models. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.models = ModelsDict()
[docs] def add_model(self, propname, model, domain='all', regen_mode='normal', **kwargs): r""" Add a pore-scale model to the object, along with the desired arguments Parameters ---------- propname : str The name of the property being computed. E.g. if ``propname='pore.diameter'`` then the computed results will be stored in ``obj['pore.diameter']``. model : function handle The function that produces the values domain : str The label indicating the locations in which results generated by ``model`` should be stored. See `Notes` for more details. regen_mode : str How the model should be regenerated. Options are: ============ ===================================================== regen_mode description ============ ===================================================== normal (default) The model is run immediately upon being added, and is also run each time ``regenerate_models`` is called. deferred The model is NOT run when added, but is run each time ``regenerate_models`` is called. This is useful for models that depend on other data that may not exist yet. constant The model is run immediately upon being added, but is is not run when ``regenerate_models`` is called, effectively turning the property into a constant. ============ ===================================================== kwargs : keyword arguments All additional keyword arguments are passed on to the model Notes ----- The ``domain`` argument dictates where the results of ``model`` should be stored. For instance, given ``propname='pore.diameter'`` and ``domain='left'`` then when `model` is run, the results are stored in in the pores labelled left. Note that if ``model`` returns ``Np`` values, then values not belonging to ``'pore.left'`` are discarded. The following steps outline the process: 1. Find the pore indices: .. code-block:: python Ps = obj.pores('left') 2. Run the model: .. code-block:: python vals = model(**kwargs) 3. If the model returns a full Np-length array, then extract the correct values and apply them to the corresponding locations: .. code-block:: python if len(vals) == obj.Np: obj['pore.diameter'][Ps] = vals[Ps] 4. If the model was designed to return only the subset of values then: .. code-block:: python if len(vals) == obj.num_pores('left'): obj['pore.diameter'][Ps] = vals """ if '@' in propname: propname, domain = propname.split('@') elif domain is None: domain = self.settings['default_domain'] element, prop = propname.split('.', 1) domain = domain.split('.', 1)[-1] # Add model and regen_mode to kwargs dictionary kwargs.update({'model': model, 'regen_mode': regen_mode}) # Insepct model to extract arguments and default values kwargs.update(self._inspect_model(model, kwargs)) self.models[propname+'@'+domain] = ModelWrapper(**kwargs) if regen_mode != 'deferred': self.run_model(propname+'@'+domain)
def _inspect_model(self, model, kwargs={}): if model.__defaults__: vals = list(inspect.getfullargspec(model).defaults) keys = inspect.getfullargspec(model).args[-len(vals):] for k, v in zip(keys, vals): # Put defaults into kwargs if k not in kwargs: # Skip if argument was given in kwargs kwargs.update({k: v}) return kwargs
[docs] def add_model_collection(self, models, domain='all', regen_mode='deferred'): r""" Add a ``collection`` of several models at once Parameters ---------- models : dict The collection of models to add. regen_mode : str By default the models are not regenerated upon addition. See the docstring for ``add_model`` for more information. domain : str The label indicating which locations the supplied collection of models should be applied to. Notes ----- Collections are dictionaries that are formatted the same as ``obj.models``. Several model collections are available in ``openpnm.models.collections``. """ models = deepcopy(models) for k, v in models.items(): if 'domain' not in v.keys(): v['domain'] = domain if 'regen_mode' not in v.keys(): v['regen_mode'] = regen_mode self.add_model(propname=k, **v)
[docs] def regenerate_models(self, propnames=None, exclude=[]): r""" Runs all the models stored in the object's ``models`` attribute Parameters ---------- propnames : list of strings If given then only the specified models are run exclude : list of strings If given then these models will *not* be run Notes ----- This function will ensure that models are called in the correct order such that 'pore.diameter' will be run before 'pore.volume', since the diameter is required to compute the volume. """ all_models = self.models.dependency_list() # Regenerate all properties by default if propnames is None: propnames = all_models else: propnames = np.atleast_1d(propnames).tolist() # Remove any that are specifically excluded propnames = np.setdiff1d(propnames, exclude).tolist() # Reorder given propnames according to dependency tree tmp = [e.split("@")[0] for e in propnames] idx_sorted = [all_models.index(e) for e in tmp] propnames = [elem for i, elem in sorted(zip(idx_sorted, propnames))] # Now run each on in sequence for item in propnames: try: self.run_model(item) except KeyError as e: msg = (f"{item} was not run since the following property" f" is missing: {e}") logger.warning(msg) self.models[item]['regen_mode'] = 'deferred'
[docs] def run_model(self, propname, domain=None): r""" Runs the requested model and places the result into the correct locations Parameters ---------- propname : str The name of the model to run. domain : str The label of the domain for which the model should be run. Passing ``propname='pore.diameter@domain1`` and ``domain=None`` is equivalent to passing ``propname='pore.diameter`` and ``domain=domain1``. Passing ``domain=None`` will regenerate all models starting with ``propname``. """ if domain is None: if '@' in propname: # Get domain from propname if present propname, _, domain = propname.partition('@') self.run_model(propname=propname, domain=domain) else: # No domain means run model for ALL domains for item in self.models.keys(): if item.startswith(propname+"@"): _, _, domain = item.partition("@") self.run_model(propname=propname, domain=domain) else: # domain was given explicitly domain = domain.split('.', 1)[-1] element, prop = propname.split('@')[0].split('.', 1) propname = f'{element}.{prop}' mod_dict = self.models[propname+'@'+domain] # Collect kwargs kwargs = {'domain': f'{element}.{domain}'} for item in mod_dict.keys(): if item not in ['model', 'regen_mode']: kwargs[item] = mod_dict[item] # Deal with models that don't have domain argument yet if 'domain' not in inspect.getfullargspec(mod_dict['model']).args: _ = kwargs.pop('domain', None) vals = mod_dict['model'](self, **kwargs) if isinstance(vals, dict): # Handle models that return a dict for k, v in vals.items(): v = np.atleast_1d(v) if v.shape[0] == 1: # Returned item was a scalar v = np.tile(v, self._count(element)) vals[k] = v[self[f'{element}.{domain}']] elif isinstance(vals, (int, float)): # Handle models that return a float vals = np.atleast_1d(vals) else: # Index into full domain result for use below vals = vals[self[f'{element}.{domain}']] else: # Model that accepts domain arg vals = mod_dict['model'](self, **kwargs) # Finally add model results to self if isinstance(vals, np.ndarray): # If model returns single array if propname not in self.keys(): temp = self._initialize_empty_array_like(vals, element) self[f'{element}.{prop}'] = temp self[propname][self[f'{element}.{domain}']] = vals elif isinstance(vals, dict): # If model returns a dict of arrays for k, v in vals.items(): if f'{propname}.{k}' not in self.keys(): temp = self._initialize_empty_array_like(v, element) self[f'{propname}.{k}'] = temp self[f'{propname}.{k}'][self[f'{element}.{domain}']] = v