Source code for openpnm.algorithms._invasion_percolation

import heapq as hq
import logging
from collections import namedtuple

import numpy as np
from numba import jit, njit
from tqdm.auto import tqdm

from openpnm._skgraph.queries import qupc_initialize, qupc_reduce, qupc_update
from openpnm._skgraph.simulations import bond_percolation, site_percolation
from openpnm.algorithms import Algorithm
from openpnm.utils import Docorator

__all__ = [
    'InvasionPercolation',
]


logger = logging.getLogger(__name__)
docstr = Docorator()


@docstr.get_sections(base='IPSettings',
                     sections=['Parameters', 'Other Parameters'])
@docstr.dedent
class IPSettings:
    r"""

    Parameters
    ----------
    %(AlgorithmSettings.parameters)s
    pore_volume : str
        The dictionary key for the pore volume array
    throat_volume : str
        The dictionary key for the throat volume array
    entry_pressure : str
        The dictionary key for the throat capillary pressure

    """
    phase = ''
    pore_volume = 'pore.volume'
    throat_volume = 'throat.volume'
    entry_pressure = 'throat.entry_pressure'


[docs] class InvasionPercolation(Algorithm): r""" A classic invasion percolation algorithm optimized for speed with numba Parameters ---------- network : Network The Network upon which the invasion will occur Notes ----- This algorithm uses a `binary heap <https://en.wikipedia.org/wiki/Binary_heap>`_ to store a list of all accessible throats, sorted according to entry pressure. This means that item [0] in the heap is the most easily invaded throat that is currently accessible by the invading fluid, so looking up which throat to invade next is computationally trivial. In order to keep the list sorted, adding new throats to the list takes more time; however, the heap data structure is very efficient at this. """ def __init__(self, phase, name='ip_?', **kwargs): super().__init__(name=name, **kwargs) self.settings._update(IPSettings()) self.settings['phase'] = phase.name self['pore.bc.inlet'] = False self['pore.bc.outlet'] = False self.reset()
[docs] def reset(self): self['pore.invasion_sequence'] = -1 self['throat.invasion_sequence'] = -1 self['pore.trapped'] = False self['throat.trapped'] = False
# self['pore.residual'] = False # self['throat.residual'] = False def _set_residual(self, pores=None, throats=None, mode='add'): # pragma: no cover raise NotImplementedError("The ability to add residual nwp is not ready yet") if mode == 'add': if pores is not None: self['pore.residual'][pores] = True if throats is not None: self['throat.residual'][throats] = True elif mode == 'drop': if pores is not None: self['pore.residual'][pores] = False if throats is not None: self['throat.residual'][throats] = False elif mode == 'clear': if pores is not None: self['pore.residual'] = False if throats is not None: self['throat.residual'] = False elif mode == 'overwrite': if pores is not None: self['pore.residual'] = False self['pore.residual'][pores] = True if throats is not None: self['throat.residual'] = False self['throat.residual'][throats] = True
[docs] def set_inlet_BC(self, pores=None, mode='add'): r""" Specifies which pores are treated as inlets for the invading phase Parameters ---------- pores : ndarray The indices of the pores from which the invading fluid invasion should start mode : str or list of str, optional Controls how the boundary conditions are applied. Options are: ============ ===================================================== mode meaning ============ ===================================================== 'add' (default) Adds the supplied boundary conditions to the given locations. Raises an exception if values of any type already exist in the given locations. 'overwrite' Adds supplied boundary conditions to the given locations, including overwriting conditions of the given type or any other type that may be present in the given locations. 'remove' Removes boundary conditions of the specified type from the specified locations. If ``bctype`` is not specified then *all* types are removed. If no locations are given then values are remvoed from *all* locations. ============ ===================================================== If a list of strings is provided, then each mode in the list is handled in order, so that ``['remove', 'add']`` will give the same results add ``'overwrite'``. """ self.set_BC(pores=pores, bcvalues=True, bctype='inlet', mode=mode) self.reset() self['pore.invasion_sequence'][self['pore.bc.inlet']] = 0
[docs] def set_outlet_BC(self, pores=None, mode='add'): r""" Specifies which pores are treated as outlets for the defending phase This must be specified if trapping is to be considered. Parameters ---------- pores : ndarray The indices of the pores from which the defending fluid exits the domain mode : str or list of str, optional Controls how the boundary conditions are applied. Options are: ============ ===================================================== mode meaning ============ ===================================================== 'add' (default) Adds the supplied boundary conditions to the given locations. Raises an exception if values of any type already exist in the given locations. 'overwrite' Adds supplied boundary conditions to the given locations, including overwriting conditions of the given type or any other type that may be present in the given locations. 'remove' Removes boundary conditions of the specified type from the specified locations. If ``bctype`` is not specified then *all* types are removed. If no locations are given then values are remvoed from *all* locations. ============ ===================================================== If a list of strings is provided, then each mode in the list is handled in order, so that ``['remove', 'add']`` will give the same results add ``'overwrite'``. """ self.set_BC(pores=pores, bcvalues=True, bctype='outlet', mode=mode)
[docs] def run(self): r""" Performs the algorithm for the given number of steps """ # Setup arrays and info # TODO: This should be called conditionally so that it doesn't # overwrite existing data when doing a few steps at a time self._run_setup() n_steps = np.inf # Create incidence matrix for use in _run_accelerated which is jit im = self.network.create_incidence_matrix(fmt='csr') # Perform initial analysis on input pores Ts = self.project.network.find_neighbor_throats(pores=self['pore.bc.inlet']) t_start = self['throat.order'][Ts] t_inv, p_inv, p_inv_t = \ _run_accelerated( t_start=t_start, t_sorted=self['throat.sorted'], t_order=self['throat.order'], t_inv=self['throat.invasion_sequence'], p_inv=self['pore.invasion_sequence'], p_inv_t=np.zeros_like(self['pore.invasion_sequence']), conns=self.project.network['throat.conns'], idx=im.indices, indptr=im.indptr, n_steps=n_steps) # Transfer results onto algorithm object self['throat.invasion_sequence'] = t_inv self['pore.invasion_sequence'] = p_inv self['throat.invasion_pressure'] = self['throat.entry_pressure'] self['pore.invasion_pressure'] = self['throat.entry_pressure'][p_inv_t] # Set invasion pressure of inlets to 0 self['pore.invasion_pressure'][self['pore.invasion_sequence'] == 0] = 0.0
# Set invasion sequence and pressure of any residual pores/throats to 0 # self['throat.invasion_sequence'][self['throat.residual']] = 0 # self['pore.invasion_sequence'][self['pore.residual']] = 0 def _run_setup(self): self['pore.invasion_sequence'][self['pore.bc.inlet']] = 0 # self['pore.invasion_sequence'][self['pore.residual']] = 0 # self['throat.invasion_sequence'][self['throat.residual']] = 0 # Set throats between inlets as trapped Ts = self.network.find_neighbor_throats(self['pore.bc.inlet'], mode='xnor') self['throat.trapped'][Ts] = True # Get throat capillary pressures from phase and update phase = self.project[self.settings['phase']] self['throat.entry_pressure'] = phase[self.settings['entry_pressure']] # self['throat.entry_pressure'][self['throat.residual']] = 0.0 # Generated indices into t_entry giving a sorted list self['throat.sorted'] = np.argsort(self['throat.entry_pressure'], axis=0) self['throat.order'] = 0 self['throat.order'][self['throat.sorted']] = np.arange(0, self.Nt)
[docs] def pc_curve(self): r""" Get the percolation data as the non-wetting phase saturation vs the capillary pressure. """ net = self.project.network pvols = net[self.settings['pore_volume']] tvols = net[self.settings['throat_volume']] tot_vol = np.sum(pvols) + np.sum(tvols) # Normalize pvols /= tot_vol tvols /= tot_vol # Remove trapped volume pmask = self['pore.invasion_sequence'] >= 0 tmask = self['throat.invasion_sequence'] >= 0 pvols = pvols[pmask] tvols = tvols[tmask] pseq = self['pore.invasion_sequence'][pmask] tseq = self['throat.invasion_sequence'][tmask] pPc = self['pore.invasion_pressure'][pmask] tPc = self['throat.invasion_pressure'][tmask] vols = np.concatenate((pvols, tvols)) seqs = np.concatenate((pseq, tseq)) Pcs = np.concatenate((pPc, tPc)) data = np.rec.fromarrays([seqs, vols, Pcs], formats=['i', 'f', 'f'], names=['seq', 'vol', 'Pc']) data.sort(axis=0, order='seq') pc_curve = namedtuple('pc_curve', ('pc', 'snwp')) data = pc_curve(data.Pc, np.cumsum(data.vol)) return data
[docs] def apply_trapping(self): r""" Adjusts the invasion sequence of pores and throats that are trapped. This method uses the reverse invasion percolation procedure outlined by Masson [1]. Returns ------- This function does not return anything. It adjusts the ``'pore.invasion_sequence'`` and ``'throat.invasion_sequence'`` arrays on the object by setting trapped pores/throats to ``ninf``. It also puts ``True`` values into the ``'pore.trapped'`` and ``'throat.trapped'`` arrays. Notes ----- Outlet pores must be specified (using ``set_outlet_BC`` or putting ``True`` values in ``alg['pore.bc.outlet']``) or else an exception is raised. References ---------- [1] Masson, Y. https://doi.org/10.1016/j.cageo.2016.02.003 """ outlets = np.where(self['pore.bc.outlet'])[0] am = self.network.create_adjacency_matrix(fmt='csr') inv_seq = self['pore.invasion_sequence'] self['pore.trapped'] = _find_trapped_pores(inv_seq, am.indices, am.indptr, outlets) # Update invasion sequence self['pore.invasion_sequence'][self['pore.trapped']] = -1 # Find which throats are trapped, including throats which were invaded # after both of it's pores were invaded (hence have a unique invasion # sequence number). pmask = self['pore.invasion_sequence'][self.network.conns] tmask = np.stack((self['throat.invasion_sequence'], self['throat.invasion_sequence'])).T hits = ~np.any(pmask == tmask, axis=1) self['throat.trapped'] = hits self['throat.invasion_sequence'][hits] = -1 # Make some adjustments Pmask = self['pore.invasion_sequence'] < 0 Tmask = self['throat.invasion_sequence'] < 0 self['pore.invasion_sequence'] = \ self['pore.invasion_sequence'].astype(float) self['pore.invasion_sequence'][Pmask] = np.inf self['throat.invasion_sequence'] = \ self['throat.invasion_sequence'].astype(float) self['throat.invasion_sequence'][Tmask] = np.inf
def _apply_trapping_slow(self, step_size=1, mode='mixed'): # pragma: no cover # TODO: Make sure this function actually works and keep it for debugging N = self['throat.invasion_sequence'].max() pseq = self['pore.invasion_sequence'] tseq = self['throat.invasion_sequence'] msg = 'Evaluating trapping' for i in tqdm(range(0, int(N), step_size), msg): if mode == 'bond': i += 1 s, b = bond_percolation(conns=self.network.conns, occupied_bonds=tseq > i) elif mode == 'site': s, b = site_percolation(conns=self.network.conns, occupied_sites=pseq > i) clusters = np.unique(s[self['pore.bc.outlet']]) self['pore.trapped'] += np.isin(s, clusters, invert=True)*(pseq > i) self['throat.trapped'] += np.isin(b, clusters, invert=True)*(tseq > i) # Set trapped pores/throats to uninvaded and adjust invasion sequence self['pore.invasion_sequence'][self['pore.trapped']] = -1 self['throat.invasion_sequence'][self['throat.trapped']] = -1
# Set any residual pores within trapped clusters back to untrapped # self['pore.trapped'][self['pore.residual']] = False # self['throat.trapped'][self['throat.residual']] = False @jit(forceobj=True) def _find_trapped_pores(inv_seq, indices, indptr, outlets): Np = len(inv_seq) sorted_seq = np.vstack((inv_seq.astype(np.int_), np.arange(Np, dtype=np.int_))).T sorted_seq = sorted_seq[sorted_seq[:, 0].argsort()][::-1] cluster = -np.ones(Np, dtype=np.int_) trapped_pores = np.zeros(Np, dtype=bool) trapped_clusters = np.zeros(Np, dtype=bool) # cluster_map = qupc_initialize(Np) cluster_map = np.arange(Np, dtype=np.int_) next_cluster_num = 0 i = -1 for step, pore in sorted_seq: i += 1 step, pore = sorted_seq[i, :] n = indices[indptr[pore]:indptr[pore+1]] nc = cluster_map[cluster[n]][inv_seq[n] > step] nc_uniq = np.unique(nc) if nc.size == 0: # Found an isolated pore, start a new cluster cluster[pore] = next_cluster_num # If pore is an outlet then note cluster as no longer trapped if pore in outlets: trapped_clusters[next_cluster_num] = False else: # Otherwise note this cluster as being a trapped cluster trapped_clusters[next_cluster_num] = True # Note this pore as trapped as well trapped_pores[pore] = True # Increment cluster number for next time next_cluster_num += 1 elif nc_uniq.size == 1: c = nc_uniq[0] # Neighbors have one unique cluster number, so assign it to current pore cluster[pore] = c # If pore is an outlet then note cluster as no longer trapped if pore in outlets: trapped_clusters[c] = False # Also set all joined clusters to not trapped cluster_map = qupc_reduce(cluster_map) hits = np.where(cluster_map == cluster_map[c])[0] trapped_clusters[hits] = False # If this cluster number is part of a trapped cluster then # mark pore as trapped if trapped_clusters[c]: trapped_pores[pore] = True elif nc_uniq.size > 1: cluster[pore] = min(nc_uniq) # Merge all clusters into a single cluster for c in nc: qupc_update(cluster_map, c, min(nc_uniq)) cluster_map = qupc_reduce(cluster_map) # If all neighboring clusters are trapped, then set current pore to # trapped as well if np.all(trapped_clusters[nc]): trapped_pores[pore] = True else: # Otherwise set all neighbor clusters to untrapped! trapped_clusters[nc] = False return trapped_pores @njit def _run_accelerated(t_start, t_sorted, t_order, t_inv, p_inv, p_inv_t, conns, idx, indptr, n_steps): # pragma: no cover r""" Numba-jitted run method for InvasionPercolation class. Notes ----- ``idx`` and ``indptr`` are properties are the network's incidence matrix, and are used to quickly find neighbor throats. Numba doesn't like foreign data types (i.e. Network), and so ``find_neighbor_throats`` method cannot be called in a jitted method. Nested wrapper is for performance issues (reduced OpenPNM import) time due to local numba import """ # TODO: The following line is supposed to be numba's new list, but the # heap does not work with this # queue = List(t_start) queue = list(t_start) hq.heapify(queue) count = 1 while count < (n_steps + 1): # Find throat at the top of the queue t = hq.heappop(queue) # Extract actual throat number t_next = t_sorted[t] t_inv[t_next] = count # If throat is duplicated while len(queue) > 0 and queue[0] == t: _ = hq.heappop(queue) # Find pores connected to newly invaded throat from am in coo format Ps = conns[t_next] # Remove already invaded pores from Ps Ps = Ps[p_inv[Ps] < 0] # If either of the neighboring pores are uninvaded (-1), set it to # invaded and add its neighboring throats to the queue if len(Ps) > 0: p_inv[Ps] = count p_inv_t[Ps] = t_next for i in Ps: # Get neighboring throat numbers from im in csr format Ts = idx[indptr[i]:indptr[i+1]] # Keep only throats which are uninvaded Ts = Ts[t_inv[Ts] < 0] for i in Ts: # Add throat to the queue hq.heappush(queue, t_order[i]) count += 1 if len(queue) == 0: break return t_inv, p_inv, p_inv_t # %% if __name__ == '__main__': import matplotlib.pyplot as plt import openpnm as op np.random.seed(0) Nx, Ny, Nz = 25, 25, 1 pn = op.network.Cubic(shape=[Nx, Ny, Nz], spacing=1e-4) pn.add_model_collection(op.models.collections.geometry.spheres_and_cylinders) pn.regenerate_models() pn['pore.volume@left'] = 0.0 # op.topotools.trim(pn, pores=[380, 395]) water = op.phase.Water(network=pn, name='h2o') water.add_model_collection(op.models.collections.physics.standard) water.regenerate_models() ip = InvasionPercolation(network=pn, phase=water) ip.set_inlet_BC(pn.pores('left')) ip.run() ip.set_outlet_BC(pn.pores('right')) # ip.apply_trapping() # %% if 0: pseq = np.copy(ip['pore.invasion_sequence']) tseq = np.copy(ip['throat.invasion_sequence']) ax = op.topotools.plot_connections(pn, tseq >= 0, linestyle='--', c='b', linewidth=3) op.topotools.plot_coordinates(pn, pseq >= 0, c='r', marker='x', markersize=100, ax=ax) op.topotools.plot_coordinates(pn, pseq < 0, c='c', marker='o', markersize=100, ax=ax) # %% if 1: drn = op.algorithms.Drainage(network=pn, phase=water) drn.set_inlet_BC(pn.pores('left')) pressures = np.unique(ip['pore.invasion_pressure']) # pressures = np.logspace(np.log10(0.1e3), np.log10(2e4), 100) drn.run(pressures=pressures) drn.set_outlet_BC(pn.pores('right')) # drn.apply_trapping() fig, ax = plt.subplots(1, 1) ax.step(*ip.pc_curve(), 'b', where='post') ax.step(*drn.pc_curve(), 'r--', where='post') ax.set_ylim([0, 1]) # %% if 0: pseq = ip['pore.invasion_sequence'] pseq[pseq == -1] = pseq.max() + 1 tseq = ip['throat.invasion_sequence'] tseq[tseq == -1] = tseq.max() + 1 for j, i in enumerate(tqdm(np.unique(tseq)[:-1])): ax = op.topotools.plot_connections(pn, tseq <= i, linestyle='--', c='r', linewidth=3) op.topotools.plot_coordinates(pn, pseq <= i, c='b', marker='x', markersize=100, ax=ax) op.topotools.plot_coordinates(pn, ip['pore.trapped'], c='c', marker='o', markersize=100, ax=ax) plt.savefig(f"{str(j).zfill(3)}.png") plt.close()