Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 60 additions & 17 deletions src/Mocha.Core/Buffer/Memory/MemoryBufferSegment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,25 @@ internal sealed class MemoryBufferSegment<T>
private readonly MemoryBufferPartitionOffset _startOffset;
private readonly MemoryBufferPartitionOffset _endOffset;
private readonly T[] _slots;
private volatile int _writePosition;
private volatile int _reservedWritePosition;
private volatile int _publishedWritePosition;

public MemoryBufferSegment(int length, MemoryBufferPartitionOffset startOffset)
{
_startOffset = startOffset;
_endOffset = startOffset + (ulong)(length - 1);
_slots = new T[length];
_writePosition = -1;
_reservedWritePosition = -1;
_publishedWritePosition = -1;
}

private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset)
{
_startOffset = startOffset;
_endOffset = startOffset + (ulong)(slots.Length - 1);
_slots = slots;
_writePosition = -1;
_reservedWritePosition = -1;
_publishedWritePosition = -1;
}

public MemoryBufferSegment<T>? NextSegment { get; set; }
Expand All @@ -39,19 +42,58 @@ private MemoryBufferSegment(T[] slots, MemoryBufferPartitionOffset startOffset)

public int Capacity => _slots.Length;

public int Count => Math.Min(Capacity, _writePosition + 1);
public int Count => Math.Min(Capacity, _publishedWritePosition + 1);

public bool TryEnqueue(T item)
{
var writePosition = Interlocked.Increment(ref _writePosition);
if (writePosition >= _slots.Length)
while (true)
{
_writePosition = _slots.Length - 1;
return false;
}
var currentReserved = _reservedWritePosition;
var nextPosition = currentReserved + 1;
if (nextPosition >= _slots.Length)
{
// No more space to write in this segment.
return false;
}

if (Interlocked.CompareExchange(
ref _reservedWritePosition,
nextPosition,
currentReserved)
!= currentReserved)
{
// Another thread has already written to the next position, retry.
continue;
}

// Write the item to the slot.
// It's safe to write directly without locks because each position is written by at most one thread.
_slots[nextPosition] = item;

// Now we need to publish the new write position so that readers can see the new item.
while (true)
{
var currentPublished = _publishedWritePosition;
if (currentPublished >= nextPosition)
{
// Another thread has already published a position that is greater than our next position,
// which means our item is already visible to readers, no need to publish again.
break;
}

if (Interlocked.CompareExchange(
ref _publishedWritePosition,
nextPosition,
currentPublished)
== currentPublished)
{
// Successfully published the new write position, now readers can see the new item.
break;
}
}

_slots[writePosition] = item;
return true;
return true;
}
}

public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(true)] out T[]? items)
Expand All @@ -64,22 +106,23 @@ public bool TryGet(MemoryBufferPartitionOffset offset, int count, [NotNullWhen(t

var readPosition = (offset - _startOffset).ToInt32();

if (_writePosition < 0 || readPosition > _writePosition)
if (_publishedWritePosition < 0 || readPosition > _publishedWritePosition)
{
items = null;
return false;
}

var writePosition = Math.Min(_writePosition, _slots.Length - 1);
var actualCount = Math.Min(count, writePosition - readPosition + 1);
var wholeSegment = readPosition == 0 && actualCount == _slots.Length;
var writePosition = Math.Min(_publishedWritePosition, _slots.Length - 1);
// Number of items actually available to return (bounded by requested count and written items).
var availableCount = Math.Min(count, writePosition - readPosition + 1);
var wholeSegment = readPosition == 0 && availableCount == _slots.Length;
if (wholeSegment)
{
items = _slots;
return true;
}

items = _slots[readPosition..(readPosition + actualCount)];
items = _slots[readPosition..(readPosition + availableCount)];
return true;
}

Expand All @@ -106,6 +149,6 @@ public DebugView(MemoryBufferSegment<T> segment)

public int Count => _segment.Count;

public T[] Items => _segment._slots.Take(_segment._writePosition + 1).ToArray();
public T[] Items => _segment._slots.Take(_segment._publishedWritePosition + 1).ToArray();
}
}
Loading