diff --git a/src/Mocha.Core/Buffer/Memory/MemoryBuferQueue.cs b/src/Mocha.Core/Buffer/Memory/MemoryBufferQueue.cs similarity index 100% rename from src/Mocha.Core/Buffer/Memory/MemoryBuferQueue.cs rename to src/Mocha.Core/Buffer/Memory/MemoryBufferQueue.cs diff --git a/src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs b/src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs index d424f92..95af756 100644 --- a/src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs +++ b/src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs @@ -13,14 +13,16 @@ internal sealed class MemoryBufferSegment private readonly MemoryBufferPartitionOffset _startOffset; private readonly MemoryBufferPartitionOffset _endOffset; private readonly T[] _slots; - private volatile int _writePosition; + private volatile int _reservedWritePosition; + private volatile int _publishedWritePosition; public MemoryBufferSegment(int length, MemoryBufferPartitionOffset startOffset) { _startOffset = startOffset; _endOffset = startOffset + (ulong)(length - 1); _slots = new T[length]; - _writePosition = -1; + _reservedWritePosition = -1; + _publishedWritePosition = -1; } private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset) @@ -28,7 +30,8 @@ private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset) _startOffset = startOffset; _endOffset = startOffset + (ulong)(slots.Length - 1); _slots = slots; - _writePosition = -1; + _reservedWritePosition = -1; + _publishedWritePosition = -1; } public MemoryBufferSegment? NextSegment { get; set; } @@ -39,19 +42,58 @@ private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset) public int Capacity => _slots.Length; - public int Count => Math.Min(Capacity, _writePosition + 1); + public int Count => Math.Min(Capacity, _publishedWritePosition + 1); public bool TryEnqueue(T item) { - var writePosition = Interlocked.Increment(ref _writePosition); - if (writePosition >= _slots.Length) + while (true) { - _writePosition = _slots.Length - 1; - return false; - } + var currentReserved = _reservedWritePosition; + var nextPosition = currentReserved + 1; + if (nextPosition >= _slots.Length) + { + // No more space to write in this segment. + return false; + } + + if (Interlocked.CompareExchange( + ref _reservedWritePosition, + nextPosition, + currentReserved) + != currentReserved) + { + // Another thread has already written to the next position, retry. + continue; + } + + // Write the item to the slot. + // It's safe to write directly without locks because each position is written by at most one thread. + _slots[nextPosition] = item; + + // Now we need to publish the new write position so that readers can see the new item. + while (true) + { + var currentPublished = _publishedWritePosition; + if (currentPublished >= nextPosition) + { + // Another thread has already published a position that is greater than our next position, + // which means our item is already visible to readers, no need to publish again. + break; + } + + if (Interlocked.CompareExchange( + ref _publishedWritePosition, + nextPosition, + currentPublished) + == currentPublished) + { + // Successfully published the new write position, now readers can see the new item. + break; + } + } - _slots[writePosition] = item; - return true; + return true; + } } public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(true)] out T[]? items) @@ -64,22 +106,23 @@ public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(t var readPosition = (offset - _startOffset).ToInt32(); - if (_writePosition < 0 || readPosition > _writePosition) + if (_publishedWritePosition < 0 || readPosition > _publishedWritePosition) { items = null; return false; } - var writePosition = Math.Min(_writePosition, _slots.Length - 1); - var actualCount = Math.Min(count, writePosition - readPosition + 1); - var wholeSegment = readPosition == 0 && actualCount == _slots.Length; + var writePosition = Math.Min(_publishedWritePosition, _slots.Length - 1); + // Number of items actually available to return (bounded by requested count and written items). + var availableCount = Math.Min(count, writePosition - readPosition + 1); + var wholeSegment = readPosition == 0 && availableCount == _slots.Length; if (wholeSegment) { items = _slots; return true; } - items = _slots[readPosition..(readPosition + actualCount)]; + items = _slots[readPosition..(readPosition + availableCount)]; return true; } @@ -106,6 +149,6 @@ public DebugView(MemoryBufferSegment segment) public int Count => _segment.Count; - public T[] Items => _segment._slots.Take(_segment._writePosition + 1).ToArray(); + public T[] Items => _segment._slots.Take(_segment._publishedWritePosition + 1).ToArray(); } }