Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,5 @@ $tf/pendingchanges.tfb
$tf/properties.tf1
project.lock.json
**/.vs/

.claude/
2 changes: 1 addition & 1 deletion Sources/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ csharp_new_line_before_catch = true
csharp_new_line_before_finally = true
csharp_indent_case_contents = true
csharp_indent_switch_labels = true
csharp_preserve_single_line_statements = false
csharp_preserve_single_line_statements = false
10 changes: 0 additions & 10 deletions Sources/Core/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,5 @@
<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />

<!-- Code Analysis -->
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' And '$(Configuration)' == 'Debug'">
<PackageReference Include="StyleCop.Analyzers" Version="1.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<!-- Do not set RunCodeAnalysis*, as this is incompatible with netstandard2.0. Analyzers run by default whenever included. -->
<CodeAnalysisRuleSet>$(EnlistmentRoot)\Sources\Microsoft.StreamProcessing.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<TargetFrameworks>net10.0</TargetFrameworks>
<Platforms>x64;AnyCPU</Platforms>
</PropertyGroup>

Expand All @@ -11,19 +11,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.Scripting" Version="3.1.0" />
<PackageReference Include="System.Diagnostics.Contracts" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.Process" Version="4.3.0" />
<PackageReference Include="System.Runtime.Serialization.Primitives" Version="4.3.0" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'">
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
<PackageReference Include="Microsoft.CodeAnalysis.Scripting" Version="5.3.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal sealed class CountAggregate<TInput> : ISummableAggregate<TInput, ulong,
private static readonly Expression<Func<ulong, ulong, ulong>> diff = (leftCount, rightCount) => leftCount - rightCount;
public Expression<Func<ulong, ulong, ulong>> Difference() => diff;

private static readonly Expression<Func<ulong, ulong, ulong>> sum = (leftCount, rightCount) => leftCount - rightCount;
private static readonly Expression<Func<ulong, ulong, ulong>> sum = (leftCount, rightCount) => leftCount + rightCount;
public Expression<Func<ulong, ulong, ulong>> Sum() => sum;

