"""Vectorised signal factories for setpoints, disturbances and noise. Every public function in this module is a *factory*: it returns a callable of type :data:`SignalFn` that maps a 1-D time array to a 1-D value array of the same shape. The loop engine evaluates the factory output once, up front, on the full time grid — signals are therefore deterministic functions of time, never of loop state. (State-dependent inputs belong in a custom :class:`~pidtune.simulation.plant.Plant` or controller; see ``docs/design/04_api_specification.md``, "Signals are functions of time".) Why factories rather than plain arrays? Three reasons: * Users compose signals symbolically (``add(step(...), prbs(...))``) without committing to a time grid, and the same composed signal can be reused across simulations with different ``dt``. * Stochastic signals (:func:`prbs`, :func:`gaussian_noise`) carry their seed inside the closure, so a :class:`SimulationResult` is exactly reproducible from the factory arguments alone. * Factories are trivially serialisable to the experiment records used by optimisation-based tuners. All factories are **API stubs** in this milestone; bodies raise :class:`NotImplementedError` until the simulation-engine milestone. """ from __future__ import annotations from typing import Callable, Optional import numpy as np from numpy.typing import NDArray __all__ = [ "SignalFn", "constant", "step", "ramp", "pulse", "doublet", "sine", "prbs", "gaussian_noise", "add", "scale", "make_time_grid", ] #: A signal: maps a 1-D time array (seconds, or the model's time unit) #: to a 1-D value array of identical shape. All factories below return #: callables satisfying this alias. SignalFn = Callable[[NDArray[np.float64]], NDArray[np.float64]] def make_time_grid(t_final: float, dt: float) -> NDArray[np.float64]: """Build the uniform time grid used by the loop engine. Returns ``numpy.arange(0, t_final + dt/2, dt)`` — i.e. the grid *includes* ``t = 0`` and includes ``t_final`` when it is an integer multiple of ``dt`` (the half-step tolerance guards against float accumulation). Centralised here so that every component in the library agrees on grid semantics. Parameters ---------- t_final : float End time, strictly positive. dt : float Sample period, strictly positive and not greater than ``t_final``. Returns ------- numpy.ndarray 1-D float64 array of monotonically increasing sample times. Raises ------ ValueError If ``t_final`` or ``dt`` violates the constraints above. """ raise NotImplementedError( "make_time_grid is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." ) def constant(value: float) -> SignalFn: """Signal that is ``value`` everywhere. Parameters ---------- value : float The constant level. Returns ------- SignalFn ``f(t) -> full_like(t, value)``. """ raise NotImplementedError( "signals.constant is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." ) def step(amplitude: float = 1.0, start_time: float = 0.0, baseline: float = 0.0) -> SignalFn: """Step from ``baseline`` to ``baseline + amplitude`` at ``start_time``. The transition is inclusive on the left: samples with ``t >= start_time`` take the stepped value. Parameters ---------- amplitude : float, optional Step height (may be negative). Default ``1.0``. start_time : float, optional Time of the transition. Default ``0.0``. baseline : float, optional Value before the step. Default ``0.0``. Returns ------- SignalFn Examples -------- >>> sp = step(amplitude=5.0, start_time=10.0) # doctest: +SKIP """ raise NotImplementedError( "signals.step is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." ) def ramp(slope: float, start_time: float = 0.0, baseline: float = 0.0) -> SignalFn: """Ramp of given ``slope`` beginning at ``start_time``. ``f(t) = baseline`` for ``t < start_time`` and ``baseline + slope * (t - start_time)`` afterwards. Parameters ---------- slope : float Rate of change per unit time. start_time : float, optional When the ramp begins. Default ``0.0``. baseline : float, optional Value before the ramp. Default ``0.0``. Returns ------- SignalFn """ raise NotImplementedError( "signals.ramp is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." ) def pulse( amplitude: float, start_time: float, duration: float, baseline: float = 0.0, ) -> SignalFn: """Rectangular pulse: ``baseline + amplitude`` on ``[start, start+duration)``. Commonly used as a load disturbance in closed-loop validation (pass it as ``load_disturbance`` to :func:`~pidtune.simulation.loop.simulate_closed_loop`). Parameters ---------- amplitude : float Pulse height relative to ``baseline``. start_time : float Pulse onset time. duration : float Pulse width; must be strictly positive. baseline : float, optional Value outside the pulse. Default ``0.0``. Returns ------- SignalFn Raises ------ ValueError If ``duration`` is not strictly positive. """ raise NotImplementedError( "signals.pulse is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." ) def doublet(amplitude: float, start_time: float, half_width: float) -> SignalFn: """Symmetric doublet: ``+amplitude`` then ``-amplitude``, each for ``half_width``. The classic low-net-energy identification input: it perturbs the process while returning the input to its baseline, limiting drift away from the operating point. Parameters ---------- amplitude : float Magnitude of each lobe. start_time : float Onset of the positive lobe. half_width : float Duration of each lobe; must be strictly positive. Returns ------- SignalFn """ raise NotImplementedError( "signals.doublet is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." ) def sine(amplitude: float, period: float, phase: float = 0.0, offset: float = 0.0) -> SignalFn: """Sinusoid ``offset + amplitude * sin(2*pi*t/period + phase)``. Parameters ---------- amplitude : float Peak deviation from ``offset``. period : float Oscillation period; must be strictly positive. phase : float, optional Phase in radians. Default ``0.0``. offset : float, optional DC offset. Default ``0.0``. Returns ------- SignalFn """ raise NotImplementedError( "signals.sine is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." ) def prbs( amplitude: float, bit_period: float, start_time: float = 0.0, baseline: float = 0.0, seed: Optional[int] = None, ) -> SignalFn: """Pseudo-random binary sequence toggling between ``baseline ± amplitude``. The sequence is generated from a maximal-length LFSR seeded by ``seed`` (or by ``numpy.random.default_rng`` entropy when ``seed`` is ``None``), held for ``bit_period`` per bit. Because the seed lives in the closure, re-evaluating the returned signal on any time grid reproduces the identical bit pattern — a requirement for the reproducible identification experiments described in ``docs/design/03_tuning_methods.md``. Parameters ---------- amplitude : float Half the peak-to-peak excursion. bit_period : float Hold time per bit; should be 2–10× the loop sample time and comparable to the dominant process time constant divided by 3–5 for good spectral coverage. start_time : float, optional Before this time the signal equals ``baseline``. Default ``0.0``. baseline : float, optional Centre level. Default ``0.0``. seed : int, optional RNG seed for reproducibility. Default ``None``. Returns ------- SignalFn """ raise NotImplementedError( "signals.prbs is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." ) def gaussian_noise(std: float, seed: Optional[int] = None) -> SignalFn: """White Gaussian noise of standard deviation ``std``. Intended for the ``measurement_noise`` hook of :func:`~pidtune.simulation.loop.simulate_closed_loop` to test derivative-filter settings. One independent sample is drawn per grid point; the closure owns its own ``numpy.random.Generator`` so runs are reproducible given ``seed``. Parameters ---------- std : float Standard deviation; must be non-negative. seed : int, optional RNG seed. Default ``None``. Returns ------- SignalFn """ raise NotImplementedError( "signals.gaussian_noise is an API stub; implementation is scheduled for " "the simulation-engine milestone (docs/design/06_roadmap.md)." ) def add(*signals: SignalFn) -> SignalFn: """Pointwise sum of one or more signals. Parameters ---------- *signals : SignalFn Signals to combine; at least one required. Returns ------- SignalFn ``f(t) -> sum(s(t) for s in signals)``. Examples -------- Setpoint step plus a later disturbance-rejection probe:: sp = add(step(1.0, start_time=5.0), pulse(0.2, 60.0, 10.0)) """ raise NotImplementedError( "signals.add is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." ) def scale(signal: SignalFn, factor: float) -> SignalFn: """Pointwise scaling of a signal by ``factor``. Parameters ---------- signal : SignalFn The signal to scale. factor : float Multiplicative factor. Returns ------- SignalFn ``f(t) -> factor * signal(t)``. """ raise NotImplementedError( "signals.scale is an API stub; implementation is scheduled for the " "simulation-engine milestone (docs/design/06_roadmap.md)." )