using System;
using System.Numerics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;

namespace ZeroAlloc.Concurrency;

/// <summary>
/// A bounded, lock-free multi-producer / single-consumer (MPSC) ring buffer based on
/// Dmitry Vyukov's sequenced-cell bounded queue, restricted to a single consumer so the
/// dequeue path needs no CAS at all.
/// </summary>
/// <typeparam name="T">The element type. Slots holding reference types are cleared on dequeue.</typeparam>
/// <remarks>
/// <para>
/// Threading contract: any number of threads may call <see cref="TryEnqueue"/>
/// concurrently. Exactly one thread may call <see cref="TryDequeue"/> and
/// <see cref="DequeueBatch"/>. The enqueue path is lock-free (a failed CAS means another
/// producer made progress); the dequeue path is wait-free.
/// </para>
/// <para>
/// Allocation behavior: the only managed allocation is the cell array created in the
/// constructor. Enqueue/dequeue are allocation-free.
/// </para>
/// <para>
/// Each cell carries a sequence number used as a turn indicator: a cell whose sequence equals the
/// producer's claimed position is free to write; a cell whose sequence equals
/// <c>position + 1</c> is full and ready to read. After consumption the consumer advances the
/// cell's sequence by the full capacity so the cell becomes writable for the next lap.
/// </para>
/// </remarks>
public sealed class MpscRingBuffer<T>
{
    /// <summary>A storage cell pairing the payload with its lap-sequence number.</summary>
    private struct Cell
    {
        /// <summary>Turn indicator; see the class remarks for the protocol.</summary>
        public long Sequence;

        /// <summary>The stored payload (valid only when the sequence marks the cell as full).</summary>
        public T Item;
    }

    private readonly Cell[] _cells;
    private readonly int _mask;
    private readonly long _capacity;

    // Mutable struct accessed by ref; must not be readonly.
    private MpscRingBufferCursors _cur;

    /// <summary>
    /// Initializes a new ring buffer with at least <paramref name="minCapacity"/> slots.
    /// The actual capacity is rounded up to the next power of two.
    /// </summary>
    /// <param name="minCapacity">Minimum number of slots; must be between 2 and 2^30 inclusive.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="minCapacity"/> is less than 2 or greater than 2^30.
    /// </exception>
    public MpscRingBuffer(int minCapacity)
    {
        if (minCapacity < 2 || minCapacity > (1 << 30))
        {
            throw new ArgumentOutOfRangeException(nameof(minCapacity), minCapacity, "Capacity must be between 2 and 2^30.");
        }

        int capacity = (int)BitOperations.RoundUpToPowerOf2((uint)minCapacity);
        _cells = new Cell[capacity];
        _mask = capacity - 1;
        _capacity = capacity;

        // Cell i starts with sequence i, i.e. "free for the producer that claims position i".
        for (int i = 0; i < capacity; i++)
        {
            _cells[i].Sequence = i;
        }
    }

    /// <summary>Gets the fixed capacity of the buffer (always a power of two).</summary>
    public int Capacity => _cells.Length;

    /// <summary>
    /// Gets an approximate count of items currently in the buffer. Best-effort under concurrency.
    /// </summary>
    public int Count
    {
        get
        {
            long tail = Volatile.Read(ref _cur.Tail);
            long head = Volatile.Read(ref _cur.Head);
            long count = tail - head;

            // The two cursors are not read atomically together, so the difference can
            // momentarily fall outside [0, capacity]; clamp it to that range.
            if (count < 0)
            {
                return 0;
            }

            return count > _capacity ? (int)_capacity : (int)count;
        }
    }