private static readonly Expression<Func<ulong, ulong>> res = count => count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public sealed class MultiSet<T>
{
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes", Justification = "Used to avoid creating redundant readonly property.")]
[DataMember]
private Dictionary<T, long> Elements = new Dictionary<T, long>();
private Dictionary<T, long> Elements = [];
[DataMember]
private long count;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Microsoft.StreamProcessing
public class PriorityQueue<T> : IEnumerable<T>
{
[DataMember]
private List<T> data = new List<T>();
private List<T> data = [];

private readonly IComparer<T> comp;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ public TValue GetOrAdd(CacheKey key, Func<CacheKey, TValue> valueFactory)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public IEnumerator<KeyValuePair<CacheKey, TValue>> GetEnumerator() => this.dictionary.GetEnumerator();

/// <summary>
/// Clears all entries from the dictionary and the per-key lock table.
/// Marked internal (not private) so that test code can clear the codegen cache
/// (e.g. EquiJoinStreamable.cachedPipes) to ensure deterministic test behavior
/// without relying on reflection.
/// </summary>
internal void Clear()
{
this.dictionary.Clear();
this.keyLocks.Clear();
}

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private static Func<SortedDictionary<T, long>, IEnumerable<KeyValuePair<T, long>
/// Creates a new instance of a Sorted Multiset.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
public SortedMultiSet() : this(() => new SortedDictionary<T, long>()) { }
public SortedMultiSet() : this(() => []) { }

/// <summary>
/// Creates a new instance of a Sorted Multiset where the underlying dictionary is generated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.StreamProcessing
internal sealed class MonotonicEgressPipe<TPayload> : EgressBoundary<Empty, TPayload, TPayload>
{
[DataMember]
private SortedDictionary<long, List<TPayload>> toDelete = new SortedDictionary<long, List<TPayload>>();
private SortedDictionary<long, List<TPayload>> toDelete = [];

[Obsolete("Used only by serialization. Do not call directly.")]
public MonotonicEgressPipe() { }
Expand Down Expand Up @@ -76,7 +76,7 @@ private void EnqueueDelete(long currentTime, TPayload payload)
{
if (!this.toDelete.TryGetValue(currentTime, out List<TPayload> queue))
{
queue = new List<TPayload>();
queue = [];
this.toDelete.Add(currentTime, queue);
}
queue.Add(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Microsoft.StreamProcessing
internal sealed class MonotonicArrayEgressPipe<TPayload> : EgressBoundary<Empty, TPayload, ArraySegment<TPayload>>
{
[DataMember]
private SortedDictionary<long, List<TPayload>> toDelete = new SortedDictionary<long, List<TPayload>>();
private SortedDictionary<long, List<TPayload>> toDelete = [];

private readonly Func<TPayload[]> generator;
[DataMember]
Expand Down Expand Up @@ -105,7 +105,7 @@ private void EnqueueDelete(long currentTime, TPayload payload)
{
if (!this.toDelete.TryGetValue(currentTime, out List<TPayload> queue))
{
queue = new List<TPayload>();
queue = [];
this.toDelete.Add(currentTime, queue);
}
queue.Add(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ internal sealed class AtemporalEnumerableEgressPipe<TPayload> : EgressBoundary<E
[DataMember]
private long currentTimestamp = long.MinValue;
[DataMember]
private SortedDictionary<long, List<TPayload>> toDelete = new SortedDictionary<long, List<TPayload>>();
private SortedDictionary<long, List<TPayload>> toDelete = [];
[DataMember]
private List<ChangeListEvent<TPayload>> currentVersion = new List<ChangeListEvent<TPayload>>();
private List<ChangeListEvent<TPayload>> currentVersion = [];

[Obsolete("Used only by serialization. Do not call directly.")]
public AtemporalEnumerableEgressPipe() { }
Expand Down Expand Up @@ -86,7 +86,7 @@ private void Process(long timestamp)
if (this.currentVersion.Count > 0)
{
this.observer.OnNext(this.currentVersion);
this.currentVersion = new List<ChangeListEvent<TPayload>>();
this.currentVersion = [];
}
}
}
Expand All @@ -95,7 +95,7 @@ private void EnqueueDelete(long currentTime, TPayload payload)
{
if (!this.toDelete.TryGetValue(currentTime, out List<TPayload> queue))
{
queue = new List<TPayload>();
queue = [];
this.toDelete.Add(currentTime, queue);
}
queue.Add(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal static class PlaceholderMethod

internal sealed class FuseModule
{
private readonly List<ExpressionProfile> expressions = new List<ExpressionProfile>();
private readonly List<ExpressionProfile> expressions = [];
private Expression durationAdjustment = null;

public FuseModule() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public StreamProperties<TKey, TPayload> ToStreamProperties<TKey, TPayload>()
EqualityComparerExpression<TKey>.Default,
EqualityComparerExpression<TPayload>.Default,
null, null,
new Dictionary<Expression, object>(),
new Dictionary<Expression, Guid?>(),
[],
[],
null);

public bool IsColumnar;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Microsoft.StreamProcessing
{
internal sealed class DiagnosticObservable<TPayload> : IObservable<OutOfOrderStreamEvent<TPayload>>, IObserver<OutOfOrderStreamEvent<TPayload>>, IDisposable
{
private List<IObserver<OutOfOrderStreamEvent<TPayload>>> observers = new List<IObserver<OutOfOrderStreamEvent<TPayload>>>();
private List<IObserver<OutOfOrderStreamEvent<TPayload>>> observers = [];

public IDisposable Subscribe(IObserver<OutOfOrderStreamEvent<TPayload>> observer)
{
Expand Down Expand Up @@ -83,7 +83,7 @@ public static OutOfOrderStreamEvent<TPayload> Create<TPayload>(StreamEvent<TPayl

internal sealed class PartitionedDiagnosticObservable<TKey, TPayload> : IObservable<OutOfOrderPartitionedStreamEvent<TKey, TPayload>>, IObserver<OutOfOrderPartitionedStreamEvent<TKey, TPayload>>, IDisposable
{
private List<IObserver<OutOfOrderPartitionedStreamEvent<TKey, TPayload>>> observers = new List<IObserver<OutOfOrderPartitionedStreamEvent<TKey, TPayload>>>();
private List<IObserver<OutOfOrderPartitionedStreamEvent<TKey, TPayload>>> observers = [];

public IDisposable Subscribe(IObserver<OutOfOrderPartitionedStreamEvent<TKey, TPayload>> observer)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public sealed class ImpatienceSorter<TPayload> : IDisposable

private DataStructurePool<PooledElasticCircularBuffer<StreamEvent<TPayload>>> ecbPool;
private List<PooledElasticCircularBuffer<StreamEvent<TPayload>>> toReturn =
new List<PooledElasticCircularBuffer<StreamEvent<TPayload>>>();
[];

/// <summary>
/// Currently for internal use only - do not use directly.
Expand All @@ -44,7 +44,7 @@ public sealed class ImpatienceSorter<TPayload> : IDisposable
public ImpatienceSorter()
{
this.Tails = new long[this.MaxFibers];
this.Fibers = new List<PooledElasticCircularBuffer<StreamEvent<TPayload>>>();
this.Fibers = [];
this.MergeSource = new PooledElasticCircularBuffer<StreamEvent<TPayload>>[this.MaxFibers];
this.NextAffectingSyncTime = StreamEvent.InfinitySyncTime;
this.ecbPool = new DataStructurePool<PooledElasticCircularBuffer<StreamEvent<TPayload>>>();
Expand Down Expand Up @@ -442,7 +442,7 @@ private sealed class ImpatienceSorter : IDisposable

private DataStructurePool<PooledElasticCircularBuffer<PartitionedStreamEvent<TKey, TPayload>>> ecbPool;
private List<PooledElasticCircularBuffer<PartitionedStreamEvent<TKey, TPayload>>> toReturn =
new List<PooledElasticCircularBuffer<PartitionedStreamEvent<TKey, TPayload>>>();
[];

/// <summary>
/// Currently for internal use only - do not use directly.
Expand All @@ -451,7 +451,7 @@ private sealed class ImpatienceSorter : IDisposable
public ImpatienceSorter()
{
this.Tails = new long[this.MaxFibers];
this.Fibers = new List<PooledElasticCircularBuffer<PartitionedStreamEvent<TKey, TPayload>>>();
this.Fibers = [];
this.MergeSource = new PooledElasticCircularBuffer<PartitionedStreamEvent<TKey, TPayload>>[this.MaxFibers];
this.NextAffectingSyncTime = StreamEvent.InfinitySyncTime;
this.ecbPool = new DataStructurePool<PooledElasticCircularBuffer<PartitionedStreamEvent<TKey, TPayload>>>();
Expand Down
24 changes: 10 additions & 14 deletions Sources/Core/Microsoft.StreamProcessing/Ingress/SubscriptionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1130,8 +1130,6 @@ protected DisorderedPartitionedObserverSubscriptionBase(
/// <typeparam name="TPayload"></typeparam>
/// <typeparam name="TResult"></typeparam>
[EditorBrowsable(EditorBrowsableState.Never)]
[System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1008:OpeningParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1009:ClosingParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
public abstract class DisorderedPartitionedSubscriptionBase<TKey, TIngressStructure, TPayload, TResult> : Pipe<PartitionKey<TKey>, TResult>, IIngressStreamObserver
{
private readonly string errorMessages;
Expand Down Expand Up @@ -1212,7 +1210,7 @@ public abstract class DisorderedPartitionedSubscriptionBase<TKey, TIngressStruct
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
protected Dictionary<TKey, long> currentTime = new Dictionary<TKey, long>();
protected Dictionary<TKey, long> currentTime = [];

#if DEBUG
/// <summary>
Expand All @@ -1221,7 +1219,7 @@ public abstract class DisorderedPartitionedSubscriptionBase<TKey, TIngressStruct
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
protected Dictionary<TKey, long> lastEventTime = new Dictionary<TKey, long>();
protected Dictionary<TKey, long> lastEventTime = [];
#endif

/// <summary>
Expand All @@ -1235,7 +1233,7 @@ public abstract class DisorderedPartitionedSubscriptionBase<TKey, TIngressStruct
/// Currently for internal use only - do not use directly.
/// </summary>
[DataMember]
protected Dictionary<TKey, (long lastPunctuation, long lastPunctuationQuantized)> lastPunctuationTime = new Dictionary<TKey, (long, long)>();
protected Dictionary<TKey, (long lastPunctuation, long lastPunctuationQuantized)> lastPunctuationTime = [];

/// <summary>
/// Currently for internal use only - do not use directly.
Expand All @@ -1251,7 +1249,7 @@ public abstract class DisorderedPartitionedSubscriptionBase<TKey, TIngressStruct
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
protected Dictionary<TKey, long> partitionHighWatermarks = new Dictionary<TKey, long>();
protected Dictionary<TKey, long> partitionHighWatermarks = [];

/// <summary>
/// Currently for internal use only - do not use directly.
Expand All @@ -1260,7 +1258,7 @@ public abstract class DisorderedPartitionedSubscriptionBase<TKey, TIngressStruct
/// NB: Do not mark as DataMember or as state managed: this is an inversion of existing data in field partitionHighWatermarks.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
protected SortedDictionary<long, HashSet<TKey>> highWatermarkToPartitionsMap = new SortedDictionary<long, HashSet<TKey>>();
protected SortedDictionary<long, HashSet<TKey>> highWatermarkToPartitionsMap = [];

/// <summary>
/// Currently for internal use only - do not use directly.
Expand Down Expand Up @@ -1751,8 +1749,6 @@ protected PartitionedObserverSubscriptionBase(
/// <typeparam name="TPayload"></typeparam>
/// <typeparam name="TResult"></typeparam>
[EditorBrowsable(EditorBrowsableState.Never)]
[System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1008:OpeningParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1009:ClosingParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
public abstract class PartitionedSubscriptionBase<TKey, TIngressStructure, TPayload, TResult> : Pipe<PartitionKey<TKey>, TResult>, IIngressStreamObserver
{
private readonly string errorMessages;
Expand Down Expand Up @@ -1833,7 +1829,7 @@ public abstract class PartitionedSubscriptionBase<TKey, TIngressStructure, TPayl
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
protected Dictionary<TKey, long> currentTime = new Dictionary<TKey, long>();
protected Dictionary<TKey, long> currentTime = [];

#if DEBUG
/// <summary>
Expand All @@ -1842,7 +1838,7 @@ public abstract class PartitionedSubscriptionBase<TKey, TIngressStructure, TPayl
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
protected Dictionary<TKey, long> lastEventTime = new Dictionary<TKey, long>();
protected Dictionary<TKey, long> lastEventTime = [];
#endif

/// <summary>
Expand All @@ -1856,7 +1852,7 @@ public abstract class PartitionedSubscriptionBase<TKey, TIngressStructure, TPayl
/// Currently for internal use only - do not use directly.
/// </summary>
[DataMember]
protected Dictionary<TKey, (long lastPunctuation, long lastPunctuationQuantized)> lastPunctuationTime = new Dictionary<TKey, (long, long)>();
protected Dictionary<TKey, (long lastPunctuation, long lastPunctuationQuantized)> lastPunctuationTime = [];

/// <summary>
/// Currently for internal use only - do not use directly.
Expand All @@ -1872,7 +1868,7 @@ public abstract class PartitionedSubscriptionBase<TKey, TIngressStructure, TPayl
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
protected Dictionary<TKey, long> partitionHighWatermarks = new Dictionary<TKey, long>();
protected Dictionary<TKey, long> partitionHighWatermarks = [];

/// <summary>
/// Currently for internal use only - do not use directly.
Expand All @@ -1881,7 +1877,7 @@ public abstract class PartitionedSubscriptionBase<TKey, TIngressStructure, TPayl
/// NB: Do not mark as DataMember or as state managed: this is an inversion of existing data in field partitionHighWatermarks.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
protected SortedDictionary<long, HashSet<TKey>> highWatermarkToPartitionsMap = new SortedDictionary<long, HashSet<TKey>>();
protected SortedDictionary<long, HashSet<TKey>> highWatermarkToPartitionsMap = [];

/// <summary>
/// Currently for internal use only - do not use directly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,6 @@ foreach (bool disordered in new[] { true, false })
/// <typeparam name="TPayload"></typeparam>
/// <typeparam name="TResult"></typeparam>
[EditorBrowsable(EditorBrowsableState.Never)]
<# if (partitioned) { #>
[System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1008:OpeningParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
[System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1009:ClosingParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
<# } #>
public abstract class <#= disordered ? "Disordered" : string.Empty #><#= partitionString #>SubscriptionBase<<#= genericArgument #>TIngressStructure, TPayload, TResult> : Pipe<<#= baseType #>, TResult>, IIngressStreamObserver
{
private readonly string errorMessages;
Expand Down
Loading