Source code for openpnm.topotools._topotools

import logging
import numpy as np
import scipy as sp
from scipy.spatial import cKDTree
from scipy.sparse import csgraph
from scipy.spatial import ConvexHull
from openpnm.utils import Workspace
import openpnm._skgraph as skgr

logger = logging.getLogger(__name__)
ws = Workspace()
__all__ = [

[docs] def isoutside(**kwargs): return**kwargs)
isoutside.__doc__ =
[docs] def rotate_coords(network, **kwargs): network['pore.coords'] =, **kwargs) return network
rotate_coords.__doc__ =
[docs] def shear_coords(network, **kwargs): network['pore.coords'] =, **kwargs) return network
shear_coords.__doc__ =
[docs] def template_sphere_shell(r_outer, r_inner=0): return, r_inner)
template_sphere_shell.__doc__ = \
[docs] def template_cylinder_annulus(z, r_outer, r_inner=0): return, r_outer, r_inner)
template_cylinder_annulus.__doc__ = \
[docs] def generate_base_points(num_points, domain_size, reflect=True): return num_points, domain_size, reflect, )
generate_base_points.__doc__ = \
[docs] def reflect_base_points(points, domain_size): return, domain_size)
reflect_base_points.__doc__ = \
[docs] def get_spacing(network): return
get_spacing.__doc__ =
[docs] def get_shape(network): return
get_shape.__doc__ =
[docs] def filter_pores_by_z(network, pores, z): return skgr.queries.filter_by_z(network=network, inds=pores, z=z)
filter_pores_by_z.__doc__ = skgr.queries.filter_by_z.__doc__
[docs] def find_interface_throats(network, P1, P2): return skgr.queries.find_common_edges(network=network, inds_1=P1, inds_2=P2)
find_interface_throats.__doc__ = skgr.queries.find_common_edges.__doc__
[docs] def dimensionality(network): r""" Determines whether a network is 1D, 2D or 3D, and in which dimensions Parameters ---------- network : dict The OpenPNM network of interest Returns ------- dims : boolean mask A 3 x 1 array of booleans with ``True`` indicating if any dimensionality exists on each axis. Notes ----- This function looks at the coordinates of each pore and if any axes all have the same values that axis is considered non-dimensional. """ return
[docs] def trim(network, pores=[], throats=[]): """ Remove pores or throats from the network Parameters ---------- network : Network The Network from which pores or throats should be removed pores (or throats) : array_like The indices of the of the pores or throats to be removed from the network. """ pores = network._parse_indices(pores) throats = network._parse_indices(throats) Pkeep = np.copy(network['pore.all']) Tkeep = np.copy(network['throat.all']) if np.size(pores) > 0: Pkeep[pores] = False if not np.any(Pkeep): raise Exception('Cannot delete ALL pores') # # Performing customized find_neighbor_throats which is much faster, # # but not general for other types of queries # temp = np.in1d(network['throat.conns'].flatten(), pores) # temp = np.reshape(temp, (network.Nt, 2)) # Ts = np.any(temp, axis=1) # Ts = network.Ts[Ts] Ts = network.find_neighbor_throats(pores=~Pkeep, mode='union') if len(Ts) > 0: Tkeep[Ts] = False if np.size(throats) > 0: Tkeep[throats] = False # The following IF catches the special case of deleting ALL throats # It removes all throat props, adds 'all', and skips rest of function if not np.any(Tkeep):'Removing ALL throats from network') for item in list(network.keys()): if item.split('.', 1)[0] == 'throat': del network[item] network['throat.all'] = np.array([], ndmin=1) return # Temporarily store throat conns and pore map for processing later Np_old = network.Np Nt_old = network.Nt Pkeep_inds = np.where(Pkeep)[0] Tkeep_inds = np.where(Tkeep)[0] Pmap = np.ones((network.Np,), dtype=int)*-1 tpore1 = network['throat.conns'][:, 0] tpore2 = network['throat.conns'][:, 1] # Delete specified pores and throats from all objects for obj in network.project: if (obj.Np == Np_old) and (obj.Nt == Nt_old): Ps = Pkeep_inds Ts = Tkeep_inds for key in list(obj.keys()): temp = obj.pop(key) if key.split('.', 1)[0] == 'throat': obj.update({key: temp[Ts]}) if key.split('.', 1)[0] == 'pore': obj.update({key: temp[Ps]}) # Remap throat connections Pmap[Pkeep] = np.arange(0, np.sum(Pkeep)) Tnew1 = Pmap[tpore1[Tkeep]] Tnew2 = Pmap[tpore2[Tkeep]] network.update({'throat.conns': np.vstack((Tnew1, Tnew2)).T}) # Clear adjacency and incidence matrices which will be out of date now network._am.clear() network._im.clear()
[docs] def extend(network, coords=[], conns=[], labels=[], **kwargs): r""" Add pores or throats to the network from a list of coords or conns. Parameters ---------- network : Network The network to which pores or throats should be added coords : array_like The coordinates of the pores to add. These will be appended to the 'pore.coords' array so should be of shape N-by-3, where N is the number of pores in the list. conns : array_like The throat connections to add. These will be appended to the 'throat.conns' array so should be of shape N-by-2. Note that the numbering must point to existing pores. labels : str, or list[str], optional A list of labels to apply to the new pores and throats """ if 'throat_conns' in kwargs.keys(): conns = kwargs['throat_conns'] if 'pore_coords' in kwargs.keys(): coords = kwargs['pore_coords'] coords = np.array(coords) conns = np.array(conns) Np_old = network.num_pores() Nt_old = network.num_throats() Np = Np_old + coords.shape[0] Nt = Nt_old + conns.shape[0] if np.any(conns > Np): raise Exception('Some throat conns point to non-existent pores') network.update({'pore.all': np.ones([Np, ], dtype=bool), 'throat.all': np.ones([Nt, ], dtype=bool)}) # Add coords and conns if np.size(coords) > 0: coords = np.vstack((network['pore.coords'], coords)) network['pore.coords'] = coords if np.size(conns) > 0: conns = np.vstack((network['throat.conns'], conns)) network['throat.conns'] = conns # Increase size of any prop or label arrays already on network and phases objs = list(network.project.phases) objs.append(network) for obj in objs: obj.update({'pore.all': np.ones([Np, ], dtype=bool), 'throat.all': np.ones([Nt, ], dtype=bool)}) for item in list(obj.keys()): N = obj._count(element=item.split('.', 1)[0]) if obj[item].shape[0] < N: arr = obj.pop(item) s = arr.shape if arr.dtype == bool: obj[item] = np.zeros(shape=(N, *s[1:]), dtype=bool) else: obj[item] = np.ones(shape=(N, *s[1:]), dtype=float)*np.nan obj[item][:arr.shape[0]] = arr # Regenerate models on all objects to fill new elements for obj in network.project.phases: if hasattr(obj, 'models'): obj.regenerate_models() # Apply labels, if supplied if labels != []: # Convert labels to list if necessary if isinstance(labels, str): labels = [labels] for label in labels: # Remove pore or throat from label, if present label = label.split('.', 1)[-1] if np.size(coords) > 0: Ps = np.r_[Np_old:Np] if 'pore.'+label not in network.labels(): network['pore.'+label] = False network['pore.'+label][Ps] = True if np.size(conns) > 0: Ts = np.r_[Nt_old:Nt] if 'throat.'+label not in network.labels(): network['throat.'+label] = False network['throat.'+label][Ts] = True # Clear adjacency and incidence matrices which will be out of date now network._am.clear() network._im.clear()
[docs] def label_faces(network, tol=0.0, label='surface'): r""" Finds pores on the surface of the network and labels them according to whether they are on the *top*, *bottom*, etc. This function assumes the network is cubic in shape Parameters ---------- network : Network The network to apply the labels tol : scalar The tolerance for defining what counts as a surface pore, which is specifically meant for random networks. All pores with ``tol`` of the maximum or minimum along each axis are counts as pores. The default is 0. label : str An identifying label to isolate the pores on the faces of the network. The default is 'surface'. Surface pores can be found using ``find_surface_pores``. """ if label is not None: label = label.split('.', 1)[-1] if 'pore.'+label not in network.labels(): find_surface_pores(network, label=label) Psurf = network['pore.'+label] else: Psurf = True # So it will "do nothing" below crds = network['pore.coords'] xmin, xmax = np.amin(crds[:, 0]), np.amax(crds[:, 0]) xspan = xmax - xmin ymin, ymax = np.amin(crds[:, 1]), np.amax(crds[:, 1]) yspan = ymax - ymin zmin, zmax = np.amin(crds[:, 2]), np.amax(crds[:, 2]) zspan = zmax - zmin dims = dimensionality(network) if dims[0]: network['pore.left'] = (crds[:, 0] <= (xmin + tol*xspan)) * Psurf network['pore.xmin'] = np.copy(network['pore.left']) network['pore.right'] = (crds[:, 0] >= (xmax - tol*xspan)) * Psurf network['pore.xmax'] = np.copy(network['pore.right']) if dims[1]: network['pore.front'] = (crds[:, 1] <= (ymin + tol*yspan)) * Psurf network['pore.ymin'] = np.copy(network['pore.front']) network['pore.back'] = (crds[:, 1] >= (ymax - tol*yspan)) * Psurf network['pore.ymax'] = np.copy(network['pore.back']) if dims[2]: network['pore.bottom'] = (crds[:, 2] <= (zmin + tol*zspan)) * Psurf network['pore.zmin'] = np.copy(network['pore.bottom']) network[''] = (crds[:, 2] >= (zmax - tol*zspan)) * Psurf network['pore.zmax'] = np.copy(network[''])
[docs] def find_surface_pores(network, markers=None, label='surface'): r""" Find the pores on the surface of the domain by performing a Delaunay triangulation between the network pores and some external ``markers``. All pores connected to these external marker points are considered surface pores. Parameters ---------- network: Network The network for which the surface pores are to be found markers: array_like 3 x N array of the marker coordinates to use in the triangulation. The labeling is performed in one step, so all points are added, and then any pores connected to at least one marker is given the provided label. By default, this function will automatically generate 6 points outside each axis of the network domain. Users may wish to specify a single external marker point and provide an appropriate label in order to identify specific faces. For instance, the marker may be *above* the domain, and the label might be 'top_surface'. label : str The label to apply to the pores. The default is 'surface'. Notes ----- This function does not check whether the given markers actually lie outside the domain, allowing the labeling of *internal* sufaces. If this method fails to mark some surface pores, consider sending more markers on each face. """ import scipy.spatial as sptl dims = dimensionality(network) coords = network['pore.coords'][:, dims] if markers is None: # normalize coords to a 1 unit cube centered on origin coords -= np.amin(coords, axis=0) coords /= np.amax(coords, axis=0) coords -= 0.5 npts = max((network.Np/10, 100)) if sum(dims) == 1: network['pore.'+label] = True return if sum(dims) == 2: r = 0.75 theta = np.linspace(0, 2*np.pi, int(npts), dtype=float) x = r*np.cos(theta) y = r*np.sin(theta) markers = np.vstack((x, y)).T if sum(dims) == 3: r = 1.00 indices = np.arange(0, int(npts), dtype=float) + 0.5 phi = np.arccos(1 - 2*indices/npts) theta = np.pi * (1 + 5**0.5) * indices x = r*np.cos(theta) * np.sin(phi) y = r*np.sin(theta) * np.sin(phi) z = r*np.cos(phi) markers = np.vstack((x, y, z)).T else: if sum(dims) == 1: pass if sum(dims) == 2: markers = np.atleast_2d(markers) if markers.shape[1] != 2: raise Exception('Network appears planar, so markers must be 2D') if sum(dims) == 3: markers = np.atleast_2d(markers) if markers.shape[1] != 3: raise Exception('Markers must be 3D for this network') pts = np.vstack((coords, markers)) tri = sptl.Delaunay(pts, incremental=False) (indices, indptr) = tri.vertex_neighbor_vertices for k in range(network.Np, tri.npoints): neighbors = indptr[indices[k]:indices[k+1]] inds = np.where(neighbors < network.Np) neighbors = neighbors[inds] if 'pore.'+label not in network.keys(): network['pore.'+label] = False network['pore.'+label][neighbors] = True
[docs] def clone_pores(network, pores, labels=['clone'], mode='parents'): r""" Clones the specified pores and adds them to the network Parameters ---------- network : Network The Network object to which the new pores are to be added pores : array_like List of pores to clone labels : str, or list[str] The labels to apply to the clones, default is 'clone' mode : str Controls the connections between parents and clones. Options are: =========== ========================================================== mode description =========== ========================================================== 'parents' Each clone is connected only to its parent.(Default) 'siblings' Clones are only connected to each other in the same manner as parents were connected. 'isolated' No connections between parents or siblings =========== ========================================================== """ if isinstance(labels, str): labels = [labels] network._parse_indices(pores) Np = network.Np Nt = network.Nt # Clone pores parents = np.array(pores, ndmin=1) pcurrent = network['pore.coords'] pclone = pcurrent[pores, :] pnew = np.concatenate((pcurrent, pclone), axis=0) Npnew = np.shape(pnew)[0] clones = np.arange(Np, Npnew) # Create cloned pores first extend(network=network, pore_coords=pclone) # Apply provided labels to cloned pores for item in labels: network.set_label(label=item, pores=range(Np, Npnew)) # Add connections between parents and clones if mode == 'parents': tclone = np.vstack((parents, clones)).T extend(network=network, conns=tclone) elif mode == 'siblings': ts = network.find_neighbor_throats(pores=pores, mode='xnor') mapping = np.zeros([network.Np, ], dtype=int) mapping[pores] = np.arange(Np, network.Np) tclone = mapping[network['throat.conns'][ts]] extend(network=network, throat_conns=tclone) elif mode == 'isolated': pass Ntnew = network.Nt for item in labels: network.set_label(label=item, throats=range(Nt, Ntnew)) # Clear adjacency and incidence matrices which will be out of date now network._am.clear() network._im.clear()
[docs] def merge_networks(network, donor=[]): r""" Combine multiple networks into one without making any topological connections Parameters ---------- network : Network The network to which all the other networks should be added. donor : Network or list of Objects The network object(s) to add to the given network See Also -------- extend trim stitch """ if isinstance(donor, list): donors = donor else: donors = [donor] for donor in donors: network['throat.conns'] = np.vstack((network['throat.conns'], donor['throat.conns'] + network.Np)) network['pore.coords'] = np.vstack((network['pore.coords'], donor['pore.coords'])) p_all = np.ones((np.shape(network['pore.coords'])[0],), dtype=bool) t_all = np.ones((np.shape(network['throat.conns'])[0],), dtype=bool) network.update({'pore.all': p_all}) network.update({'throat.all': t_all}) for key in set(network.keys()).union(set(donor.keys())): if key.split('.')[1] not in ['conns', 'coords', '_id', 'all']: if key in network.keys(): pop_flag = False # If key not on donor add it first with dummy values to # simplify merging later if key not in donor.keys(): logger.debug('Adding ' + key + ' to donor') if network[key].dtype == bool: # Deal with labels donor[key] = False else: # Deal with numerical data element = key.split('.', 1)[0] shape = list(network[key].shape) N = donor._count(element) shape[0] = N donor[key] = np.empty(shape=shape)*np.nan pop_flag = True # Then merge it with existing array on network if len(network[key].shape) == 1: temp = np.hstack((network[key], donor[key])) else: temp = np.vstack((network[key], donor[key])) network[key] = temp if pop_flag: donor.pop(key, None) else: # If key not on network add it first logger.debug('Adding ' + key + ' to network') if donor[key].dtype == bool: network[key] = False else: data_shape = list(donor[key].shape) pore_prop = True if key.split(".")[0] == "pore" else False data_shape[0] = network.Np if pore_prop else network.Nt network[key] = np.empty(data_shape) * np.nan # Then append donor values to network s = np.shape(donor[key])[0] network[key][-s:] = donor[key] # Clear adjacency and incidence matrices which will be out of date now network._am.clear() network._im.clear()
[docs] def stitch(network, donor, P_network, P_donor, method='nearest', len_max=np.inf, label_suffix='', label_stitches='stitched'): r""" Stitches a second a network to the current network. Parameters ---------- network : Network The Network to which to donor Network will be attached donor : Network The Network to stitch on to the current Network P_network : array_like The pores on the current Network P_donor : array_like The pores on the donor Network label_suffix : str or None Some text to append to each label in the donor Network before inserting them into the recipient. The default is to append no text, but a common option would be to append the donor Network's name. To insert none of the donor labels, use ``None``. label_stitches : str or list[str] The label to apply to the newly created 'stitch' throats. The defaul is 'stitched'. If performing multiple stitches in a row it might be helpful to the throats created during each step uniquely for later identification. len_max : float Set a length limit on length of new throats method : str (default = 'nearest') The method to use when making pore to pore connections. Options are: ========= ============================================================= mode description ========= ============================================================= 'radius' Connects each pore on the recipient network to the nearest pores on the donor network, within ``len_max`` 'nearest' Connects each pore on the recipienet network to the nearest pore on the donor network. ========= ============================================================= Notes ----- Before stitching it is necessary to translate the pore coordinates of one of the Networks so that it is positioned correctly relative to the other. This is illustrated in the example below. """ # Parse inputs if isinstance(label_stitches, str): label_stitches = [label_stitches] for s in label_stitches: if s not in network.keys(): network['throat.' + s] = False # Get the initial number of pores and throats N_init = {} N_init['pore'] = network.Np N_init['throat'] = network.Nt if method == 'nearest': P1 = P_network P2 = P_donor + N_init['pore'] # Increment pores on donor C1 = network['pore.coords'][P_network] C2 = donor['pore.coords'][P_donor] D = sp.spatial.distance.cdist(C1, C2) [P1_ind, P2_ind] = np.where(D == D.min(axis=0)) conns = np.vstack((P1[P1_ind], P2[P2_ind])).T elif method == 'radius': P1 = P_network P2 = P_donor + N_init['pore'] # Increment pores on donor C1 = network['pore.coords'][P_network] C2 = donor['pore.coords'][P_donor] D = sp.spatial.distance.cdist(C1, C2) [P1_ind, P2_ind] = np.where(D <= len_max) conns = np.vstack((P1[P1_ind], P2[P2_ind])).T else: raise Exception('<{}> method not supported'.format(method)) merge_networks(network, donor) # Add the new stitch throats to the Network extend(network=network, throat_conns=conns, labels=label_stitches) # Remove donor from Workspace, if present # This check allows for the reuse of a donor Network multiple times for sim in list(ws.values()): if donor in sim: del ws[]
def stitch_pores(network, pores1, pores2, mode='gabriel'): r""" Stitches together pores in a network with disconnected clusters Parameters ---------- network : OpenPNM Network The network to operate upon pores1 and pores2: array_like The pore indices of the disconnected clusters to be joined mode : str Dictates which tesselation method is used to identify which pores to stitch together. Options are: =========== ========================================================== mode meaning =========== ========================================================== 'delaunay' Uses the delaunay tesselation method =========== ========================================================== Returns ------- None The network is operated on 'in-place' so nothing is returned. """ raise NotImplementedError() from import Delaunay pores1 = network._parse_indices(pores1) pores2 = network._parse_indices(pores2) C1 = network.coords[pores1, :] C2 = network.coords[pores2, :] crds = np.vstack((C1, C2)) if mode == 'delaunay': shape = get_shape(network) * dimensionality(network) net = Delaunay(points=crds, shape=shape) else: raise Exception('Unsupported mode') net.set_label(pores=range(len(pores1)), label='') net.set_label(pores=range(len(pores2)), label='pore.two') Ts = net.find_neighbor_throats(pores=net.pores('one'), mode='xor') conns = net.conns[Ts] mapped_conns = np.vstack((pores1[conns[:, 0]], pores2[conns[:, 1] - len(pores1)])).T mapped_conns = np.sort(mapped_conns, axis=1) extend(network=network, conns=mapped_conns, labels='stitched')
[docs] def connect_pores(network, pores1, pores2, labels=['new_conns']): r""" Returns the possible connections between two groups of pores Parameters ---------- network : Network The network to which the pores should be added pores1 : array_like The first group of pores on the network pores2 : array_like The second group of pores on the network labels : list of strings The labels to apply to the new throats. The default is ``'new_conns'``. Notes ----- The method also works if ``pores1`` and ``pores2`` are list of lists, in which case it consecutively connects corresponding members of the two lists in a 1-to-1 fashion. Example: pores1 = [[0, 1], [2, 3]] and pores2 = [[5], [7, 9]] leads to creation of the following connections: :: 0 --> 5 2 --> 7 3 --> 7 1 --> 5 2 --> 9 3 --> 9 If you want to use the batch functionality, make sure that each element within ``pores1`` and ``pores2`` are of type list or ndarray. """ # Assert that `pores1` and `pores2` are list of lists try: len(pores1[0]) except (TypeError, IndexError): pores1 = [pores1] try: len(pores2[0]) except (TypeError, IndexError): pores2 = [pores2] if len(pores1) != len(pores2): raise Exception('Running in batch mode! pores1 and pores2 must be' + ' of the same length.') arr1, arr2 = [], [] for ps1, ps2 in zip(pores1, pores2): size1 = np.size(ps1) size2 = np.size(ps2) arr1.append(np.repeat(ps1, size2)) arr2.append(np.tile(ps2, size1)) conns = np.vstack([np.concatenate(arr1), np.concatenate(arr2)]).T extend(network=network, throat_conns=conns, labels=labels)
[docs] def merge_pores(network, pores, labels=['merged'], include_neighbors=True): r""" Combines a selection of pores into a new single pore located at the centroid of the selected pores (and optionally their neighbors) and connected to all of their neighbors. Parameters ---------- network : Network pores : array_like The list of pores which are to be combined into a new single pore labels : str or list[str] The labels to apply to the new pore and new throat connections Notes ----- (1) The method also works if a list of lists is passed, in which case it consecutively merges the given selections of pores. (2) The selection of pores should be chosen carefully, preferrable so that they all form a continuous cluster. For instance, it is recommended to use the ``find_nearby_pores`` method to find all pores within a certain distance of a given pore, and these can then be merged without causing any abnormal connections. """ # Assert that `pores` is list of lists try: len(pores[0]) except (TypeError, IndexError): pores = [pores] N = len(pores) NBs, XYZs = [], [] for Ps in pores: temp = network.find_neighbor_pores(pores=Ps, mode='union', flatten=True, include_input=False) NBs.append(temp) if len(Ps) == 2: XYZs.append(np.mean(network["pore.coords"][Ps], axis=0)) else: if include_neighbors: points = np.concatenate((temp, Ps)) else: points = Ps XYZs.append(hull_centroid(network["pore.coords"][points])) extend(network, pore_coords=XYZs, labels=labels) Pnew = network.Ps[-N::] # Possible throats between new pores: This only happens when running in # batch mode, i.e. multiple groups of pores are to be merged. In case # some of these groups share elements, possible throats between the # intersecting elements is not captured and must be added manually. pores_set = [set(items) for items in pores] NBs_set = [set(items) for items in NBs] ps1, ps2 = [], [] from itertools import combinations for i, j in combinations(range(N), 2): if not NBs_set[i].isdisjoint(pores_set[j]): ps1.append([network.Ps[-N+i]]) ps2.append([network.Ps[-N+j]]) # Add (possible) connections between the new pores connect_pores(network, pores1=ps1, pores2=ps2, labels=labels) # Add connections between the new pores and the rest of the network connect_pores(network, pores2=np.split(Pnew, N), pores1=NBs, labels=labels) # Trim merged pores from the network trim(network=network, pores=np.concatenate(pores))
[docs] def hull_centroid(points): r""" Computes centroid of the convex hull enclosing the given coordinates. Parameters ---------- points : Np by 3 ndarray Coordinates (xyz) Returns ------- centroid : array A 3 by 1 Numpy array containing coordinates of the centroid. """ dim = [np.unique(points[:, i]).size != 1 for i in range(3)] hull = ConvexHull(points[:, dim]) centroid = points.mean(axis=0) centroid[dim] = hull.points[hull.vertices].mean(axis=0) return centroid
[docs] def add_boundary_pores(network, pores, offset=None, move_to=None, apply_label='boundary'): r""" This method uses ``clone_pores`` to clone the input pores, then shifts them the specified amount and direction, then applies the given label. Parameters ---------- pores : array_like List of pores to offset. If no pores are specified, then it assumes that all surface pores are to be cloned. offset : 3 x 1 array The distance in vector form which the cloned boundary pores should be offset. Either this, or ``move_to`` must be specified. move_to : 3 x 1 array The location to move the boundary pores to. A value of ``None`` indicates that no translation should be applied in that axis. For instance, ``[None, None, 0]`` indicates that the boundary pores should moved along the z-axis to the specified location. Either this or ``offset`` must be specified. apply_label : str This label is applied to the boundary pores. Default is 'boundary'. """ # Parse the input pores Ps = np.array(pores, ndmin=1) if Ps.dtype is bool: Ps = network.to_indices(Ps) if np.size(pores) == 0: # Handle an empty array if given return np.array([], dtype=np.int64) # Clone the specifed pores clone_pores(network=network, pores=Ps) newPs = network.pores('pore.clone') del network['pore.clone'] newTs = network.throats('clone') del network['throat.clone'] if offset is not None: # Offset the cloned pores network['pore.coords'][newPs] += offset if move_to is not None: # Move the cloned pores for i, d in enumerate(move_to): if d is not None: temp = network['pore.coords'][newPs] temp[:, i] = d network['pore.coords'][newPs] = temp # Apply labels to boundary pores (trim leading 'pores' if present) label = apply_label.split('.', 1)[-1] plabel = 'pore.' + label tlabel = 'throat.' + label network[plabel] = False network[plabel][newPs] = True network[tlabel] = False network[tlabel][newTs] = True
[docs] def iscoplanar(coords): r""" Determines if given pores are coplanar with each other Parameters ---------- coords : array_like List of pore coords to check for coplanarity. At least 3 pores are required. Returns ------- results : bool A boolean value of whether given points are coplanar (``True``) or not (``False``) """ coords = np.array(coords, ndmin=1) if np.shape(coords)[0] < 3: raise Exception('At least 3 input pores are required') Px = coords[:, 0] Py = coords[:, 1] Pz = coords[:, 2] # Do easy check first, for common coordinate if np.shape(np.unique(Px))[0] == 1: return True if np.shape(np.unique(Py))[0] == 1: return True if np.shape(np.unique(Pz))[0] == 1: return True # Perform rigorous check using vector algebra # Grab first basis vector from list of coords n1 = np.array((Px[1] - Px[0], Py[1] - Py[0], Pz[1] - Pz[0])).T n = np.array([0.0, 0.0, 0.0]) i = 1 while n.sum() == 0: if i >= (np.size(Px) - 1): logger.warning('No valid basis vectors found') return False # Chose a secon basis vector n2 = np.array((Px[i+1] - Px[i], Py[i+1] - Py[i], Pz[i+1] - Pz[i])).T # Find their cross product n = np.cross(n1, n2) i += 1 # Create vectors between all other pairs of points r = np.array((Px[1:-1] - Px[0], Py[1:-1] - Py[0], Pz[1:-1] - Pz[0])) # Ensure they all lie on the same plane n_dot =, r) return bool(np.sum(np.absolute(n_dot)) == 0)
[docs] def is_fully_connected(network, pores_BC=None): r""" Checks whether network is fully connected, i.e. not clustered. Parameters ---------- network : Network The network whose connectivity to check. pores_BC : array_like (optional) The pore indices of boundary conditions (inlets/outlets). Returns ------- bool If ``pores_BC`` is not specified, then returns ``True`` only if the entire network is connected to the same cluster. If ``pores_BC`` is given, then returns ``True`` only if all clusters are connected to the given boundary condition pores. """ am = network.get_adjacency_matrix(fmt='lil').copy() temp = csgraph.connected_components(am, directed=False)[1] is_connected = np.unique(temp).size == 1 # Ensure all clusters are part of pores, if given if not is_connected and pores_BC is not None: am.resize(network.Np + 1, network.Np + 1) pores_BC = network._parse_indices(pores_BC) am.rows[-1] = pores_BC.tolist()[-1] = np.arange(network.Nt, network.Nt + len(pores_BC)).tolist() temp = csgraph.connected_components(am, directed=False)[1] is_connected = np.unique(temp).size == 1 return is_connected
[docs] def get_domain_area(network, inlets=None, outlets=None): r""" Determine the cross sectional area relative to the inlets/outlets. Parameters ---------- network : Network The network object containing the pore coordinates inlets : array_like The pore indices of the inlets. outlets : array_Like The pore indices of the outlets. Returns ------- area : scalar The cross sectional area relative to the inlets/outlets. """ logger.warning('Attempting to estimate inlet area...will be low') if dimensionality(network).sum() != 3: raise Exception('The network is not 3D, specify area manually') inlets = network.coords[inlets] outlets = network.coords[outlets] if not iscoplanar(inlets): logger.error('Detected inlet pores are not coplanar') if not iscoplanar(outlets): logger.error('Detected outlet pores are not coplanar') Nin = np.ptp(inlets, axis=0) > 0 if Nin.all(): logger.warning('Detected inlets are not oriented along a principle axis') Nout = np.ptp(outlets, axis=0) > 0 if Nout.all(): logger.warning('Detected outlets are not oriented along a principle axis') hull_in = ConvexHull(points=inlets[:, Nin]) hull_out = ConvexHull(points=outlets[:, Nout]) if hull_in.volume != hull_out.volume: logger.error('Inlet and outlet faces are different area') area = hull_in.volume # In 2D: volume=area, area=perimeter return area
[docs] def get_domain_length(network, inlets=None, outlets=None): r""" Determine the domain length relative to the inlets/outlets. Parameters ---------- network : Network The network object containing the pore coordinates inlets : array_like The pore indices of the inlets. outlets : array_Like The pore indices of the outlets. Returns ------- area : scalar The domain length relative to the inlets/outlets. """ msg = ('Attempting to estimate domain length...could be low if' ' boundary pores were not added') logger.warning(msg) inlets = network.coords[inlets] outlets = network.coords[outlets] if not iscoplanar(inlets): logger.error('Detected inlet pores are not coplanar') if not iscoplanar(outlets): logger.error('Detected inlet pores are not coplanar') tree = cKDTree(data=inlets) Ls = np.unique(np.float64(tree.query(x=outlets)[0])) if not np.allclose(Ls, Ls[0]): logger.error('A unique value of length could not be found') length = Ls[0] return length
[docs] def reduce_coordination(network, z): r""" Deletes throats on network to match specified average coordination number Parameters ---------- target : Network The network whose throats are to be trimmed z : scalar The desired average coordination number. It is not possible to specify the distribution of the coordination, only the mean value. Returns ------- trim : ndarray A boolean array with ``True`` values indicating which pores to trim (using ``op.topotools.trim``) to obtain the desired average coordination number. Notes ----- This method first finds the minimum spanning tree of the network using random weights on each throat, then assures that these throats are *not* deleted, in order to maintain network connectivity. The list of throats to trim is generated randomly from the throats *not* on the spanning tree. """ # Find minimum spanning tree using random weights am = network.create_adjacency_matrix(weights=np.random.rand(network.Nt), triu=False) mst = csgraph.minimum_spanning_tree(am, overwrite=True) mst = mst.tocoo() # Label throats on spanning tree to avoid deleting them Ts = network.find_connecting_throat(mst.row, mst.col) Ts = np.hstack(Ts) network['throat.mst'] = False network['throat.mst'][Ts] = True # Trim throats not on the spanning tree to acheive desired coordination Ts = np.random.permutation(network.throats('mst', mode='nor')) del network['throat.mst'] Ts = Ts[:int(network.Nt - network.Np*(z/2))] Ts = network.to_mask(throats=Ts) return Ts
[docs] def add_reservoir_pore(cls, network, pores, offset=0.1): r""" Adds a single pore connected to all ``pores`` to act as a reservoir This function is mostly needed to make network compatible with the Statoil file format, which requires reservoir pores on the inlet and outlet faces. Parameters ---------- network : Network The network to which the reservoir pore should be added pores : array_like The pores to which the reservoir pore should be connected to offset : scalar Controls the distance which the reservoir is offset from the given ``pores``. The total displacement is found from the network dimension normal to given ``pores``, multiplied by ``offset``. Returns ------- project : list An OpenPNM project object with a new geometry object added to represent the newly added pore and throats. """ import openpnm.models.geometry as mods # Check if a label was given and fetch actual indices if isinstance(pores, str): # Convert 'face' into 'pore.face' if necessary if not pores.startswith('pore.'): pores = 'pore.' + pores pores = network.pores(pores) # Find coordinates of pores on given face coords = network['pore.coords'][pores] # Normalize the coordinates based on full network size c_norm = coords/network['pore.coords'].max(axis=0) # Identify axis of face by looking for dim with smallest delta diffs = np.amax(c_norm - np.average(c_norm, axis=0), axis=0) ax = np.where(diffs == diffs.min())[0][0] # Add new pore at center of domain new_coord = network['pore.coords'].mean(axis=0) domain_half_length = np.ptp(network['pore.coords'][:, ax])/2 if coords[:, ax].mean() < network['pore.coords'][:, ax].mean(): new_coord[ax] = new_coord[ax] - domain_half_length*(1 + offset) if coords[:, ax].mean() > network['pore.coords'][:, ax].mean(): new_coord[ax] = new_coord[ax] + domain_half_length*(1 + offset) # Ps = np.arange(network.Np, network.Np + 1) extend(network=network, coords=[new_coord], labels=['reservoir']) conns = [[P, network.Np-1] for P in pores] # Ts = np.arange(network.Nt, network.Nt + len(conns)) extend(network=network, conns=conns, labels=['reservoir']) # Compute the geometrical properties of the reservoir pore and throats # Confirm if network has any geometry props on it props = {'throat.length', 'pore.diameter', 'throat.volume'} if len(set(network.keys()).intersection(props)) > 0: raise Exception('Geometrical properties should be moved to a ' + 'geometry object first') # or just do this?: geo = Imported(network=network) network.add_model(propname='pore.diameter', model=mods.geometry.pore_size.largest_sphere) network.add_model(propname='throat.diameter_temp', model=mods.geometry.throat_size.from_neighbor_pores, mode='min') network.add_model(propname='throat.diameter', model=mods.misc.scaled, prop='throat.diameter_temp', factor=0.5) network.add_model(propname='throat.volume', model=mods.geometry.throat_volume.cylinder) return network.project