using System;
using System.Numerics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
namespace ZeroAlloc.Concurrency;
///
/// A bounded, lock-free multi-producer / single-consumer (MPSC) ring buffer based on
/// Dmitry Vyukov's sequenced-cell bounded queue, restricted to a single consumer so the
/// dequeue path needs no CAS at all.
///
/// The element type. Slots holding reference types are cleared on dequeue.
///
///
/// Threading contract: any number of threads may call
/// concurrently. Exactly one thread may call and
/// . The enqueue path is lock-free (a failed CAS means another
/// producer made progress); the dequeue path is wait-free.
///
///
/// Allocation behavior: the only managed allocation is the cell array created in the
/// constructor. Enqueue/dequeue are allocation-free.
///
///
/// Each cell carries a sequence number used as a turn indicator: a cell whose sequence equals the
/// producer's claimed position is free to write; a cell whose sequence equals
/// position + 1 is full and ready to read. After consumption the consumer advances the
/// cell's sequence by the full capacity so the cell becomes writable for the next lap.
///
///
public sealed class MpscRingBuffer
{
private const int CacheLineSize = 64;
/// A storage cell pairing the payload with its lap-sequence number.
private struct Cell
{
/// Turn indicator; see the class remarks for the protocol.
public long Sequence;
/// The stored payload (valid only when the sequence marks the cell as full).
public T Item;
}
/// Producer/consumer cursors padded onto separate cache lines.
[StructLayout(LayoutKind.Explicit, Size = 3 * CacheLineSize)]
private struct Cursors
{
/// Next position producers will claim (CAS-incremented).
[FieldOffset(1 * CacheLineSize)] public long Tail;
/// Next position the consumer will read. Written only by the consumer.
[FieldOffset(2 * CacheLineSize)] public long Head;
}
private readonly Cell[] _cells;
private readonly int _mask;
private readonly long _capacity;
private Cursors _cur;
///
/// Initializes a new ring buffer with at least slots.
/// The actual capacity is rounded up to the next power of two.
///
/// Minimum number of slots; must be between 2 and 2^30 inclusive.
///
/// is less than 2 or greater than 2^30.
///
public MpscRingBuffer(int minCapacity)
{
if (minCapacity < 2 || minCapacity > (1 << 30))
{
throw new ArgumentOutOfRangeException(nameof(minCapacity), minCapacity,
"Capacity must be between 2 and 2^30.");
}
int capacity = (int)BitOperations.RoundUpToPowerOf2((uint)minCapacity);
_cells = new Cell[capacity];
_mask = capacity - 1;
_capacity = capacity;
for (int i = 0; i < capacity; i++)
{
_cells[i].Sequence = i;
}
}
/// Gets the fixed capacity of the buffer (always a power of two).
public int Capacity => _cells.Length;
///
/// Gets an approximate count of items currently in the buffer. Best-effort under concurrency.
///
public int Count
{
get
{
long tail = Volatile.Read(ref _cur.Tail);
long head = Volatile.Read(ref _cur.Head);
long count = tail - head;
if (count < 0) return 0;
return count > _capacity ? (int)_capacity : (int)count;
}
}
///
/// Attempts to enqueue an item. Safe to call from any number of producer threads.
///
/// The item to enqueue.
///
/// if the item was enqueued; if the buffer was
/// full at the time of the attempt.
///
public bool TryEnqueue(in T item)
{
Cell[] cells = _cells;
int mask = _mask;
SpinWait spinner = default;
while (true)
{
long pos = Volatile.Read(ref _cur.Tail);
ref Cell cell = ref cells[(int)(pos & mask)];
long seq = Volatile.Read(ref cell.Sequence);
long diff = seq - pos;
if (diff == 0)
{
// Cell is free for this lap; try to claim the position.
if (Interlocked.CompareExchange(ref _cur.Tail, pos + 1, pos) == pos)
{
cell.Item = item;
// Release-publish: mark cell as full for the consumer.
Volatile.Write(ref cell.Sequence, pos + 1);
return true;
}
// Another producer claimed this slot; retry at the new tail.
}
else if (diff < 0)
{
// The cell still holds last lap's data: the queue is full.
return false;
}
// diff > 0: another producer claimed this position but we read a stale tail; retry.
spinner.SpinOnce();
}
}
///
/// Attempts to dequeue an item. Consumer thread only. Wait-free.
///
/// Receives the dequeued item, or default if no item is ready.
/// if an item was dequeued; otherwise .
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool TryDequeue(out T item)
{
// Plain read: Head is written only by this (the consumer) thread.
long pos = _cur.Head;
ref Cell cell = ref _cells[(int)(pos & _mask)];
long seq = Volatile.Read(ref cell.Sequence);
if (seq - (pos + 1) < 0)
{
// The producer claiming this slot has not finished publishing (or the queue is empty).
item = default!;
return false;
}
item = cell.Item;
if (RuntimeHelpers.IsReferenceOrContainsReferences())
{
cell.Item = default!;
}
// Free the cell for the next lap.
Volatile.Write(ref cell.Sequence, pos + _capacity);
Volatile.Write(ref _cur.Head, pos + 1);
return true;
}
///
/// Dequeues up to .Length ready items. Consumer thread only.
///
/// The span receiving dequeued items.
/// The number of items dequeued (0 if no items were ready).
public int DequeueBatch(Span destination)
{
int written = 0;
while (written < destination.Length && TryDequeue(out T item))
{
destination[written++] = item;
}
return written;
}
}