using System; using System.Numerics; using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; namespace ZeroAlloc.Concurrency; /// /// A bounded, lock-free, wait-free single-producer / single-consumer (SPSC) ring buffer. /// /// The element type. Both value types and reference types are supported; /// slots holding reference types are cleared on dequeue so the buffer never extends object lifetimes. /// /// /// Threading contract: exactly one thread may call the producer methods /// () and exactly one thread may call the consumer methods /// (, , ). /// Producer and consumer may be (and usually are) different threads. Violating this contract /// produces undefined results. /// /// /// Allocation behavior: the only managed allocation is the backing array created in the /// constructor. All enqueue/dequeue operations are allocation-free and never take locks; they use /// a single volatile store for publication, plus cached views of the opposite index so that the /// common case touches only cache lines owned by the calling thread. /// /// /// Indices are monotonically increasing 64-bit sequence numbers; at one billion operations per /// second a wraparound would take roughly 292 years, so overflow is not handled. /// /// public sealed class SpscRingBuffer { private const int CacheLineSize = 64; /// /// Producer- and consumer-owned indices, padded onto distinct cache lines to /// eliminate false sharing between the two threads. /// [StructLayout(LayoutKind.Explicit, Size = 5 * CacheLineSize)] private struct Indices { /// Next sequence number the producer will write. Written only by the producer. [FieldOffset(1 * CacheLineSize)] public long Tail; /// Producer's cached view of . Written only by the producer. [FieldOffset(2 * CacheLineSize)] public long CachedHead; /// Next sequence number the consumer will read. Written only by the consumer. [FieldOffset(3 * CacheLineSize)] public long Head; /// Consumer's cached view of . Written only by the consumer. [FieldOffset(4 * CacheLineSize)] public long CachedTail; } private readonly T[] _buffer; private readonly int _mask; private Indices _idx; /// /// Initializes a new ring buffer with at least slots. /// The actual capacity is rounded up to the next power of two. /// /// Minimum number of slots; must be between 1 and 2^30 inclusive. /// /// is less than 1 or greater than 2^30. /// public SpscRingBuffer(int minCapacity) { if (minCapacity < 1 || minCapacity > (1 << 30)) { throw new ArgumentOutOfRangeException(nameof(minCapacity), minCapacity, "Capacity must be between 1 and 2^30."); } int capacity = (int)BitOperations.RoundUpToPowerOf2((uint)minCapacity); _buffer = new T[capacity]; _mask = capacity - 1; } /// Gets the fixed capacity of the buffer (always a power of two). public int Capacity => _buffer.Length; /// /// Gets an approximate count of items currently in the buffer. The value is exact only when /// observed from a quiescent state; under concurrent use it is a best-effort snapshot. /// This property is allocation-free. /// public int Count { get { long tail = Volatile.Read(ref _idx.Tail); long head = Volatile.Read(ref _idx.Head); long count = tail - head; return count < 0 ? 0 : (int)count; } } /// Gets a best-effort indication of whether the buffer is empty. public bool IsEmpty => Volatile.Read(ref _idx.Tail) == Volatile.Read(ref _idx.Head); /// /// Attempts to enqueue an item. Producer thread only. /// /// The item to enqueue. /// if the item was enqueued; if the buffer is full. [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool TryEnqueue(in T item) { // Plain read: Tail is only ever written by this (the producer) thread. long tail = _idx.Tail; if (tail - _idx.CachedHead >= _buffer.Length) { // Our cached view says full; refresh from the real consumer index. _idx.CachedHead = Volatile.Read(ref _idx.Head); if (tail - _idx.CachedHead >= _buffer.Length) { return false; // genuinely full } } _buffer[tail & _mask] = item; // Release-publish: makes the slot write visible before the new tail. Volatile.Write(ref _idx.Tail, tail + 1); return true; } /// /// Attempts to dequeue an item. Consumer thread only. /// /// Receives the dequeued item, or default if the buffer is empty. /// if an item was dequeued; if the buffer is empty. [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool TryDequeue(out T item) { // Plain read: Head is only ever written by this (the consumer) thread. long head = _idx.Head; if (head >= _idx.CachedTail) { // Our cached view says empty; refresh from the real producer index. _idx.CachedTail = Volatile.Read(ref _idx.Tail); if (head >= _idx.CachedTail) { item = default!; return false; // genuinely empty } } int slot = (int)(head & _mask); item = _buffer[slot]; if (RuntimeHelpers.IsReferenceOrContainsReferences()) { _buffer[slot] = default!; // do not extend object lifetimes } // Release: makes the slot clear visible before the new head. Volatile.Write(ref _idx.Head, head + 1); return true; } /// /// Attempts to read the next item without removing it. Consumer thread only. /// /// Receives the next item, or default if the buffer is empty. /// if an item is available; otherwise . [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool TryPeek(out T item) { long head = _idx.Head; if (head >= _idx.CachedTail) { _idx.CachedTail = Volatile.Read(ref _idx.Tail); if (head >= _idx.CachedTail) { item = default!; return false; } } item = _buffer[head & _mask]; return true; } /// /// Dequeues up to .Length items into the destination span in /// a single pass, amortizing the publication cost. Consumer thread only. /// /// The span receiving dequeued items. /// The number of items actually dequeued (0 if the buffer is empty). public int DequeueBatch(Span destination) { if (destination.IsEmpty) { return 0; } long head = _idx.Head; long available = _idx.CachedTail - head; if (available <= 0) { _idx.CachedTail = Volatile.Read(ref _idx.Tail); available = _idx.CachedTail - head; if (available <= 0) { return 0; } } int toRead = (int)Math.Min(available, destination.Length); for (int i = 0; i < toRead; i++) { int slot = (int)((head + i) & _mask); destination[i] = _buffer[slot]; if (RuntimeHelpers.IsReferenceOrContainsReferences()) { _buffer[slot] = default!; } } Volatile.Write(ref _idx.Head, head + toRead); return toRead; } }