importloggingimportnumpyasnpimportscipy.sparse.csgraphasspgrfromopenpnm.topotoolsimportis_fully_connectedfromopenpnm.algorithmsimportAlgorithmfromopenpnm.utilsimportDocorator,TypedSet,Workspacefromopenpnm.utilsimportcheck_data_healthfromopenpnmimportsolversfrom._solutionimportSteadyStateSolution,SolutionContainer__all__=['Transport']docstr=Docorator()logger=logging.getLogger(__name__)ws=Workspace()@docstr.get_sections(base='TransportSettings',sections=['Parameters'])@docstr.dedentclassTransportSettings:""" Defines the settings for Transport algorithms Parameters ---------- %(AlgorithmSettings.parameters)s quantity : str The name of the physical quantity to be solved for (i.e. 'pore.concentration') conductance : str The name of the pore-scale transport conductance values (i.e 'throat.diffusive_conductance') cache : bool If ``True``, the ``A`` matrix is cached and rather than getting rebuilt. variable_props : list of strings This list (actually a set) indicates which properties are variable and should be updated by the algorithm on each iteration. Note that any properties which already depend on ``'quantity'`` will automatically be updated. """phase=''quantity=''conductance=''cache=Truevariable_props=TypedSet()
[docs]@docstr.get_sections(base='Transport',sections=['Parameters'])@docstr.dedentclassTransport(Algorithm):""" This class implements steady-state linear transport calculations. Parameters ---------- %(Algorithm.parameters)s """def__init__(self,phase,name='trans_?',**kwargs):super().__init__(name=name,**kwargs)self.settings._update(TransportSettings())self.settings['phase']=phase.nameself['pore.bc.rate']=np.nanself['pore.bc.value']=np.nanself._A=Noneself._b=Noneself._pure_A=Noneself._pure_b=Noneself.soln={}def__getitem__(self,key):try:returnsuper().__getitem__(key)exceptKeyError:ifkey=='pore.source':return{}else:raiseKeyError(key)@propertydefx(self):"""Shortcut to the solution currently stored on the algorithm."""returnself[self.settings['quantity']]@x.setterdefx(self,value):self[self.settings['quantity']]=value@docstr.dedentdef_build_A(self):""" Builds the coefficient matrix based on throat conductance values. Notes ----- The conductance to use is specified in stored in the algorithm's settings under ``alg.settings['conductance']``. """gvals=self.settings['conductance']ifgvalsinself.iterative_props:self.settings.cache=Falseifnotself.settings['cache']:self._pure_A=Noneifself._pure_AisNone:phase=self.project[self.settings.phase]g=phase[gvals]am=self.network.create_adjacency_matrix(weights=g,fmt='coo')self._pure_A=spgr.laplacian(am).astype(float)self.A=self._pure_A.copy()def_build_b(self):"""Initializes the RHS vector, b, with zeros."""b=np.zeros(self.Np,dtype=float)self._pure_b=bself.b=self._pure_b.copy()@propertydefA(self):"""The coefficient matrix, A (in Ax = b)"""ifself._AisNone:self._build_A()returnself._A@A.setterdefA(self,value):self._A=value@propertydefb(self):"""The right-hand-side (RHS) vector, b (in Ax = b)"""ifself._bisNone:self._build_b()returnself._b@b.setterdefb(self,value):self._b=valuedef_apply_BCs(self):"""Applies specified boundary conditions by modifying A and b."""if'pore.bc.rate'inself.keys():# Update bind=np.isfinite(self['pore.bc.rate'])self.b[ind]=self['pore.bc.rate'][ind]if'pore.bc.value'inself.keys():f=self.A.diagonal().mean()# Update b (impose bc values)ind=np.isfinite(self['pore.bc.value'])self.b[ind]=self['pore.bc.value'][ind]*f# Update b (subtract quantities from b to keep A symmetric)x_BC=np.zeros_like(self.b)x_BC[ind]=self['pore.bc.value'][ind]self.b[~ind]-=(self.A*x_BC)[~ind]# Update AP_bc=self.to_indices(ind)mask=np.isin(self.A.row,P_bc)|np.isin(self.A.col,P_bc)# Remove entries from A for all BC rows/colsself.A.data[mask]=0# Add diagonal entries back into Adatadiag=self.A.diagonal()datadiag[P_bc]=np.ones_like(P_bc,dtype=float)*fself.A.setdiag(datadiag)self.A.eliminate_zeros()
[docs]defrun(self,solver=None,x0=None,verbose=False):""" Builds the A and b matrices, and calls the solver specified in the ``settings`` attribute. This method stores the solution in the algorithm's ``soln`` attribute as a ``SolutionContainer`` object. The solution itself is stored in the ``x`` attribute of the algorithm as a NumPy array. Parameters ---------- x0 : ndarray Initial guess of unknown variable Returns ------- None """logger.info('Running Transport')ifsolverisNone:solver=getattr(solvers,ws.settings.default_solver)()# Perform pre-solve validationsself._validate_settings()self._validate_topology_health()self._validate_linear_system()# Write x0 to algorithm (needed by _update_iterative_props)self.x=x0=np.zeros_like(self.b)ifx0isNoneelsex0.copy()self["pore.initial_guess"]=x0self._validate_x0()# Initialize the solution objectself.soln=SolutionContainer()self.soln[self.settings['quantity']]=SteadyStateSolution(x0)self.soln.is_converged=False# Build A and b, then solve the system of equationsself._update_A_and_b()self._run_special(solver=solver,x0=x0,verbose=verbose)
def_run_special(self,solver,x0,w=1.0,verbose=None):# Make sure A and b are 'still' well-definedself._validate_linear_system()# Solve and apply under-relaxationx_new,exit_code=solver.solve(A=self.A,b=self.b,x0=x0)self.x=w*x_new+(1-w)*self.x# Update A and b using the recent solution otherwise, for iterative# algorithms, residual will be incorrectly calculated ~0, since A & b# are outdatedself._update_A_and_b()# Update SteadyStateSolution object on algorithmself.soln[self.settings['quantity']][:]=self.xself.soln.is_converged=notbool(exit_code)def_update_A_and_b(self):"""Builds A and b, and applies specified boundary conditions."""self._build_A()self._build_b()self._apply_BCs()def_validate_x0(self):"""Ensures x0 doesn't contain any nans/infs."""x0=self["pore.initial_guess"]ifnotnp.isfinite(x0).all():raiseException("x0 contains inf/nan values")def_validate_settings(self):ifself.settings['quantity']isNone:raiseException("'quantity' hasn't been defined on this algorithm")ifself.settings['conductance']isNone:raiseException("'conductance' hasn't been defined on this algorithm")ifself.settings['phase']isNone:raiseException("'phase' hasn't been defined on this algorithm")def_validate_topology_health(self):""" Ensures the network is not clustered, and if it is, they're at least connected to a boundary condition pore. """Ps=~np.isnan(self['pore.bc.rate'])+~np.isnan(self['pore.bc.value'])ifnotis_fully_connected(network=self.network,pores_BC=Ps):msg=("Your network is clustered, making Ax = b ill-conditioned")raiseException(msg)def_validate_linear_system(self):"""Ensures the linear system Ax = b doesn't contain any nans/infs."""ifnp.isfinite(self.A.data).all()andnp.isfinite(self.b).all():returnraiseException("A or b contains inf/nan values")
[docs]defrate(self,pores=[],throats=[],mode='group'):""" Calculates the net rate of material moving into a given set of pores or throats Parameters ---------- pores : array_like The pores for which the rate should be calculated throats : array_like The throats through which the rate should be calculated mode : str, optional Controls how to return the rate. The default value is 'group'. Options are: =========== ===================================================== mode meaning =========== ===================================================== 'group' Returns the cumulative rate of material 'single' Calculates the rate for each pore individually =========== ===================================================== Returns ------- If ``pores`` are specified, then the returned values indicate the net rate of material exiting the pore or pores. Thus a positive rate indicates material is leaving the pores, and negative values mean material is entering. If ``throats`` are specified the rate is calculated in the direction of the gradient, thus is always positive. If ``mode`` is 'single' then the cumulative rate through the given pores (or throats) are returned as a vector, if ``mode`` is 'group' then the individual rates are summed and returned as a scalar. """pores=self._parse_indices(pores)throats=self._parse_indices(throats)ifthroats.size>0andpores.size>0:raiseException('Must specify either pores or throats, not both')if(throats.size==0)and(pores.size==0):raiseException('Must specify either pores or throats')network=self.project.networkphase=self.project[self.settings['phase']]g=phase[self.settings['conductance']]P12=network['throat.conns']X12=self.x[P12]ifg.size==self.Nt:g=np.tile(g,(2,1)).T# Make conductance an Nt by 2 matrix# The next line is critical for rates to be correct# We could also do "g.T.flatten()" or "g.flatten('F')"g=np.flip(g,axis=1)Qt=np.diff(g*X12,axis=1).ravel()ifthroats.size:R=np.absolute(Qt[throats])ifmode=='group':R=np.sum(R)elifpores.size:Qp=np.zeros((self.Np,))np.add.at(Qp,P12[:,0],-Qt)np.add.at(Qp,P12[:,1],Qt)R=Qp[pores]ifmode=='group':R=np.sum(R)returnnp.array(R,ndmin=1)
[docs]defclear_value_BCs(self):"""Clears all value BCs."""self.set_BC(pores=None,bctype='value',mode='remove')
[docs]defclear_rate_BCs(self):"""Clears all rate BCs"""self.set_BC(pores=None,bctype='rate',mode='remove')
[docs]defset_value_BC(self,pores=None,values=[],mode='add'):""" Applies constant value boundary conditons to the specified pores. These are sometimes referred to as Dirichlet conditions. Parameters ---------- pores : array_like The pore indices where the condition should be applied values : float or array_like The value to apply in each pore. If a scalar is supplied it is assigne to all locations, and if a vector is applied is must be the same size as the indices given in ``pores``. mode : str, optional Controls how the boundary conditions are applied. The default value is 'merge'. For definition of various modes, see the docstring for ``set_BC``. """self.set_BC(pores=pores,bctype='value',bcvalues=values,mode=mode)
[docs]defset_rate_BC(self,pores=None,rates=[],mode='add'):""" Apply constant rate boundary conditons to the specified locations. Parameters ---------- pores : array_like The pore indices where the condition should be applied rates : float or array_like, optional The rates to apply in each pore. If a scalar is supplied that rate is assigned to all locations, and if a vector is supplied it must be the same size as the indices given in ``pores``. mode : str, optional Controls how the boundary conditions are applied. The default value is 'merge'. For definition of various modes, see the docstring for ``set_BC``. """self.set_BC(pores=pores,bctype='rate',bcvalues=rates,mode=mode)