using System;
using System.Numerics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
namespace ZeroAlloc.Concurrency;
/// <summary>
/// A bounded, lock-free, wait-free single-producer / single-consumer (SPSC) ring buffer.
/// </summary>
/// <typeparam name="T">
/// The element type. Both value types and reference types are supported;
/// slots holding reference types are cleared on dequeue so the buffer never extends object lifetimes.
/// </typeparam>
/// <remarks>
/// <para>
/// <b>Threading contract:</b> exactly one thread may call the producer methods
/// (<see cref="TryEnqueue"/>) and exactly one thread may call the consumer methods
/// (<see cref="TryDequeue"/>, <see cref="TryPeek"/>, <see cref="DequeueBatch"/>).
/// Producer and consumer may be (and usually are) different threads. Violating this contract
/// produces undefined results.
/// </para>
/// <para>
/// <b>Allocation behavior:</b> the only managed allocation is the backing array created in the
/// constructor. All enqueue/dequeue operations are allocation-free and never take locks; they use
/// a single volatile store for publication, plus cached views of the opposite index so that the
/// common case touches only cache lines owned by the calling thread.
/// </para>
/// <para>
/// Indices are monotonically increasing 64-bit sequence numbers; at one billion operations per
/// second a wraparound would take roughly 292 years, so overflow is not handled.
/// </para>
/// </remarks>
public sealed class SpscRingBuffer<T>
{
    private readonly T[] _buffer;
    private readonly int _mask;

    // Deliberately NOT readonly: this is a mutable struct whose fields are written in place
    // through `ref`; marking it readonly would make every access operate on a copy.
    private SpscRingBufferIndices _idx;

    /// <summary>
    /// Initializes a new ring buffer with at least <paramref name="minCapacity"/> slots.
    /// The actual capacity is rounded up to the next power of two.
    /// </summary>
    /// <param name="minCapacity">Minimum number of slots; must be between 1 and 2^30 inclusive.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="minCapacity"/> is less than 1 or greater than 2^30.
    /// </exception>
    public SpscRingBuffer(int minCapacity)
    {
        if (minCapacity < 1 || minCapacity > (1 << 30))
        {
            throw new ArgumentOutOfRangeException(nameof(minCapacity), minCapacity,
                "Capacity must be between 1 and 2^30.");
        }

        int capacity = (int)BitOperations.RoundUpToPowerOf2((uint)minCapacity);
        _buffer = new T[capacity];

        // Power-of-two capacity lets a sequence number be mapped to a slot with a single AND.
        _mask = capacity - 1;
    }

    /// <summary>Gets the fixed capacity of the buffer (always a power of two).</summary>
    public int Capacity => _buffer.Length;

    /// <summary>
    /// Gets an approximate count of items currently in the buffer. The value is exact only when
    /// observed from a quiescent state; under concurrent use it is a best-effort snapshot.
    /// This property is allocation-free.
    /// </summary>
    public int Count
    {
        get
        {
            // Tail is sampled before Head, so a dequeue racing between the two reads can push
            // Head past the sampled Tail; the negative difference is clamped to zero.
            long tail = Volatile.Read(ref _idx.Tail);
            long head = Volatile.Read(ref _idx.Head);
            long count = tail - head;
            return count < 0 ? 0 : (int)count;
        }
    }

    /// <summary>Gets a best-effort indication of whether the buffer is empty.</summary>
    public bool IsEmpty => Volatile.Read(ref _idx.Tail) == Volatile.Read(ref _idx.Head);

    /// <summary>
    /// Attempts to enqueue an item. Producer thread only.
    /// </summary>
    /// <param name="item">The item to enqueue.</param>
    /// <returns>
    /// <see langword="true"/> if the item was enqueued; <see langword="false"/> if the buffer is full.
    /// </returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool TryEnqueue(in T item)
    {
        // Plain read: Tail is only ever written by this (the producer) thread.
        long tail = _idx.Tail;
        if (tail - _idx.CachedHead >= _buffer.Length)
        {
            // Our cached view says full; refresh from the real consumer index.
            _idx.CachedHead = Volatile.Read(ref _idx.Head);
            if (tail - _idx.CachedHead >= _buffer.Length)
            {
                return false; // genuinely full
            }
        }

        _buffer[(int)(tail & _mask)] = item;

        // Release-publish: makes the slot write visible before the new tail.
        Volatile.Write(ref _idx.Tail, tail + 1);
        return true;
    }

