import logging
import numpy as np
import openpnm.models.misc as misc
from openpnm.phase import Phase as Phase
from openpnm.utils import Docorator, TypedSet
logger = logging.getLogger(__name__)
docstr = Docorator()
__all__ = [
'MultiPhase',
'multiphase_diffusion',
]
@docstr.dedent
class MultiPhaseSettings:
"""
Parameters
----------
%(PhaseSettings.parameters)s
phases : list of strings
The name of the phase objects which comprize this multiphase object
throat_occupancy : str
Indicates how throat occupancy is determined. Options are:
=========== ==========================================================
mode description
=========== ==========================================================
'automatic' The occupancy of each throat is calculated based on the
occupancy of the two neighboring pores
'manual' The occupancy of each throat must be set by hand by
assigning values to 'throat.occupancy.{phase.name}'
=========== ==========================================================
partition_coef_prefix : str
The throat property which contains the partition coefficient values
"""
phases = TypedSet(types=[str])
throat_occupancy = 'manual'
partition_coef_prefix = 'throat.partition_coef'
[docs]
@docstr.dedent
class MultiPhase(Phase):
"""
Creates a Phase object that represents a multiphase system consisting of
a given list of Phases.
Parameters
----------
phases : list[Phase]
A list containing the phase objects that make up the multiphase system
%(Phase.parameters)s
Notes
-----
This class assumes that only a SINGLE phase exists in each pore/throat.
"""
def __init__(self, phases=[], name='mphase_?', **kwargs):
super().__init__(name=name, **kwargs)
self.settings._update(MultiPhaseSettings())
# Pressure/temperature must come from constituent phases
self.pop('pore.temperature', None)
self.pop('pore.pressure', None)
# Initialize the partition coefficient, K
self._K = np.ones(self.Nt, dtype=float)
prefix = self.settings["partition_coef_prefix"]
self[f"{prefix}.global"] = self._K
# Add supplied phases to phases dict and initialize occupancy to 0
self.add_phases(phases)
def __getitem__(self, key):
try:
vals = super().__getitem__(key)
except KeyError:
vals = self._interleave_data(key)
return vals
@property
def phases(self):
phases = {self.project[item].name: self.project[item]
for item in self.settings['phases']}
return phases
@property
def K(self):
self._build_K()
return self._K
@K.setter
def K(self, value):
self._K = value
[docs]
def add_phases(self, phases):
"""
Adds supplied phases to MultiPhase object and sets occupancy to 0.
Parameters
----------
phases : list[Phase] or Phase
"""
phases = np.array(phases, ndmin=1)
for phase in phases:
if phase.name in self.settings["phases"]:
continue
self.settings['phases'].add(phase.name)
self[f'pore.occupancy.{phase.name}'] = 0.0
self[f'throat.occupancy.{phase.name}'] = 0.0
[docs]
def set_occupancy(self, phase, *, pores=[], throats=[], values=1):
r"""
Specifies occupancy of a phase in each pore or throat. This
method doesn't return any value.
Parameters
----------
phase : Phase
The phase whose occupancy is being specified.
pores : ndarray
The location of pores whose occupancy is to be set.
throats : ndarray
The location of throats whose occupancy is to be set.
values : ndarray or float
Pore/throat occupancy values.
"""
pores = np.array(pores, ndmin=1)
throats = np.array(throats, ndmin=1)
if not(pores.size ^ throats.size):
raise Exception("Must either pass 'pores' or 'throats'")
if phase not in self.project:
raise Exception(f"{phase.name} doesn't belong to this project")
self.add_phases(phase)
if pores.size:
self[f'pore.occupancy.{phase.name}'][pores] = values
if throats.size:
self[f'throat.occupancy.{phase.name}'][throats] = values
if self.settings["throat_occupancy"] == "automatic":
self.regenerate_models(propnames=f"throat.occupancy.{phase.name}")
[docs]
def regenerate_models(self, propnames=None, exclude=[]):
r"""
Regenerate models associated with the Multiphase object
This method works by first regenerating the models associated with
the constituent phases, and then regenerating Multiphase models.
Parameters
----------
propnames : list[str] or str
The list of property names to be regenerated. If None are
given then ALL models are re-run (except for those whose
``regen_mode`` is 'constant').
exclude : list[str]
Since the default behavior is to run ALL models, this can be
used to exclude specific models. It may be more convenient to
supply as list of 2 models to exclude than to specify 8 models
to include.
"""
# Regenerate models associated with phases within MultiPhase object
for phase in self.phases.values():
phase.regenerate_models(propnames=propnames, exclude=exclude)
# Regenerate models specific to MultiPhase object
super().regenerate_models(propnames=propnames, exclude=exclude)
[docs]
def set_binary_partition_coef(self, phases, model, **kwargs):
"""
Sets binary partition coefficient as defined by the interface
concentration ratio of phase 1 to phase 2.
Parameters
----------
phases : list[Phase]
List of the two phases for which the binary partition
coefficient model is being added.
model : OpenPNM model
Model for calculating the binary partition coefficient.
kwargs : dict
Keyword arguments to be passed to the ``model``.
"""
assert len(phases) == 2
# Add partition coefficient interface model to the MultiPhase
propname_prefix = self.settings["partition_coef_prefix"]
self._add_interface_prop(propname_prefix, phases, model, **kwargs)
self._build_K()
def _add_interface_prop(self, propname, phases, model, **kwargs):
"""
Adds an interface model to the MultiPhase object. See Notes.
Notes
-----
Let's say the two phases corresponding to the interface model are
named: 'air' and 'water', and the interface propname to be added
is 'throat.foo'. After augmentation, 'throat.foo.air:water' will
be the propname that's stored on the ``MultiPhase`` object.
Note that because of this convention, the order of the phases that
are passed to this method is important.
"""
# Add "throat" keyword to the begining of propname if no identifier is found
if propname.split(".")[0] not in ["pore", "throat"]:
propname = f"throat.{propname}"
# Check propname is throat property
if propname.startswith("pore"):
raise Exception("'propname' must be a throat property")
# Add model to Multiphase
propname = self._format_interface_prop(propname, phases)
self.add_model(propname, model, **kwargs)
def _format_interface_prop(self, propname, phases):
"""Formats propname as {propname}.{phase[0].name}:{phase[1].name}"""
prefix = propname
suffix = ":".join(phase.name for phase in phases)
return f"{prefix}.{suffix}"
def _get_phase_labels(self, formatted_propname):
"""Retrieves phases names from a formatted propname"""
assert ":" in formatted_propname
return formatted_propname.split(".")[-1].split(":")
def _get_interface_throats(self, phase1, phase2):
conns = self.network.conns
occ1 = self[f"pore.occupancy.{phase1}"][conns]
occ2 = self[f"pore.occupancy.{phase2}"][conns]
idx12, = np.where((occ1[:, 0] == 1) & (occ2[:, 1] == 1))
idx21, = np.where((occ2[:, 0] == 1) & (occ1[:, 1] == 1))
return idx12, idx21
def _build_K(self):
"""Updates the global partition coefficient array"""
prefix = self.settings["partition_coef_prefix"]
self._K = np.ones(self.Nt, dtype=float)
# Find all binary partition coefficient models
models = [k for k in self.models.keys() if k.startswith(prefix)]
# Modify the global partition coefficient for each phase pair
for model in models:
K12 = self[model]
phase1, phase2 = self._get_phase_labels(model)
idx12, idx21 = self._get_interface_throats(phase1, phase2)
self._K[idx12] = K12[idx12]
self._K[idx21] = 1 / K12[idx21]
# Store a reference in self as a propname for convenience
self[f"{prefix}.global"][:] = self._K
def _interleave_data(self, prop):
"""Gathers property values from component phases to build a single array."""
element = self._parse_element(prop)[0]
vals = np.zeros(self._count(element=element), dtype=float)
# Retrieve property from constituent phases (weight = occupancy)
for phase in self.phases.values():
vals += phase[prop] * self[f"{element}.occupancy.{phase.name}"]
return vals
def _set_automatic_throat_occupancy(self, mode="mean"):
"""
Automatically interpolates throat occupancy based on that in
adjacent pores. This method doesn't return any value.
Parameters
----------
mode : str
Interpolation method. Options are:
=========== =====================================================
mode meaning
=========== =====================================================
'mean' sets the throat occupancy as the average of that in
adjacent pores.
'min' sets the throat occupancy as the minimum value of
that in adjacent pores.
'max' sets the throat occupancy as the maximum value of
that in adjacent pores.
=========== =====================================================
"""
self.settings['throat_occupancy'] = 'automatic'
for phase in self.phases.values():
self.add_model(propname=f"throat.occupancy.{phase.name}",
model=misc.from_neighbor_pores,
prop=f"pore.occupancy.{phase.name}",
mode=mode)
[docs]
def multiphase_diffusion(phase,
pore_diffusivity="pore.diffusivity",
throat_diffusivity="throat.diffusivity",
size_factors="throat.diffusive_size_factors",
partition_coef_global="throat.partition_coef.global"):
r"""
Calculates the diffusive conductance of conduits for multiphase systems.
Parameters
----------
%(phase)s
pore_diffusivity : str
%(dict_blurb)s pore diffusivity
throat_diffusivity : str
%(dict_blurb)s throat diffusivity
size_factors : str
%(dict_blurb)s conduit size factors
Returns
-------
%(return_arr)s diffusive conductance
Notes
-----
This method assumes that ``phase["partition_coef"]`` contains information
on binary phase partitioning. See ``MultiPhase`` class documentation for
more information.
"""
network = phase.network
cn = network.conns
SF = network[size_factors]
if isinstance(SF, dict):
F1, Ft, F2 = SF.values()
elif SF.ndim > 1:
F1, Ft, F2 = SF.T
else:
F1, Ft, F2 = np.inf, SF, np.inf
# Fetch model parameters
D1, D2 = phase[pore_diffusivity][cn].T
Dt = phase[throat_diffusivity]
g1 = D1 * F1
gt = Dt * Ft
g2 = D2 * F2
# Apply Henry's partitioning coefficient
# Note: m12 = (G21*c1 - G12*c2) NOT (G12*c1 - G21*c2)
K12 = phase[partition_coef_global]
G21 = (1/g1 + 0.5/gt + K12 * (1/g2 + 0.5/gt)) ** -1
G12 = K12 * G21
return np.vstack((G12, G21)).T