[docs]classModelsDict(PrintableDict):""" This subclassed dictionary is assigned to the ``models`` attribute of all objects that inherit from the ``ModelsMixin`` class. Each dictionary entry corresponds to an entry in the target object's dictionary, and contains the models and associated parameters for generating the model. The main features of this subclass are three methods the help resolve the order in which models should be called: ``dependency_list``, ``dependency_graph``, and ``dependency_map``. """def_find_target(self):""" Finds and returns the target object to which this ModelsDict is associated. """forprojinws.values():forobjinproj:ifhasattr(obj,"models"):ifobj.modelsisself:returnobjraiseException("No target object found!")
[docs]defdependency_list(self):r""" Returns a list of dependencies in the order with which they should be called to ensure data is calculated by one model before it's asked for by another. Notes ----- This raises an exception if the graph has cycles which means the dependencies are unresolvable (i.e. there is no order which the models can be called that will work). In this case it is possible to visually inspect the graph using ``dependency_graph``. See Also -------- dependency_graph dependency_map """importnetworkxasnxdtree=self.dependency_graph()cycles=list(nx.simple_cycles(dtree))ifcycles:msg='Cyclic dependency: '+' -> '.join(cycles[0]+[cycles[0][0]])raiseException(msg)d=nx.algorithms.dag.lexicographical_topological_sort(dtree,sorted)returnlist(d)
[docs]defdependency_graph(self,deep=False):""" Returns a NetworkX graph object of the dependencies Parameters ---------- deep : bool, optional Defines whether intra- or inter-object dependency graph is desired. Default is False, i.e. only returns dependencies within the object. See Also -------- dependency_list dependency_map """importnetworkxasnxdtree=nx.DiGraph()models=list(self.keys())formodelinmodels:propname=model.split("@")[0]dtree.add_node(propname)# Filter pore/throat props onlyargs=op.utils.flat_list(self[model].values())dependencies=[argforarginargsifis_valid_propname(arg)]# Add dependency from model's parametersfordindependencies:dtree.add_edge(d,propname)returndtree
[docs]defdependency_map(self,ax=None,figsize=None,deep=False,style='shell'):# pragma: no cover""" Create a graph of the dependency graph in a decent format Parameters ---------- ax : matplotlib.axis, optional Matplotlib axis object on which dependency map is to be drawn. figsize : tuple, optional Tuple containing frame size. See Also -------- dependency_graph dependency_list """importnetworkxasnximportmatplotlib.pyplotaspltifaxisNone:fig,ax=plt.subplots()iffigsizeisnotNone:fig.set_size_inches(figsize)labels={}node_shapes={}dtree=self.dependency_graph(deep=deep)fornodeindtree.nodes:labels[node]=node.split(".")[1]node_shapes[node]="o"ifnode.startswith("pore")else"s"nx.set_node_attributes(dtree,node_shapes,"node_shape")layout=getattr(nx,f"{style}_layout")pos=layout(dtree)Pprops=[propforpropindtree.nodesifprop.startswith("pore")]Tprops=[propforpropindtree.nodesifprop.startswith("throat")]colors=["yellowgreen","coral"]shapes=["o","s"]forprops,color,shapeinzip([Pprops,Tprops],colors,shapes):nx.draw(dtree,pos=pos,nodelist=props,node_shape=shape,labels=labels,with_labels=True,edge_color='lightgrey',node_color=color,font_size=12,width=2.0)ax=plt.gca()ax.margins(x=0.2,y=0.05)returnax
@propertydef_info(self):# Pragma: no coverr""" Prints a nicely formatted list of model names and the domain to which they apply. Notes ----- This is a hidden function for now, but could be exposed if useful. """names={}foriteminself:name,_,domain=item.partition('@')ifnamenotinnames.keys():names[name]=[]names[name].append(domain)D=PrintableDict(names,key='Model',value='Domain')print(D)def__str__(self):# pragma: no coverhorizontal_rule='―'*85lines=[horizontal_rule]strg='{0:<3s}{1:<35s}{2:<25s}{3}'lines.append(strg.format('#','Property Name','Parameter','Value'))lines.append(horizontal_rule)fori,iteminenumerate(self.keys()):temp=self[item].copy()regen_mode=temp.pop('regen_mode',None)model=str(temp.pop('model')).split(' ')[1]lines.append(strg.format(str(i+1),item,'model:',model))forparamintemp.keys():lines.append(strg.format('','',param+':',temp[param]))lines.append(strg.format('','','regeneration mode:',regen_mode))lines.append(horizontal_rule)return'\n'.join(lines)def__delitem__(self,key):if'@'inkey:super().__delitem__(key)else:# Delete all models with the same prefixforiteminlist(self.keys()):ifitem.startswith(key):super().__delitem__(item)def__getitem__(self,key):try:returnsuper().__getitem__(key)exceptKeyError:d=PrintableDict(key='Model',value='Args')fork,vinself.items():ifk.startswith(key+'@'):d[k]=viflen(d)>0:returndelse:raiseKeyError(key)
[docs]defupdate(self,d,domain=None):# Catch un-run functionifhasattr(d,'__call__'):raiseException('Received dict argument is a function, try running it')parent=self._find_target()fork,vind.items():parent.add_model(propname=k,domain=domain,**v)
[docs]classModelWrapper(dict):""" This class is used to hold individual models and provide some extra functionality, such as pretty-printing and the ability to run itself. """
@propertydefname(self):forprojinws.values():forobjinproj:ifhasattr(obj,'models'):forkey,modinobj.models.items():ifmodisself:returnkey@propertydefpropname(self):returnself.name.split('@')[0]@propertydefdomain(self):element,prop=self.name.split('.',1)prop,domain=prop.split('@')returnelement+'.'+domaindef__str__(self):# pragma: no coverhorizontal_rule='―'*78lines=[horizontal_rule]strg='{0:<25s}{1:<25s}{2}'lines.append(strg.format('Property Name','Parameter','Value'))lines.append(horizontal_rule)temp=self.copy()regen_mode=temp.pop('regen_mode',None)model=str(temp.pop('model')).split(' ')[1]lines.append(strg.format(self.propname,'model:',model))forparamintemp.keys():lines.append(strg.format('',param+':',temp[param]))lines.append(strg.format('','regeneration mode:',regen_mode))lines.append(horizontal_rule)return'\n'.join(lines)@propertydeftarget(self):""" Finds and returns the object to which this model is assigned """forprojinws.values():forobjinproj:ifhasattr(obj,"models"):formodinobj.models.values():ifmodisself:returnobjraiseException("No target object found!")
[docs]classModelsMixin2:r""" This class is added to ``Network`` and ``Phase`` objects under the ``models`` attribute. It provides the functionality for storing and running pore-scale models. """def__init__(self,*args,**kwargs):super().__init__(*args,**kwargs)self.models=ModelsDict()
[docs]defadd_model(self,propname,model,domain='all',regen_mode='normal',**kwargs):r""" Add a pore-scale model to the object, along with the desired arguments Parameters ---------- propname : str The name of the property being computed. E.g. if ``propname='pore.diameter'`` then the computed results will be stored in ``obj['pore.diameter']``. model : function handle The function that produces the values domain : str The label indicating the locations in which results generated by ``model`` should be stored. See `Notes` for more details. regen_mode : str How the model should be regenerated. Options are: ============ ===================================================== regen_mode description ============ ===================================================== normal (default) The model is run immediately upon being added, and is also run each time ``regenerate_models`` is called. deferred The model is NOT run when added, but is run each time ``regenerate_models`` is called. This is useful for models that depend on other data that may not exist yet. constant The model is run immediately upon being added, but is is not run when ``regenerate_models`` is called, effectively turning the property into a constant. ============ ===================================================== kwargs : keyword arguments All additional keyword arguments are passed on to the model Notes ----- The ``domain`` argument dictates where the results of ``model`` should be stored. For instance, given ``propname='pore.diameter'`` and ``domain='left'`` then when `model` is run, the results are stored in in the pores labelled left. Note that if ``model`` returns ``Np`` values, then values not belonging to ``'pore.left'`` are discarded. The following steps outline the process: 1. Find the pore indices: .. code-block:: python Ps = obj.pores('left') 2. Run the model: .. code-block:: python vals = model(**kwargs) 3. If the model returns a full Np-length array, then extract the correct values and apply them to the corresponding locations: .. code-block:: python if len(vals) == obj.Np: obj['pore.diameter'][Ps] = vals[Ps] 4. If the model was designed to return only the subset of values then: .. code-block:: python if len(vals) == obj.num_pores('left'): obj['pore.diameter'][Ps] = vals """if'@'inpropname:propname,domain=propname.split('@')elifdomainisNone:domain=self.settings['default_domain']element,prop=propname.split('.',1)domain=domain.split('.',1)[-1]# Add model and regen_mode to kwargs dictionarykwargs.update({'model':model,'regen_mode':regen_mode})# Insepct model to extract arguments and default valueskwargs.update(self._inspect_model(model,kwargs))self.models[propname+'@'+domain]=ModelWrapper(**kwargs)ifregen_mode!='deferred':self.run_model(propname+'@'+domain)
def_inspect_model(self,model,kwargs={}):ifmodel.__defaults__:vals=list(inspect.getfullargspec(model).defaults)keys=inspect.getfullargspec(model).args[-len(vals):]fork,vinzip(keys,vals):# Put defaults into kwargsifknotinkwargs:# Skip if argument was given in kwargskwargs.update({k:v})returnkwargs
[docs]defadd_model_collection(self,models,domain='all',regen_mode='deferred'):r""" Add a ``collection`` of several models at once Parameters ---------- models : dict The collection of models to add. regen_mode : str By default the models are not regenerated upon addition. See the docstring for ``add_model`` for more information. domain : str The label indicating which locations the supplied collection of models should be applied to. Notes ----- Collections are dictionaries that are formatted the same as ``obj.models``. Several model collections are available in ``openpnm.models.collections``. """models=deepcopy(models)fork,vinmodels.items():if'domain'notinv.keys():v['domain']=domainif'regen_mode'notinv.keys():v['regen_mode']=regen_modeself.add_model(propname=k,**v)
[docs]defregenerate_models(self,propnames=None,exclude=[]):r""" Runs all the models stored in the object's ``models`` attribute Parameters ---------- propnames : list of strings If given then only the specified models are run exclude : list of strings If given then these models will *not* be run Notes ----- This function will ensure that models are called in the correct order such that 'pore.diameter' will be run before 'pore.volume', since the diameter is required to compute the volume. """all_models=self.models.dependency_list()# Regenerate all properties by defaultifpropnamesisNone:propnames=all_modelselse:propnames=np.atleast_1d(propnames).tolist()# Remove any that are specifically excludedpropnames=np.setdiff1d(propnames,exclude).tolist()# Reorder given propnames according to dependency treetmp=[e.split("@")[0]foreinpropnames]idx_sorted=[all_models.index(e)foreintmp]propnames=[elemfori,eleminsorted(zip(idx_sorted,propnames))]# Now run each on in sequenceforiteminpropnames:try:self.run_model(item)exceptKeyErrorase:msg=(f"{item} was not run since the following property"f" is missing: {e}")logger.warning(msg)self.models[item]['regen_mode']='deferred'
[docs]defrun_model(self,propname,domain=None):r""" Runs the requested model and places the result into the correct locations Parameters ---------- propname : str The name of the model to run. domain : str The label of the domain for which the model should be run. Passing ``propname='pore.diameter@domain1`` and ``domain=None`` is equivalent to passing ``propname='pore.diameter`` and ``domain=domain1``. Passing ``domain=None`` will regenerate all models starting with ``propname``. """ifdomainisNone:if'@'inpropname:# Get domain from propname if presentpropname,_,domain=propname.partition('@')self.run_model(propname=propname,domain=domain)else:# No domain means run model for ALL domainsforiteminself.models.keys():ifitem.startswith(propname+"@"):_,_,domain=item.partition("@")self.run_model(propname=propname,domain=domain)else:# domain was given explicitlydomain=domain.split('.',1)[-1]element,prop=propname.split('@')[0].split('.',1)propname=f'{element}.{prop}'mod_dict=self.models[propname+'@'+domain]# Collect kwargskwargs={'domain':f'{element}.{domain}'}foriteminmod_dict.keys():ifitemnotin['model','regen_mode']:kwargs[item]=mod_dict[item]# Deal with models that don't have domain argument yetif'domain'notininspect.getfullargspec(mod_dict['model']).args:_=kwargs.pop('domain',None)vals=mod_dict['model'](self,**kwargs)ifisinstance(vals,dict):# Handle models that return a dictfork,vinvals.items():v=np.atleast_1d(v)ifv.shape[0]==1:# Returned item was a scalarv=np.tile(v,self._count(element))vals[k]=v[self[f'{element}.{domain}']]elifisinstance(vals,(int,float)):# Handle models that return a floatvals=np.atleast_1d(vals)else:# Index into full domain result for use belowvals=vals[self[f'{element}.{domain}']]else:# Model that accepts domain argvals=mod_dict['model'](self,**kwargs)# Finally add model results to selfifisinstance(vals,np.ndarray):# If model returns single arrayifpropnamenotinself.keys():temp=self._initialize_empty_array_like(vals,element)self[f'{element}.{prop}']=tempself[propname][self[f'{element}.{domain}']]=valselifisinstance(vals,dict):# If model returns a dict of arraysfork,vinvals.items():iff'{propname}.{k}'notinself.keys():temp=self._initialize_empty_array_like(v,element)self[f'{propname}.{k}']=tempself[f'{propname}.{k}'][self[f'{element}.{domain}']]=v