"""Plant abstractions: the stateful, sample-by-sample side of a process. A *process model* (:mod:`pidtune.models`) is a declarative description — "FOPDT with :math:`K=2,\\ \\tau=10,\\ \\theta=3`". A *plant* is the runtime object that the loop engine actually drives: it holds internal state (integrator states, the dead-time delay line, the last held input) and advances one fixed step at a time. The split exists for three reasons (see ``docs/design/01_architecture.md``, section "Models vs. plants"): 1. **Tuners never need state.** Rule-based tuners consume model *parameters* only; coupling them to a stateful object would make them harder to test and to serialise. 2. **Simulation needs more than linear models.** Users validating a tuning against reality frequently have a nonlinear ODE, actuator saturation, or measurement quantisation. :class:`FunctionPlant` admits any of these without touching the model layer. 3. **Discretisation is a simulation concern.** The same FOPDT model may be discretised with zero-order hold for honest closed-loop replay, or with Tustin for frequency-shape fidelity; that choice belongs next to the loop engine, not in the model. Contract -------- Every plant follows a strict three-phase lifecycle enforced by the loop engine: 1. :meth:`Plant.initialize` — called exactly once per simulation run with the fixed step ``dt``. Linear plants discretise here. 2. :meth:`Plant.step` — called once per sample with the held control value ``u``; returns the measurement ``y`` *at the end of* the step. Calling ``step`` before ``initialize`` raises ``SimulationError`` (see :mod:`pidtune.exceptions`). 3. :meth:`Plant.reset` — returns the plant to its initial state so the same instance can be re-run (e.g. by optimisation-based tuners that simulate the loop hundreds of times). Plants are SISO in v1; the MIMO extension path is documented in ``docs/design/05_errors_extensibility.md``. """ from __future__ import annotations import abc import enum from typing import Callable, Optional, Sequence, TYPE_CHECKING import numpy as np from numpy.typing import NDArray if TYPE_CHECKING: # imported for annotations only; no runtime dependency from ..models.base import ProcessModel __all__ = ["Discretization", "Plant", "LinearPlant", "FunctionPlant"] class Discretization(enum.Enum): """Discretisation method used by :class:`LinearPlant`. Attributes ---------- ZOH Zero-order hold. Exact for the staircase inputs produced by a sampled controller; the default and the recommended choice for closed-loop tuning validation. TUSTIN Bilinear (Tustin) transform. Preserves frequency-response shape up to the warping limit; useful when comparing simulated margins against analytic ones. EULER Forward Euler. Cheapest and least accurate; provided mainly so that documentation examples can illustrate step-size sensitivity. Stability requires ``dt`` small relative to the fastest plant time constant. Notes ----- Dead time is *not* discretised with the dynamics. It is realised exactly as a FIFO delay line of ``round(theta / dt)`` samples, and :meth:`LinearPlant.initialize` raises ``SimulationError`` if the rounding error exceeds 10 % of ``dt`` (see ``docs/design/04_api_specification.md``, "Dead-time realisation"). """ ZOH = "zoh" TUSTIN = "tustin" EULER = "euler" class Plant(abc.ABC): """Abstract base class for all simulatable plants. Subclasses must implement :meth:`initialize`, :meth:`step` and :meth:`reset`. The loop engine treats plants as black boxes satisfying exactly this contract, so user-defined plants (hardware bridges, co-simulation adapters, nonlinear ODEs) plug in without any registration step — subclassing is sufficient. Examples -------- A custom plant wrapping a first-order nonlinear tank:: class TankPlant(Plant): def __init__(self, area: float = 1.0, cv: float = 0.5) -> None: self.area, self.cv = area, cv self._h = 0.0 self._dt: float | None = None def initialize(self, dt: float) -> None: self._dt = dt def step(self, u: float) -> float: dh = (u - self.cv * math.sqrt(max(self._h, 0.0))) / self.area self._h += self._dt * dh return self._h def reset(self) -> None: self._h = 0.0 """ @abc.abstractmethod def initialize(self, dt: float) -> None: """Prepare the plant for a run with fixed sample time ``dt``. Called exactly once by the loop engine before the first :meth:`step`. Implementations should perform discretisation, allocate delay lines, and validate that ``dt`` is compatible with their dynamics here, so that :meth:`step` can be a tight, allocation-free inner loop. Parameters ---------- dt : float Sample period in the model's time units. Must be strictly positive. Raises ------ pidtune.exceptions.SimulationError If ``dt`` is non-positive, or incompatible with the plant (e.g. dead time not representable within tolerance, or a forward-Euler discretisation that would be unstable). """ raise NotImplementedError( "Plant.initialize is abstract; see docs/design/04_api_specification.md." ) @abc.abstractmethod def step(self, u: float) -> float: """Advance the plant by one sample under held input ``u``. Parameters ---------- u : float Control signal, held constant (zero-order hold) over the sample interval. Returns ------- float Plant output (measurement) at the *end* of the interval. This end-of-interval convention, together with the controller acting on the *previous* measurement, gives the loop engine its one-sample computational delay — matching real digital control loops. See ``docs/design/04_api_specification.md``, "Loop sequencing". Raises ------ pidtune.exceptions.SimulationError If called before :meth:`initialize`. """ raise NotImplementedError( "Plant.step is abstract; see docs/design/04_api_specification.md." ) @abc.abstractmethod def reset(self) -> None: """Return the plant to its initial state. After ``reset``, the plant behaves as a freshly constructed instance except that :meth:`initialize` need not be called again if ``dt`` is unchanged. The loop engine calls ``reset`` at the start of every run, which is what makes a single plant instance safely reusable inside optimisation-based tuners. """ raise NotImplementedError( "Plant.reset is abstract; see docs/design/04_api_specification.md." ) class LinearPlant(Plant): """Stateful simulator for any :class:`~pidtune.models.base.ProcessModel`. Wraps a declarative linear model (FOPDT, SOPDT, or arbitrary rational transfer function), converts it to a discrete-time state-space realisation in :meth:`initialize`, and realises dead time as an exact FIFO delay line. Parameters ---------- model : pidtune.models.base.ProcessModel The process model to simulate. The model is treated as immutable; the plant takes a reference, not a copy. method : Discretization, optional Discretisation method for the delay-free dynamics. Default :attr:`Discretization.ZOH`. y0 : float, optional Initial output (steady-state operating point in deviation variables). Default ``0.0``. Internal states are initialised to the steady state consistent with ``y0``. input_limits : tuple of float, optional Optional ``(lo, hi)`` hard saturation applied to ``u`` before it enters the dynamics, modelling actuator limits *in the plant* (in addition to any limits configured in the controller). Default ``None`` (no saturation). Raises ------ pidtune.exceptions.ModelError If the model is improper or has no realisable state-space form. Examples -------- >>> from pidtune.models import FOPDTModel >>> from pidtune.simulation import LinearPlant >>> plant = LinearPlant(FOPDTModel(gain=2.0, time_constant=10.0, ... dead_time=3.0)) Notes ----- The state-space realisation and the ZOH/Tustin matrix formulas are specified in ``docs/design/02_data_models.md``, section "Realisation of rational models". Implementation is scheduled for the simulation-engine milestone. """ def __init__( self, model: "ProcessModel", method: Discretization = Discretization.ZOH, y0: float = 0.0, input_limits: Optional[tuple[float, float]] = None, ) -> None: self.model = model self.method = method self.y0 = float(y0) self.input_limits = input_limits def initialize(self, dt: float) -> None: """Discretise the model and allocate the dead-time delay line. See :meth:`Plant.initialize` for the general contract. In addition, ``LinearPlant`` validates that ``round(theta/dt)`` approximates the model dead time within 10 % of ``dt`` and raises ``SimulationError`` otherwise, directing the user to choose a commensurate ``dt``. """ raise NotImplementedError( "LinearPlant.initialize is an API stub; implementation is scheduled " "for the simulation-engine milestone (docs/design/06_roadmap.md)." ) def step(self, u: float) -> float: """Advance the discrete state-space realisation by one sample.""" raise NotImplementedError( "LinearPlant.step is an API stub; implementation is scheduled " "for the simulation-engine milestone (docs/design/06_roadmap.md)." ) def reset(self) -> None: """Restore steady state at ``y0`` and flush the delay line.""" raise NotImplementedError( "LinearPlant.reset is an API stub; implementation is scheduled " "for the simulation-engine milestone (docs/design/06_roadmap.md)." ) class FunctionPlant(Plant): """Plant defined by user-supplied ODE right-hand side and output map. Admits arbitrary (including nonlinear, time-varying) SISO dynamics .. math:: \\dot{x} = f(t, x, u), \\qquad y = h(t, x, u), integrated with a fixed-step RK4 scheme (one macro-step per sample, with optional substeps for stiff-ish systems). Parameters ---------- f : callable Right-hand side ``f(t, x, u) -> dx`` where ``x`` and ``dx`` are 1-D :class:`numpy.ndarray` of equal length and ``t``, ``u`` are floats. h : callable Output map ``h(t, x, u) -> y`` returning a float. x0 : sequence of float Initial state vector. substeps : int, optional Number of RK4 substeps per controller sample. Default ``1``. Increase for dynamics significantly faster than ``dt``. Raises ------ pidtune.exceptions.SimulationError At :meth:`step` time, if ``f`` or ``h`` returns a non-finite value (the offending ``t`` is included in the message — see ``docs/design/05_errors_extensibility.md``, "Fail loudly with context"). Examples -------- A saturating heater with quadratic loss:: import numpy as np from pidtune.simulation import FunctionPlant def f(t, x, u): return np.array([0.1 * min(u, 100.0) - 0.02 * x[0] ** 2]) plant = FunctionPlant(f=f, h=lambda t, x, u: x[0], x0=[20.0]) """ def __init__( self, f: Callable[[float, NDArray[np.float64], float], NDArray[np.float64]], h: Callable[[float, NDArray[np.float64], float], float], x0: Sequence[float], substeps: int = 1, ) -> None: self.f = f self.h = h self.x0 = np.asarray(x0, dtype=np.float64) self.substeps = int(substeps) def initialize(self, dt: float) -> None: """Validate ``dt`` and ``substeps`` and cache the substep size.""" raise NotImplementedError( "FunctionPlant.initialize is an API stub; implementation is scheduled " "for the simulation-engine milestone (docs/design/06_roadmap.md)." ) def step(self, u: float) -> float: """Integrate one macro-step with RK4 and return ``h(t, x, u)``.""" raise NotImplementedError( "FunctionPlant.step is an API stub; implementation is scheduled " "for the simulation-engine milestone (docs/design/06_roadmap.md)." ) def reset(self) -> None: """Restore ``x = x0`` and ``t = 0``.""" raise NotImplementedError( "FunctionPlant.reset is an API stub; implementation is scheduled " "for the simulation-engine milestone (docs/design/06_roadmap.md)." )