    /// <summary>
    /// Attempts to dequeue an item. Consumer thread only.
    /// </summary>
    /// <param name="item">Receives the dequeued item, or <c>default</c> if the buffer is empty.</param>
    /// <returns>
    /// <see langword="true"/> if an item was dequeued; <see langword="false"/> if the buffer is empty.
    /// </returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool TryDequeue(out T item)
    {
        // Plain read: Head is only ever written by this (the consumer) thread.
        long head = _idx.Head;
        if (head >= _idx.CachedTail)
        {
            // Our cached view says empty; refresh from the real producer index.
            _idx.CachedTail = Volatile.Read(ref _idx.Tail);
            if (head >= _idx.CachedTail)
            {
                item = default!;
                return false; // genuinely empty
            }
        }

        int slot = (int)(head & _mask);
        item = _buffer[slot];
        if (RuntimeHelpers.IsReferenceOrContainsReferences<T>())
        {
            _buffer[slot] = default!; // do not extend object lifetimes
        }

        // Release: makes the slot clear visible before the new head.
        Volatile.Write(ref _idx.Head, head + 1);
        return true;
    }

    /// <summary>
    /// Attempts to read the next item without removing it. Consumer thread only.
    /// </summary>
    /// <param name="item">Receives the next item, or <c>default</c> if the buffer is empty.</param>
    /// <returns>
    /// <see langword="true"/> if an item is available; otherwise <see langword="false"/>.
    /// </returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool TryPeek(out T item)
    {
        // Plain read: Head is only ever written by this (the consumer) thread.
        long head = _idx.Head;
        if (head >= _idx.CachedTail)
        {
            // Cached view says empty; refresh from the real producer index.
            _idx.CachedTail = Volatile.Read(ref _idx.Tail);
            if (head >= _idx.CachedTail)
            {
                item = default!;
                return false;
            }
        }

        // Head is left untouched, so the slot stays owned by the consumer.
        item = _buffer[(int)(head & _mask)];
        return true;
    }

    /// <summary>
    /// Dequeues up to <paramref name="destination"/>.Length items into the destination span in
    /// a single pass, amortizing the publication cost. Consumer thread only.
    /// </summary>
    /// <param name="destination">The span receiving dequeued items.</param>
    /// <returns>The number of items actually dequeued (0 if the buffer is empty).</returns>
    public int DequeueBatch(Span<T> destination)
    {
        if (destination.IsEmpty)
        {
            return 0;
        }

        long head = _idx.Head;
        long available = _idx.CachedTail - head;
        if (available <= 0)
        {
            // Cached view says empty; refresh from the real producer index.
            _idx.CachedTail = Volatile.Read(ref _idx.Tail);
            available = _idx.CachedTail - head;
            if (available <= 0)
            {
                return 0;
            }
        }

        int toRead = (int)Math.Min(available, destination.Length);

        // The readable region may wrap past the end of the array, so it is moved as at most
        // two contiguous runs instead of masking the index once per element.
        int start = (int)(head & _mask);
        int firstRun = Math.Min(toRead, _buffer.Length - start);
        int secondRun = toRead - firstRun;

        Span<T> first = _buffer.AsSpan(start, firstRun);
        first.CopyTo(destination);
        if (RuntimeHelpers.IsReferenceOrContainsReferences<T>())
        {
            first.Clear(); // do not extend object lifetimes
        }

        if (secondRun > 0)
        {
            Span<T> second = _buffer.AsSpan(0, secondRun);
            second.CopyTo(destination.Slice(firstRun));
            if (RuntimeHelpers.IsReferenceOrContainsReferences<T>())
            {
                second.Clear();
            }
        }

        // Single release store hands all consumed slots back to the producer at once.
        Volatile.Write(ref _idx.Head, head + toRead);
        return toRead;
    }
}

/// <summary>
/// Producer- and consumer-owned indices for <see cref="SpscRingBuffer{T}"/>. Each field sits at
/// its own multiple of <see cref="CacheLineSize"/>, so no two fields can share a 64-byte cache
/// line, which avoids false sharing between the two threads.
/// </summary>
/// <remarks>
/// This type is intentionally non-generic and declared at namespace level. A struct nested inside
/// a generic class is itself generic, and the CLR has historically refused to load generic types
/// that use <see cref="LayoutKind.Explicit"/> (it throws <see cref="TypeLoadException"/>).
/// </remarks>
[StructLayout(LayoutKind.Explicit, Size = 5 * SpscRingBufferIndices.CacheLineSize)]
internal struct SpscRingBufferIndices
{
    /// <summary>Assumed cache-line size in bytes, used as the spacing between fields.</summary>
    internal const int CacheLineSize = 64;

    /// <summary>Next sequence number the producer will write. Written only by the producer.</summary>
    [FieldOffset(1 * CacheLineSize)] public long Tail;

    /// <summary>Producer's cached view of <see cref="Head"/>. Written only by the producer.</summary>
    [FieldOffset(2 * CacheLineSize)] public long CachedHead;

    /// <summary>Next sequence number the consumer will read. Written only by the consumer.</summary>
    [FieldOffset(3 * CacheLineSize)] public long Head;

    /// <summary>Consumer's cached view of <see cref="Tail"/>. Written only by the consumer.</summary>
    [FieldOffset(4 * CacheLineSize)] public long CachedTail;
}