    /// <summary>
    /// Attempts to enqueue an item. Safe to call from any number of producer threads.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <returns>
    /// <see langword="true"/> if the item was enqueued; <see langword="false"/> if the buffer was
    /// full at the time of the attempt.
    /// </returns>
    public bool TryEnqueue(in T item)
    {
        Cell[] cells = _cells;
        int mask = _mask;
        SpinWait spinner = default;

        while (true)
        {
            long pos = Volatile.Read(ref _cur.Tail);
            ref Cell cell = ref cells[(int)(pos & mask)];
            long seq = Volatile.Read(ref cell.Sequence);
            long diff = seq - pos;

            if (diff == 0)
            {
                // Cell is free for this lap; try to claim the position.
                if (Interlocked.CompareExchange(ref _cur.Tail, pos + 1, pos) == pos)
                {
                    cell.Item = item;

                    // Release-publish: mark cell as full for the consumer.
                    Volatile.Write(ref cell.Sequence, pos + 1);
                    return true;
                }

                // Another producer claimed this slot; retry at the new tail.
            }
            else if (diff < 0)
            {
                // The cell still holds last lap's data: the queue is full.
                return false;
            }

            // diff > 0: another producer claimed this position but we read a stale tail; retry.
            // sleep1Threshold: -1 keeps the back-off to spinning/yielding and never escalates to
            // Thread.Sleep(1), which would stall a producer for a whole timer tick.
            spinner.SpinOnce(sleep1Threshold: -1);
        }
    }

    /// <summary>
    /// Attempts to dequeue an item. Consumer thread only. Wait-free.
    /// </summary>
    /// <param name="item">Receives the dequeued item, or <c>default</c> if no item is ready.</param>
    /// <returns><see langword="true"/> if an item was dequeued; otherwise <see langword="false"/>.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool TryDequeue(out T item)
    {
        // Plain read: Head is written only by this (the consumer) thread.
        long pos = _cur.Head;
        ref Cell cell = ref _cells[(int)(pos & _mask)];
        long seq = Volatile.Read(ref cell.Sequence);

        if (seq - (pos + 1) < 0)
        {
            // The producer claiming this slot has not finished publishing (or the queue is empty).
            item = default!;
            return false;
        }

        item = cell.Item;

        // Drop the buffer's reference so it does not keep the dequeued object alive.
        if (RuntimeHelpers.IsReferenceOrContainsReferences<T>())
        {
            cell.Item = default!;
        }

        // Free the cell for the next lap.
        Volatile.Write(ref cell.Sequence, pos + _capacity);
        Volatile.Write(ref _cur.Head, pos + 1);
        return true;
    }

    /// <summary>
    /// Dequeues up to <paramref name="destination"/>.Length ready items. Consumer thread only.
    /// </summary>
    /// <param name="destination">The span receiving dequeued items.</param>
    /// <returns>The number of items dequeued (0 if no items were ready).</returns>
    public int DequeueBatch(Span<T> destination)
    {
        int written = 0;
        while (written < destination.Length && TryDequeue(out T item))
        {
            destination[written++] = item;
        }

        return written;
    }
}

/// <summary>
/// Producer/consumer cursors for <see cref="MpscRingBuffer{T}"/>, placed at fixed offsets one
/// <see cref="CacheLineSize"/> apart, with one unused <see cref="CacheLineSize"/> before the
/// first cursor and the struct size padded out after the second.
/// </summary>
/// <remarks>
/// Declared at namespace level rather than nested inside <see cref="MpscRingBuffer{T}"/>
/// on purpose: a type nested in a generic class is itself generic, and the runtime refuses to load
/// generic types that use <see cref="LayoutKind.Explicit"/> (it throws
/// <see cref="TypeLoadException"/>).
/// </remarks>
[StructLayout(LayoutKind.Explicit, Size = 3 * CacheLineSize)]
internal struct MpscRingBufferCursors
{
    /// <summary>Assumed cache-line size in bytes, used as the spacing between the cursors.</summary>
    internal const int CacheLineSize = 64;

    /// <summary>Next position producers will claim (CAS-incremented).</summary>
    [FieldOffset(1 * CacheLineSize)]
    public long Tail;

    /// <summary>Next position the consumer will read. Written only by the consumer.</summary>
    [FieldOffset(2 * CacheLineSize)]
    public long Head;
}