Source code for openpnm.contrib._transient_multiphysics

import logging

import numpy as np

from openpnm.algorithms import Algorithm
from openpnm.algorithms._solution import SolutionContainer, TransientSolution
from openpnm.integrators import ScipyRK45
from openpnm.utils import Docorator

logger = logging.getLogger(__name__)
docstr = Docorator()


__all__ = [
    'TransientMultiPhysics',
]


@docstr.dedent
class TransientMultiPhysicsSettings:
    r"""

    Parameters
    ----------
    %(AlgorithmSettings.parameters)s
    algorithms: list
        List of transient algorithm objects to be solved in a coupled manner

    """
    algorithms = []


[docs] @docstr.dedent class TransientMultiPhysics(Algorithm): r""" A subclass for transient multiphysics simulations. """ def __init__(self, algorithms, **kwargs): super().__init__(**kwargs) self.settings._update(TransientMultiPhysicsSettings()) self.settings.algorithms = [alg.name for alg in algorithms] self._algs = algorithms
[docs] def run(self, x0, tspan, saveat=None, integrator=None): """ Runs all of the transient algorithms simultaneoulsy and returns the solution. Parameters ---------- x0 : ndarray or float Array (or scalar) containing initial condition values. tspan : array_like Tuple (or array) containing the integration time span. saveat : array_like or float, optional If an array is passed, it signifies the time points at which the solution is to be stored, and if a scalar is passed, it refers to the interval at which the solution is to be stored. integrator : Integrator, optional Integrator object which will be used to to the time stepping. Can be instantiated using openpnm.integrators module. Returns ------- TransientSolution The solution object, which is basically a numpy array with the added functionality that it can be called to return the solution at intermediate times (i.e., those not stored in the solution object). In the case of multiphysics, the solution object is a combined array of solutions for each physics. The solution for each physics is available on each algorithm object independently. """ logger.info('Running TransientMultiphysics') if np.isscalar(saveat): saveat = np.arange(*tspan, saveat) if (saveat is not None) and (tspan[1] not in saveat): saveat = np.hstack((saveat, [tspan[1]])) integrator = ScipyRK45() if integrator is None else integrator for i, alg in enumerate(self._algs): # Perform pre-solve validations alg._validate_settings() alg._validate_topology_health() alg._validate_linear_system() # Write x0 to algorithm the obj (needed by _update_iterative_props) x0_i = self._get_x0(x0, i) alg['pore.ic'] = x0_i = np.ones(alg.Np, dtype=float) * x0_i alg._merge_inital_and_boundary_values() # Build RHS (dx/dt = RHS), then integrate the system of ODEs rhs = self._build_rhs() # Integrate RHS using the given solver soln = integrator.solve(rhs, x0, tspan, saveat) # Return dictionary containing solution self.soln = SolutionContainer() for i, alg in enumerate(self._algs): # Slice soln and attach as TransientSolution object to each alg t = soln.t x = soln[i*alg.Np:(i+1)*alg.Np, :] alg.soln = TransientSolution(t, x) # Add solution of each alg to solution dictionary self.soln[alg.settings['quantity']] = alg.soln
def _run_special(self, x0): ... def _build_rhs(self): """ Returns a function handle, which calculates dy/dt = rhs(y, t). Notes ----- ``y`` is a composite array that contains ALL the variables that the multiphysics algorithm solves for, e.g., if the constituent algorithms are ``TransientFickianDiffusion``, and ``TransientFourierConduction``, ``y[0:Np-1]`` refers to the concentration, and ``[Np:2*Np-1]`` refers to the temperature values. """ def ode_func(t, y): # Initialize RHS rhs = [] for i, alg in enumerate(self._algs): # Get x from y, assume alg.Np is same for all algs x = self._get_x0(y, i) # again use helper function # Store x onto algorithm, alg.x = x # Build A and b alg._update_A_and_b() A = alg.A.tocsc() b = alg.b # Retrieve volume V = alg.network[alg.settings["pore_volume"]] # Calcualte RHS rhs_alg = np.hstack(-A.dot(x) + b)/V rhs = np.hstack((rhs, rhs_alg)) return rhs return ode_func def _get_x0(self, x0, i): tmp = [alg.Np for alg in self._algs] idx_end = np.cumsum(tmp) idx_start = np.hstack((0, idx_end[:-1])) x0 = x0[idx_start[i]:idx_end[i]] return x0