Source code for openpnm.algorithms._invasion_percolation
importheapqashqimportloggingfromcollectionsimportnamedtupleimportnumpyasnpfromnumbaimportjit,njitfromtqdm.autoimporttqdmfromopenpnm._skgraph.queriesimportqupc_initialize,qupc_reduce,qupc_updatefromopenpnm._skgraph.simulationsimportbond_percolation,site_percolationfromopenpnm.algorithmsimportAlgorithmfromopenpnm.utilsimportDocorator__all__=['InvasionPercolation',]logger=logging.getLogger(__name__)docstr=Docorator()@docstr.get_sections(base='IPSettings',sections=['Parameters','Other Parameters'])@docstr.dedentclassIPSettings:r""" Parameters ---------- %(AlgorithmSettings.parameters)s pore_volume : str The dictionary key for the pore volume array throat_volume : str The dictionary key for the throat volume array entry_pressure : str The dictionary key for the throat capillary pressure """phase=''pore_volume='pore.volume'throat_volume='throat.volume'entry_pressure='throat.entry_pressure'
[docs]classInvasionPercolation(Algorithm):r""" A classic invasion percolation algorithm optimized for speed with numba Parameters ---------- network : Network The Network upon which the invasion will occur Notes ----- This algorithm uses a `binary heap <https://en.wikipedia.org/wiki/Binary_heap>`_ to store a list of all accessible throats, sorted according to entry pressure. This means that item [0] in the heap is the most easily invaded throat that is currently accessible by the invading fluid, so looking up which throat to invade next is computationally trivial. In order to keep the list sorted, adding new throats to the list takes more time; however, the heap data structure is very efficient at this. """def__init__(self,phase,name='ip_?',**kwargs):super().__init__(name=name,**kwargs)self.settings._update(IPSettings())self.settings['phase']=phase.nameself['pore.bc.inlet']=Falseself['pore.bc.outlet']=Falseself.reset()
# self['pore.residual'] = False# self['throat.residual'] = Falsedef_set_residual(self,pores=None,throats=None,mode='add'):# pragma: no coverraiseNotImplementedError("The ability to add residual nwp is not ready yet")ifmode=='add':ifporesisnotNone:self['pore.residual'][pores]=TrueifthroatsisnotNone:self['throat.residual'][throats]=Trueelifmode=='drop':ifporesisnotNone:self['pore.residual'][pores]=FalseifthroatsisnotNone:self['throat.residual'][throats]=Falseelifmode=='clear':ifporesisnotNone:self['pore.residual']=FalseifthroatsisnotNone:self['throat.residual']=Falseelifmode=='overwrite':ifporesisnotNone:self['pore.residual']=Falseself['pore.residual'][pores]=TrueifthroatsisnotNone:self['throat.residual']=Falseself['throat.residual'][throats]=True
[docs]defset_inlet_BC(self,pores=None,mode='add'):r""" Specifies which pores are treated as inlets for the invading phase Parameters ---------- pores : ndarray The indices of the pores from which the invading fluid invasion should start mode : str or list of str, optional Controls how the boundary conditions are applied. Options are: ============ ===================================================== mode meaning ============ ===================================================== 'add' (default) Adds the supplied boundary conditions to the given locations. Raises an exception if values of any type already exist in the given locations. 'overwrite' Adds supplied boundary conditions to the given locations, including overwriting conditions of the given type or any other type that may be present in the given locations. 'remove' Removes boundary conditions of the specified type from the specified locations. If ``bctype`` is not specified then *all* types are removed. If no locations are given then values are remvoed from *all* locations. ============ ===================================================== If a list of strings is provided, then each mode in the list is handled in order, so that ``['remove', 'add']`` will give the same results add ``'overwrite'``. """self.set_BC(pores=pores,bcvalues=True,bctype='inlet',mode=mode)self.reset()self['pore.invasion_sequence'][self['pore.bc.inlet']]=0
[docs]defset_outlet_BC(self,pores=None,mode='add'):r""" Specifies which pores are treated as outlets for the defending phase This must be specified if trapping is to be considered. Parameters ---------- pores : ndarray The indices of the pores from which the defending fluid exits the domain mode : str or list of str, optional Controls how the boundary conditions are applied. Options are: ============ ===================================================== mode meaning ============ ===================================================== 'add' (default) Adds the supplied boundary conditions to the given locations. Raises an exception if values of any type already exist in the given locations. 'overwrite' Adds supplied boundary conditions to the given locations, including overwriting conditions of the given type or any other type that may be present in the given locations. 'remove' Removes boundary conditions of the specified type from the specified locations. If ``bctype`` is not specified then *all* types are removed. If no locations are given then values are remvoed from *all* locations. ============ ===================================================== If a list of strings is provided, then each mode in the list is handled in order, so that ``['remove', 'add']`` will give the same results add ``'overwrite'``. """self.set_BC(pores=pores,bcvalues=True,bctype='outlet',mode=mode)
[docs]defrun(self):r""" Performs the algorithm for the given number of steps """# Setup arrays and info# TODO: This should be called conditionally so that it doesn't# overwrite existing data when doing a few steps at a timeself._run_setup()n_steps=np.inf# Create incidence matrix for use in _run_accelerated which is jitim=self.network.create_incidence_matrix(fmt='csr')# Perform initial analysis on input poresTs=self.project.network.find_neighbor_throats(pores=self['pore.bc.inlet'])t_start=self['throat.order'][Ts]t_inv,p_inv,p_inv_t= \
_run_accelerated(t_start=t_start,t_sorted=self['throat.sorted'],t_order=self['throat.order'],t_inv=self['throat.invasion_sequence'],p_inv=self['pore.invasion_sequence'],p_inv_t=np.zeros_like(self['pore.invasion_sequence']),conns=self.project.network['throat.conns'],idx=im.indices,indptr=im.indptr,n_steps=n_steps)# Transfer results onto algorithm objectself['throat.invasion_sequence']=t_invself['pore.invasion_sequence']=p_invself['throat.invasion_pressure']=self['throat.entry_pressure']self['pore.invasion_pressure']=self['throat.entry_pressure'][p_inv_t]# Set invasion pressure of inlets to 0self['pore.invasion_pressure'][self['pore.invasion_sequence']==0]=0.0
# Set invasion sequence and pressure of any residual pores/throats to 0# self['throat.invasion_sequence'][self['throat.residual']] = 0# self['pore.invasion_sequence'][self['pore.residual']] = 0def_run_setup(self):self['pore.invasion_sequence'][self['pore.bc.inlet']]=0# self['pore.invasion_sequence'][self['pore.residual']] = 0# self['throat.invasion_sequence'][self['throat.residual']] = 0# Set throats between inlets as trappedTs=self.network.find_neighbor_throats(self['pore.bc.inlet'],mode='xnor')self['throat.trapped'][Ts]=True# Get throat capillary pressures from phase and updatephase=self.project[self.settings['phase']]self['throat.entry_pressure']=phase[self.settings['entry_pressure']]# self['throat.entry_pressure'][self['throat.residual']] = 0.0# Generated indices into t_entry giving a sorted listself['throat.sorted']=np.argsort(self['throat.entry_pressure'],axis=0)self['throat.order']=0self['throat.order'][self['throat.sorted']]=np.arange(0,self.Nt)
[docs]defpc_curve(self):r""" Get the percolation data as the non-wetting phase saturation vs the capillary pressure. """net=self.project.networkpvols=net[self.settings['pore_volume']]tvols=net[self.settings['throat_volume']]tot_vol=np.sum(pvols)+np.sum(tvols)# Normalizepvols/=tot_voltvols/=tot_vol# Remove trapped volumepmask=self['pore.invasion_sequence']>=0tmask=self['throat.invasion_sequence']>=0pvols=pvols[pmask]tvols=tvols[tmask]pseq=self['pore.invasion_sequence'][pmask]tseq=self['throat.invasion_sequence'][tmask]pPc=self['pore.invasion_pressure'][pmask]tPc=self['throat.invasion_pressure'][tmask]vols=np.concatenate((pvols,tvols))seqs=np.concatenate((pseq,tseq))Pcs=np.concatenate((pPc,tPc))data=np.rec.fromarrays([seqs,vols,Pcs],formats=['i','f','f'],names=['seq','vol','Pc'])data.sort(axis=0,order='seq')pc_curve=namedtuple('pc_curve',('pc','snwp'))data=pc_curve(data.Pc,np.cumsum(data.vol))returndata
[docs]defapply_trapping(self):r""" Adjusts the invasion sequence of pores and throats that are trapped. This method uses the reverse invasion percolation procedure outlined by Masson [1]. Returns ------- This function does not return anything. It adjusts the ``'pore.invasion_sequence'`` and ``'throat.invasion_sequence'`` arrays on the object by setting trapped pores/throats to ``ninf``. It also puts ``True`` values into the ``'pore.trapped'`` and ``'throat.trapped'`` arrays. Notes ----- Outlet pores must be specified (using ``set_outlet_BC`` or putting ``True`` values in ``alg['pore.bc.outlet']``) or else an exception is raised. References ---------- [1] Masson, Y. https://doi.org/10.1016/j.cageo.2016.02.003 """outlets=np.where(self['pore.bc.outlet'])[0]am=self.network.create_adjacency_matrix(fmt='csr')inv_seq=self['pore.invasion_sequence']self['pore.trapped']=_find_trapped_pores(inv_seq,am.indices,am.indptr,outlets)# Update invasion sequenceself['pore.invasion_sequence'][self['pore.trapped']]=-1# Find which throats are trapped, including throats which were invaded# after both of it's pores were invaded (hence have a unique invasion# sequence number).pmask=self['pore.invasion_sequence'][self.network.conns]tmask=np.stack((self['throat.invasion_sequence'],self['throat.invasion_sequence'])).Thits=~np.any(pmask==tmask,axis=1)self['throat.trapped']=hitsself['throat.invasion_sequence'][hits]=-1# Make some adjustmentsPmask=self['pore.invasion_sequence']<0Tmask=self['throat.invasion_sequence']<0self['pore.invasion_sequence']= \
self['pore.invasion_sequence'].astype(float)self['pore.invasion_sequence'][Pmask]=np.infself['throat.invasion_sequence']= \
self['throat.invasion_sequence'].astype(float)self['throat.invasion_sequence'][Tmask]=np.inf
def_apply_trapping_slow(self,step_size=1,mode='mixed'):# pragma: no cover# TODO: Make sure this function actually works and keep it for debuggingN=self['throat.invasion_sequence'].max()pseq=self['pore.invasion_sequence']tseq=self['throat.invasion_sequence']msg='Evaluating trapping'foriintqdm(range(0,int(N),step_size),msg):ifmode=='bond':i+=1s,b=bond_percolation(conns=self.network.conns,occupied_bonds=tseq>i)elifmode=='site':s,b=site_percolation(conns=self.network.conns,occupied_sites=pseq>i)clusters=np.unique(s[self['pore.bc.outlet']])self['pore.trapped']+=np.isin(s,clusters,invert=True)*(pseq>i)self['throat.trapped']+=np.isin(b,clusters,invert=True)*(tseq>i)# Set trapped pores/throats to uninvaded and adjust invasion sequenceself['pore.invasion_sequence'][self['pore.trapped']]=-1self['throat.invasion_sequence'][self['throat.trapped']]=-1
# Set any residual pores within trapped clusters back to untrapped# self['pore.trapped'][self['pore.residual']] = False# self['throat.trapped'][self['throat.residual']] = False@jit(forceobj=True)def_find_trapped_pores(inv_seq,indices,indptr,outlets):Np=len(inv_seq)sorted_seq=np.vstack((inv_seq.astype(np.int_),np.arange(Np,dtype=np.int_))).Tsorted_seq=sorted_seq[sorted_seq[:,0].argsort()][::-1]cluster=-np.ones(Np,dtype=np.int_)trapped_pores=np.zeros(Np,dtype=bool)trapped_clusters=np.zeros(Np,dtype=bool)# cluster_map = qupc_initialize(Np)cluster_map=np.arange(Np,dtype=np.int_)next_cluster_num=0i=-1forstep,poreinsorted_seq:i+=1step,pore=sorted_seq[i,:]n=indices[indptr[pore]:indptr[pore+1]]nc=cluster_map[cluster[n]][inv_seq[n]>step]nc_uniq=np.unique(nc)ifnc.size==0:# Found an isolated pore, start a new clustercluster[pore]=next_cluster_num# If pore is an outlet then note cluster as no longer trappedifporeinoutlets:trapped_clusters[next_cluster_num]=Falseelse:# Otherwise note this cluster as being a trapped clustertrapped_clusters[next_cluster_num]=True# Note this pore as trapped as welltrapped_pores[pore]=True# Increment cluster number for next timenext_cluster_num+=1elifnc_uniq.size==1:c=nc_uniq[0]# Neighbors have one unique cluster number, so assign it to current porecluster[pore]=c# If pore is an outlet then note cluster as no longer trappedifporeinoutlets:trapped_clusters[c]=False# Also set all joined clusters to not trappedcluster_map=qupc_reduce(cluster_map)hits=np.where(cluster_map==cluster_map[c])[0]trapped_clusters[hits]=False# If this cluster number is part of a trapped cluster then# mark pore as trappediftrapped_clusters[c]:trapped_pores[pore]=Trueelifnc_uniq.size>1:cluster[pore]=min(nc_uniq)# Merge all clusters into a single clusterforcinnc:qupc_update(cluster_map,c,min(nc_uniq))cluster_map=qupc_reduce(cluster_map)# If all neighboring clusters are trapped, then set current pore to# trapped as wellifnp.all(trapped_clusters[nc]):trapped_pores[pore]=Trueelse:# Otherwise set all neighbor clusters to untrapped!trapped_clusters[nc]=Falsereturntrapped_pores@njitdef_run_accelerated(t_start,t_sorted,t_order,t_inv,p_inv,p_inv_t,conns,idx,indptr,n_steps):# pragma: no coverr""" Numba-jitted run method for InvasionPercolation class. Notes ----- ``idx`` and ``indptr`` are properties are the network's incidence matrix, and are used to quickly find neighbor throats. Numba doesn't like foreign data types (i.e. Network), and so ``find_neighbor_throats`` method cannot be called in a jitted method. Nested wrapper is for performance issues (reduced OpenPNM import) time due to local numba import """# TODO: The following line is supposed to be numba's new list, but the# heap does not work with this# queue = List(t_start)queue=list(t_start)hq.heapify(queue)count=1whilecount<(n_steps+1):# Find throat at the top of the queuet=hq.heappop(queue)# Extract actual throat numbert_next=t_sorted[t]t_inv[t_next]=count# If throat is duplicatedwhilelen(queue)>0andqueue[0]==t:_=hq.heappop(queue)# Find pores connected to newly invaded throat from am in coo formatPs=conns[t_next]# Remove already invaded pores from PsPs=Ps[p_inv[Ps]<0]# If either of the neighboring pores are uninvaded (-1), set it to# invaded and add its neighboring throats to the queueiflen(Ps)>0:p_inv[Ps]=countp_inv_t[Ps]=t_nextforiinPs:# Get neighboring throat numbers from im in csr formatTs=idx[indptr[i]:indptr[i+1]]# Keep only throats which are uninvadedTs=Ts[t_inv[Ts]<0]foriinTs:# Add throat to the queuehq.heappush(queue,t_order[i])count+=1iflen(queue)==0:breakreturnt_inv,p_inv,p_inv_t# %%if__name__=='__main__':importmatplotlib.pyplotaspltimportopenpnmasopnp.random.seed(0)Nx,Ny,Nz=25,25,1pn=op.network.Cubic(shape=[Nx,Ny,Nz],spacing=1e-4)pn.add_model_collection(op.models.collections.geometry.spheres_and_cylinders)pn.regenerate_models()pn['pore.volume@left']=0.0# op.topotools.trim(pn, pores=[380, 395])water=op.phase.Water(network=pn,name='h2o')water.add_model_collection(op.models.collections.physics.standard)water.regenerate_models()ip=InvasionPercolation(network=pn,phase=water)ip.set_inlet_BC(pn.pores('left'))ip.run()ip.set_outlet_BC(pn.pores('right'))# ip.apply_trapping()# %%if0:pseq=np.copy(ip['pore.invasion_sequence'])tseq=np.copy(ip['throat.invasion_sequence'])ax=op.topotools.plot_connections(pn,tseq>=0,linestyle='--',c='b',linewidth=3)op.topotools.plot_coordinates(pn,pseq>=0,c='r',marker='x',markersize=100,ax=ax)op.topotools.plot_coordinates(pn,pseq<0,c='c',marker='o',markersize=100,ax=ax)# %%if1:drn=op.algorithms.Drainage(network=pn,phase=water)drn.set_inlet_BC(pn.pores('left'))pressures=np.unique(ip['pore.invasion_pressure'])# pressures = np.logspace(np.log10(0.1e3), np.log10(2e4), 100)drn.run(pressures=pressures)drn.set_outlet_BC(pn.pores('right'))# drn.apply_trapping()fig,ax=plt.subplots(1,1)ax.step(*ip.pc_curve(),'b',where='post')ax.step(*drn.pc_curve(),'r--',where='post')ax.set_ylim([0,1])# %%if0:pseq=ip['pore.invasion_sequence']pseq[pseq==-1]=pseq.max()+1tseq=ip['throat.invasion_sequence']tseq[tseq==-1]=tseq.max()+1forj,iinenumerate(tqdm(np.unique(tseq)[:-1])):ax=op.topotools.plot_connections(pn,tseq<=i,linestyle='--',c='r',linewidth=3)op.topotools.plot_coordinates(pn,pseq<=i,c='b',marker='x',markersize=100,ax=ax)op.topotools.plot_coordinates(pn,ip['pore.trapped'],c='c',marker='o',markersize=100,ax=ax)plt.savefig(f"{str(j).zfill(3)}.png")plt.close()