diff --git a/.gitignore b/.gitignore
index 809f06a76..ed8c08844 100644
--- a/.gitignore
+++ b/.gitignore
@@ -58,3 +58,5 @@ $tf/pendingchanges.tfb
$tf/properties.tf1
project.lock.json
**/.vs/
+
+.claude/
\ No newline at end of file
diff --git a/Sources/.editorconfig b/Sources/.editorconfig
index 1bae2e959..d1110ea7e 100644
--- a/Sources/.editorconfig
+++ b/Sources/.editorconfig
@@ -50,4 +50,4 @@ csharp_new_line_before_catch = true
csharp_new_line_before_finally = true
csharp_indent_case_contents = true
csharp_indent_switch_labels = true
-csharp_preserve_single_line_statements = false
\ No newline at end of file
+csharp_preserve_single_line_statements = false
diff --git a/Sources/Core/Directory.Build.props b/Sources/Core/Directory.Build.props
index 15f391280..fa8e33c0e 100644
--- a/Sources/Core/Directory.Build.props
+++ b/Sources/Core/Directory.Build.props
@@ -3,15 +3,5 @@
-
-
- all
- runtime; build; native; contentfiles; analyzers
-
-
-
-
- $(EnlistmentRoot)\Sources\Microsoft.StreamProcessing.ruleset
-
diff --git a/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj b/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj
index 1d9de4e69..00948d43b 100644
--- a/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj
+++ b/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj
@@ -1,7 +1,7 @@
- netstandard2.0
+ net10.0
x64;AnyCPU
@@ -11,19 +11,7 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
+
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs
index db3e0ab86..d4ab16d42 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs
@@ -21,7 +21,7 @@ internal sealed class CountAggregate : ISummableAggregate> diff = (leftCount, rightCount) => leftCount - rightCount;
public Expression> Difference() => diff;
- private static readonly Expression> sum = (leftCount, rightCount) => leftCount - rightCount;
+ private static readonly Expression> sum = (leftCount, rightCount) => leftCount + rightCount;
public Expression> Sum() => sum;
private static readonly Expression> res = count => count;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/MultiSet.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/MultiSet.cs
index 150de6cdb..78bcd584f 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Collections/MultiSet.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Collections/MultiSet.cs
@@ -20,7 +20,7 @@ public sealed class MultiSet
{
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes", Justification = "Used to avoid creating redundant readonly property.")]
[DataMember]
- private Dictionary Elements = new Dictionary();
+ private Dictionary Elements = [];
[DataMember]
private long count;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/PriorityQueue.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/PriorityQueue.cs
index e1473d70c..26249d19d 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Collections/PriorityQueue.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Collections/PriorityQueue.cs
@@ -18,7 +18,7 @@ namespace Microsoft.StreamProcessing
public class PriorityQueue : IEnumerable
{
[DataMember]
- private List data = new List();
+ private List data = [];
private readonly IComparer comp;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/SafeConcurrentDictionary.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/SafeConcurrentDictionary.cs
index 9b04f7206..153890de3 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Collections/SafeConcurrentDictionary.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Collections/SafeConcurrentDictionary.cs
@@ -44,6 +44,18 @@ public TValue GetOrAdd(CacheKey key, Func valueFactory)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public IEnumerator> GetEnumerator() => this.dictionary.GetEnumerator();
+ ///
+ /// Clears all entries from the dictionary and the per-key lock table.
+ /// Marked internal (not private) so that test code can clear the codegen cache
+ /// (e.g. EquiJoinStreamable.cachedPipes) to ensure deterministic test behavior
+ /// without relying on reflection.
+ ///
+ internal void Clear()
+ {
+ this.dictionary.Clear();
+ this.keyLocks.Clear();
+ }
+
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
///
diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs
index 84ff1aeaf..f187d7fd2 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Collections/SortedMultiSet.cs
@@ -47,7 +47,7 @@ private static Func, IEnumerable
/// Creates a new instance of a Sorted Multiset.
///
[EditorBrowsable(EditorBrowsableState.Never)]
- public SortedMultiSet() : this(() => new SortedDictionary()) { }
+ public SortedMultiSet() : this(() => []) { }
///
/// Creates a new instance of a Sorted Multiset where the underlying dictionary is generated.
diff --git a/Sources/Core/Microsoft.StreamProcessing/Egress/Atemporal/AtemporalEgressPipe.cs b/Sources/Core/Microsoft.StreamProcessing/Egress/Atemporal/AtemporalEgressPipe.cs
index 4540cae25..1f1254e9e 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Egress/Atemporal/AtemporalEgressPipe.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Egress/Atemporal/AtemporalEgressPipe.cs
@@ -14,7 +14,7 @@ namespace Microsoft.StreamProcessing
internal sealed class MonotonicEgressPipe : EgressBoundary
{
[DataMember]
- private SortedDictionary> toDelete = new SortedDictionary>();
+ private SortedDictionary> toDelete = [];
[Obsolete("Used only by serialization. Do not call directly.")]
public MonotonicEgressPipe() { }
@@ -76,7 +76,7 @@ private void EnqueueDelete(long currentTime, TPayload payload)
{
if (!this.toDelete.TryGetValue(currentTime, out List queue))
{
- queue = new List();
+ queue = [];
this.toDelete.Add(currentTime, queue);
}
queue.Add(payload);
diff --git a/Sources/Core/Microsoft.StreamProcessing/Egress/AtemporalArray/AtemporalArrayEgressPipe.cs b/Sources/Core/Microsoft.StreamProcessing/Egress/AtemporalArray/AtemporalArrayEgressPipe.cs
index 76c7c3a96..540881456 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Egress/AtemporalArray/AtemporalArrayEgressPipe.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Egress/AtemporalArray/AtemporalArrayEgressPipe.cs
@@ -14,7 +14,7 @@ namespace Microsoft.StreamProcessing
internal sealed class MonotonicArrayEgressPipe : EgressBoundary>
{
[DataMember]
- private SortedDictionary> toDelete = new SortedDictionary>();
+ private SortedDictionary> toDelete = [];
private readonly Func generator;
[DataMember]
@@ -105,7 +105,7 @@ private void EnqueueDelete(long currentTime, TPayload payload)
{
if (!this.toDelete.TryGetValue(currentTime, out List queue))
{
- queue = new List();
+ queue = [];
this.toDelete.Add(currentTime, queue);
}
queue.Add(payload);
diff --git a/Sources/Core/Microsoft.StreamProcessing/Egress/AtemporalEnumerable/AtemporalEnumerableEgressPipe.cs b/Sources/Core/Microsoft.StreamProcessing/Egress/AtemporalEnumerable/AtemporalEnumerableEgressPipe.cs
index be9cb4878..0c6aa082f 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Egress/AtemporalEnumerable/AtemporalEnumerableEgressPipe.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Egress/AtemporalEnumerable/AtemporalEnumerableEgressPipe.cs
@@ -16,9 +16,9 @@ internal sealed class AtemporalEnumerableEgressPipe : EgressBoundary> toDelete = new SortedDictionary>();
+ private SortedDictionary> toDelete = [];
[DataMember]
- private List> currentVersion = new List>();
+ private List> currentVersion = [];
[Obsolete("Used only by serialization. Do not call directly.")]
public AtemporalEnumerableEgressPipe() { }
@@ -86,7 +86,7 @@ private void Process(long timestamp)
if (this.currentVersion.Count > 0)
{
this.observer.OnNext(this.currentVersion);
- this.currentVersion = new List>();
+ this.currentVersion = [];
}
}
}
@@ -95,7 +95,7 @@ private void EnqueueDelete(long currentTime, TPayload payload)
{
if (!this.toDelete.TryGetValue(currentTime, out List queue))
{
- queue = new List();
+ queue = [];
this.toDelete.Add(currentTime, queue);
}
queue.Add(payload);
diff --git a/Sources/Core/Microsoft.StreamProcessing/Fusible/FuseModule.cs b/Sources/Core/Microsoft.StreamProcessing/Fusible/FuseModule.cs
index 34b45a8a4..f17785374 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Fusible/FuseModule.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Fusible/FuseModule.cs
@@ -19,7 +19,7 @@ internal static class PlaceholderMethod
internal sealed class FuseModule
{
- private readonly List expressions = new List();
+ private readonly List expressions = [];
private Expression durationAdjustment = null;
public FuseModule() { }
diff --git a/Sources/Core/Microsoft.StreamProcessing/Ingress/Binary/StreamableSerializer.cs b/Sources/Core/Microsoft.StreamProcessing/Ingress/Binary/StreamableSerializer.cs
index 1a272a0a6..cf0e22a3f 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Ingress/Binary/StreamableSerializer.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Ingress/Binary/StreamableSerializer.cs
@@ -32,8 +32,8 @@ public StreamProperties ToStreamProperties()
EqualityComparerExpression.Default,
EqualityComparerExpression.Default,
null, null,
- new Dictionary(),
- new Dictionary(),
+ [],
+ [],
null);
public bool IsColumnar;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Ingress/DiagnosticObservable.cs b/Sources/Core/Microsoft.StreamProcessing/Ingress/DiagnosticObservable.cs
index c5fb8f4c3..8a76755e1 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Ingress/DiagnosticObservable.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Ingress/DiagnosticObservable.cs
@@ -10,7 +10,7 @@ namespace Microsoft.StreamProcessing
{
internal sealed class DiagnosticObservable : IObservable>, IObserver>, IDisposable
{
- private List>> observers = new List>>();
+ private List>> observers = [];
public IDisposable Subscribe(IObserver> observer)
{
@@ -83,7 +83,7 @@ public static OutOfOrderStreamEvent Create(StreamEvent : IObservable>, IObserver>, IDisposable
{
- private List>> observers = new List>>();
+ private List>> observers = [];
public IDisposable Subscribe(IObserver> observer)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Ingress/ImpatienceSorter.cs b/Sources/Core/Microsoft.StreamProcessing/Ingress/ImpatienceSorter.cs
index d260f3fe8..df7fab8d9 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Ingress/ImpatienceSorter.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Ingress/ImpatienceSorter.cs
@@ -35,7 +35,7 @@ public sealed class ImpatienceSorter : IDisposable
private DataStructurePool>> ecbPool;
private List>> toReturn =
- new List>>();
+ [];
///
/// Currently for internal use only - do not use directly.
@@ -44,7 +44,7 @@ public sealed class ImpatienceSorter : IDisposable
public ImpatienceSorter()
{
this.Tails = new long[this.MaxFibers];
- this.Fibers = new List>>();
+ this.Fibers = [];
this.MergeSource = new PooledElasticCircularBuffer>[this.MaxFibers];
this.NextAffectingSyncTime = StreamEvent.InfinitySyncTime;
this.ecbPool = new DataStructurePool>>();
@@ -442,7 +442,7 @@ private sealed class ImpatienceSorter : IDisposable
private DataStructurePool>> ecbPool;
private List>> toReturn =
- new List>>();
+ [];
///
/// Currently for internal use only - do not use directly.
@@ -451,7 +451,7 @@ private sealed class ImpatienceSorter : IDisposable
public ImpatienceSorter()
{
this.Tails = new long[this.MaxFibers];
- this.Fibers = new List>>();
+ this.Fibers = [];
this.MergeSource = new PooledElasticCircularBuffer>[this.MaxFibers];
this.NextAffectingSyncTime = StreamEvent.InfinitySyncTime;
this.ecbPool = new DataStructurePool>>();
diff --git a/Sources/Core/Microsoft.StreamProcessing/Ingress/SubscriptionBase.cs b/Sources/Core/Microsoft.StreamProcessing/Ingress/SubscriptionBase.cs
index bb48d50f5..946aa8b66 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Ingress/SubscriptionBase.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Ingress/SubscriptionBase.cs
@@ -1130,8 +1130,6 @@ protected DisorderedPartitionedObserverSubscriptionBase(
///
///
[EditorBrowsable(EditorBrowsableState.Never)]
- [System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1008:OpeningParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
- [System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1009:ClosingParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
public abstract class DisorderedPartitionedSubscriptionBase : Pipe, TResult>, IIngressStreamObserver
{
private readonly string errorMessages;
@@ -1212,7 +1210,7 @@ public abstract class DisorderedPartitionedSubscriptionBase
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
- protected Dictionary currentTime = new Dictionary();
+ protected Dictionary currentTime = [];
#if DEBUG
///
@@ -1221,7 +1219,7 @@ public abstract class DisorderedPartitionedSubscriptionBase
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
- protected Dictionary lastEventTime = new Dictionary();
+ protected Dictionary lastEventTime = [];
#endif
///
@@ -1235,7 +1233,7 @@ public abstract class DisorderedPartitionedSubscriptionBase
[DataMember]
- protected Dictionary lastPunctuationTime = new Dictionary();
+ protected Dictionary lastPunctuationTime = [];
///
/// Currently for internal use only - do not use directly.
@@ -1251,7 +1249,7 @@ public abstract class DisorderedPartitionedSubscriptionBase
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
- protected Dictionary partitionHighWatermarks = new Dictionary();
+ protected Dictionary partitionHighWatermarks = [];
///
/// Currently for internal use only - do not use directly.
@@ -1260,7 +1258,7 @@ public abstract class DisorderedPartitionedSubscriptionBase
[EditorBrowsable(EditorBrowsableState.Never)]
- protected SortedDictionary> highWatermarkToPartitionsMap = new SortedDictionary>();
+ protected SortedDictionary> highWatermarkToPartitionsMap = [];
///
/// Currently for internal use only - do not use directly.
@@ -1751,8 +1749,6 @@ protected PartitionedObserverSubscriptionBase(
///
///
[EditorBrowsable(EditorBrowsableState.Never)]
- [System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1008:OpeningParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
- [System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1009:ClosingParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
public abstract class PartitionedSubscriptionBase : Pipe, TResult>, IIngressStreamObserver
{
private readonly string errorMessages;
@@ -1833,7 +1829,7 @@ public abstract class PartitionedSubscriptionBase
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
- protected Dictionary currentTime = new Dictionary();
+ protected Dictionary currentTime = [];
#if DEBUG
///
@@ -1842,7 +1838,7 @@ public abstract class PartitionedSubscriptionBase
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
- protected Dictionary lastEventTime = new Dictionary();
+ protected Dictionary lastEventTime = [];
#endif
///
@@ -1856,7 +1852,7 @@ public abstract class PartitionedSubscriptionBase
[DataMember]
- protected Dictionary lastPunctuationTime = new Dictionary();
+ protected Dictionary lastPunctuationTime = [];
///
/// Currently for internal use only - do not use directly.
@@ -1872,7 +1868,7 @@ public abstract class PartitionedSubscriptionBase
[EditorBrowsable(EditorBrowsableState.Never)]
[DataMember]
- protected Dictionary partitionHighWatermarks = new Dictionary();
+ protected Dictionary partitionHighWatermarks = [];
///
/// Currently for internal use only - do not use directly.
@@ -1881,7 +1877,7 @@ public abstract class PartitionedSubscriptionBase
[EditorBrowsable(EditorBrowsableState.Never)]
- protected SortedDictionary> highWatermarkToPartitionsMap = new SortedDictionary>();
+ protected SortedDictionary> highWatermarkToPartitionsMap = [];
///
/// Currently for internal use only - do not use directly.
diff --git a/Sources/Core/Microsoft.StreamProcessing/Ingress/SubscriptionBase.tt b/Sources/Core/Microsoft.StreamProcessing/Ingress/SubscriptionBase.tt
index 24a48ff78..9093db423 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Ingress/SubscriptionBase.tt
+++ b/Sources/Core/Microsoft.StreamProcessing/Ingress/SubscriptionBase.tt
@@ -135,10 +135,6 @@ foreach (bool disordered in new[] { true, false })
///
///
[EditorBrowsable(EditorBrowsableState.Never)]
-<# if (partitioned) { #>
- [System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1008:OpeningParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
- [System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.SpacingRules", "SA1009:ClosingParenthesisMustBeSpacedCorrectly", Justification = "ValueTuples")]
-<# } #>
public abstract class <#= disordered ? "Disordered" : string.Empty #><#= partitionString #>SubscriptionBase<<#= genericArgument #>TIngressStructure, TPayload, TResult> : Pipe<<#= baseType #>, TResult>, IIngressStreamObserver
{
private readonly string errorMessages;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressSubscription.cs b/Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressSubscription.cs
index 3673529e5..a36e05838 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressSubscription.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressSubscription.cs
@@ -380,7 +380,7 @@ private void Process(ref StreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -763,7 +763,7 @@ private void Process(ref StreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -1144,7 +1144,7 @@ private void Process(ref StreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -1435,7 +1435,7 @@ public override void OnNext(StreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -1715,7 +1715,7 @@ public override void OnNext(StreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -1993,7 +1993,7 @@ private void Action(long start, long end, TResult payload, Empty actionKey)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -3722,7 +3722,7 @@ public override void OnNext(PartitionedStreamEvent value)
this.partitionHighWatermarks.Add(value.PartitionKey, this.lowWatermark.rawValue);
if (this.highWatermarkToPartitionsMap.TryGetValue(this.lowWatermark.rawValue, out HashSet keySet)) keySet.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, [value.PartitionKey]);
}
long moveTo = moveFrom;
@@ -3743,7 +3743,7 @@ public override void OnNext(PartitionedStreamEvent value)
else oldSet.Remove(value.PartitionKey);
if (this.highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet set)) set.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(value.SyncTime, [value.PartitionKey]);
if (value.IsData)
{
@@ -3864,7 +3864,7 @@ private void Process(ref PartitionedStreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -4114,7 +4114,7 @@ protected override void UpdatePointers()
if (this.highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet set))
set.Add(kvp.Key);
else
- this.highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet { kvp.Key });
+ this.highWatermarkToPartitionsMap.Add(kvp.Value, [kvp.Key]);
}
}
@@ -4216,7 +4216,7 @@ public override void OnNext(PartitionedStreamEvent value)
this.partitionHighWatermarks.Add(value.PartitionKey, this.lowWatermark.rawValue);
if (this.highWatermarkToPartitionsMap.TryGetValue(this.lowWatermark.rawValue, out HashSet keySet)) keySet.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, [value.PartitionKey]);
}
long moveTo = moveFrom;
@@ -4237,7 +4237,7 @@ public override void OnNext(PartitionedStreamEvent value)
else oldSet.Remove(value.PartitionKey);
if (this.highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet set)) set.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(value.SyncTime, [value.PartitionKey]);
if (value.IsData)
{
@@ -4358,7 +4358,7 @@ private void Process(ref PartitionedStreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -4603,7 +4603,7 @@ protected override void UpdatePointers()
if (this.highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet set))
set.Add(kvp.Key);
else
- this.highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet { kvp.Key });
+ this.highWatermarkToPartitionsMap.Add(kvp.Value, [kvp.Key]);
}
}
@@ -4708,7 +4708,7 @@ private void Action(long start, long end, TResult payload, PartitionKey ac
this.partitionHighWatermarks.Add(value.PartitionKey, this.lowWatermark.rawValue);
if (this.highWatermarkToPartitionsMap.TryGetValue(this.lowWatermark.rawValue, out HashSet keySet)) keySet.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, [value.PartitionKey]);
}
long moveTo = moveFrom;
@@ -4729,7 +4729,7 @@ private void Action(long start, long end, TResult payload, PartitionKey ac
else oldSet.Remove(value.PartitionKey);
if (this.highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet set)) set.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(value.SyncTime, [value.PartitionKey]);
if (value.IsData)
{
@@ -4850,7 +4850,7 @@ private void Process(ref PartitionedStreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -5100,7 +5100,7 @@ protected override void UpdatePointers()
if (this.highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet set))
set.Add(kvp.Key);
else
- this.highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet { kvp.Key });
+ this.highWatermarkToPartitionsMap.Add(kvp.Value, [kvp.Key]);
}
}
@@ -5225,7 +5225,7 @@ public override void OnNext(PartitionedStreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -5542,7 +5542,7 @@ public override void OnNext(PartitionedStreamEvent value)
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -5857,7 +5857,7 @@ private void Action(long start, long end, TResult payload, PartitionKey ac
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
- q = new ElasticCircularBuffer();
+ q = [];
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
@@ -6140,7 +6140,7 @@ public override void OnNext(TPayload inputValue)
this.partitionHighWatermarks.Add(value.PartitionKey, this.lowWatermark.rawValue);
if (this.highWatermarkToPartitionsMap.TryGetValue(this.lowWatermark.rawValue, out HashSet keySet)) keySet.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, [value.PartitionKey]);
}
long moveTo = moveFrom;
@@ -6161,7 +6161,7 @@ public override void OnNext(TPayload inputValue)
else oldSet.Remove(value.PartitionKey);
if (this.highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet set)) set.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(value.SyncTime, [value.PartitionKey]);
moveTo = value.SyncTime - this.reorderLatency;
if (moveTo < StreamEvent.MinSyncTime) moveTo = StreamEvent.MinSyncTime;
@@ -6419,7 +6419,7 @@ protected override void UpdatePointers()
if (this.highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet set))
set.Add(kvp.Key);
else
- this.highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet { kvp.Key });
+ this.highWatermarkToPartitionsMap.Add(kvp.Value, [kvp.Key]);
}
}
@@ -6540,7 +6540,7 @@ public override void OnNext(TPayload inputValue)
this.partitionHighWatermarks.Add(value.PartitionKey, this.lowWatermark.rawValue);
if (this.highWatermarkToPartitionsMap.TryGetValue(this.lowWatermark.rawValue, out HashSet keySet)) keySet.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, [value.PartitionKey]);
}
long moveTo = moveFrom;
@@ -6561,7 +6561,7 @@ public override void OnNext(TPayload inputValue)
else oldSet.Remove(value.PartitionKey);
if (this.highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet set)) set.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(value.SyncTime, [value.PartitionKey]);
moveTo = value.SyncTime - this.reorderLatency;
if (moveTo < StreamEvent.MinSyncTime) moveTo = StreamEvent.MinSyncTime;
@@ -6814,7 +6814,7 @@ protected override void UpdatePointers()
if (this.highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet set))
set.Add(kvp.Key);
else
- this.highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet { kvp.Key });
+ this.highWatermarkToPartitionsMap.Add(kvp.Value, [kvp.Key]);
}
}
@@ -6937,7 +6937,7 @@ private void Action(long start, long end, TResult payload, PartitionKey ac
this.partitionHighWatermarks.Add(value.PartitionKey, this.lowWatermark.rawValue);
if (this.highWatermarkToPartitionsMap.TryGetValue(this.lowWatermark.rawValue, out HashSet keySet)) keySet.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(this.lowWatermark.rawValue, [value.PartitionKey]);
}
long moveTo = moveFrom;
@@ -6958,7 +6958,7 @@ private void Action(long start, long end, TResult payload, PartitionKey ac
else oldSet.Remove(value.PartitionKey);
if (this.highWatermarkToPartitionsMap.TryGetValue(value.SyncTime, out HashSet set)) set.Add(value.PartitionKey);
- else this.highWatermarkToPartitionsMap.Add(value.SyncTime, new HashSet { value.PartitionKey });
+ else this.highWatermarkToPartitionsMap.Add(value.SyncTime, [value.PartitionKey]);
moveTo = value.SyncTime - this.reorderLatency;
if (moveTo < StreamEvent.MinSyncTime) moveTo = StreamEvent.MinSyncTime;
@@ -7216,7 +7216,7 @@ protected override void UpdatePointers()
if (this.highWatermarkToPartitionsMap.TryGetValue(kvp.Value, out HashSet set))
set.Add(kvp.Key);
else
- this.highWatermarkToPartitionsMap.Add(kvp.Value, new HashSet { kvp.Key });
+ this.highWatermarkToPartitionsMap.Add(kvp.Value, [kvp.Key]);
}
}
diff --git a/Sources/Core/Microsoft.StreamProcessing/Microsoft.StreamProcessing.csproj b/Sources/Core/Microsoft.StreamProcessing/Microsoft.StreamProcessing.csproj
index b96b07891..bdcd96f83 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Microsoft.StreamProcessing.csproj
+++ b/Sources/Core/Microsoft.StreamProcessing/Microsoft.StreamProcessing.csproj
@@ -1,7 +1,7 @@
- netstandard2.0
+ net10.0
x64;AnyCPU
@@ -10,15 +10,28 @@
Microsoft.StreamProcessing
+
+ True
+ 1701;1702;0009
+
+
+
+ True
+ 1701;1702;0009
+
+
+
+ True
+ 1701;1702;0009
+
+
+
+ True
+ 1701;1702;0009
+
+
-
-
-
-
-
-
-
-
+
diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaDescriptor.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaDescriptor.cs
index b701e11b1..ed815cf22 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaDescriptor.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaDescriptor.cs
@@ -71,12 +71,12 @@ public Afa(TRegister defaultRegister = default, TAccumulator defaultAccumulator
///
/// The set of final states in the AFA.
///
- internal List finalStates = new List();
+ internal List finalStates = [];
///
/// The arcs present in the AFA.
///
- internal Dictionary>> transitionInfo = new Dictionary>>();
+ internal Dictionary>> transitionInfo = [];
///
/// Start state of the AFA.
@@ -180,7 +180,7 @@ internal void AddArc(int fromState, int toState, Arc arc)
if (!this.transitionInfo.ContainsKey(fromState))
{
- this.transitionInfo.Add(fromState, new Dictionary>());
+ this.transitionInfo.Add(fromState, []);
}
if (!this.transitionInfo[fromState].ContainsKey(toState))
diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaMultiEventListTransformer.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaMultiEventListTransformer.cs
index 1d2030c91..4da11f940 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaMultiEventListTransformer.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaMultiEventListTransformer.cs
@@ -13,8 +13,8 @@ namespace Microsoft.StreamProcessing
internal partial class AfaMultiEventListTemplate : AfaTemplate
{
private Func keyEqualityComparer;
- protected readonly List>> edgeInfos = new List>>();
- protected readonly List>> startEdgeInfos = new List>>();
+ protected readonly List>> edgeInfos = [];
+ protected readonly List>> startEdgeInfos = [];
private bool payloadIsAnon;
private bool payloadHasNoFields;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaTransformer.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaTransformer.cs
index 36acd5d6a..b446e8efa 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaTransformer.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaTransformer.cs
@@ -17,8 +17,8 @@ internal partial class AfaTemplate
protected Type accumulatorType;
protected bool hasRegister;
protected bool isSyncTimeSimultaneityFree;
- protected readonly List>> currentlyActiveInfo = new List>>();
- protected readonly List>> newActivationInfo = new List>>();
+ protected readonly List>> currentlyActiveInfo = [];
+ protected readonly List>> newActivationInfo = [];
protected string TKey;
protected string TPayload;
protected string TRegister;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_EventList.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_EventList.cs
index 27c6af00c..8266ff761 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_EventList.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_EventList.cs
@@ -36,7 +36,7 @@ public CompiledUngroupedAfaPipe_EventList(Streamable stream, I
this.activeStates = new FastLinkedList>();
this.activeStatesTraverser = new FastLinkedList>.ListTraverser(this.activeStates);
- this.currentList = new List();
+ this.currentList = [];
this.lastSyncTime = -1;
}
@@ -320,7 +320,7 @@ public override unsafe void OnNext(StreamMessage batch)
{
ProcessCurrentTimestamp();
this.lastSyncTime = synctime;
- this.currentList = new List();
+ this.currentList = [];
}
this.currentList.Add(batch.payload.col[i]);
diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_MultiEventList.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_MultiEventList.cs
index 1cff60c47..3fe0cafc6 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_MultiEventList.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_MultiEventList.cs
@@ -17,7 +17,7 @@ internal sealed class CompiledUngroupedAfaPipe_MultiEventList> activeStates;
[DataMember]
- private List currentTimestampEventList = new List();
+ private List currentTimestampEventList = [];
[DataMember]
private long lastSyncTime;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/GroupedAfaMultiEventTransformer.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/GroupedAfaMultiEventTransformer.cs
index d7c440689..cdd17bc54 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/GroupedAfaMultiEventTransformer.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/GroupedAfaMultiEventTransformer.cs
@@ -13,8 +13,8 @@ namespace Microsoft.StreamProcessing
internal partial class GroupedAfaMultiEventTemplate : AfaTemplate
{
private Func keyEqualityComparer;
- protected readonly List>> edgeInfos = new List>>();
- protected readonly List>> startEdgeInfos = new List>>();
+ protected readonly List>> edgeInfos = [];
+ protected readonly List>> startEdgeInfos = [];
private GroupedAfaMultiEventTemplate(string className, Type keyType, Type payloadType, Type registerType, Type accumulatorType)
: base(className, keyType, payloadType, registerType, accumulatorType)
diff --git a/Sources/Core/Microsoft.StreamProcessing/Operators/Clip/PartitionedClipJoinPipe.cs b/Sources/Core/Microsoft.StreamProcessing/Operators/Clip/PartitionedClipJoinPipe.cs
index 8a551008a..fa32a8623 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Operators/Clip/PartitionedClipJoinPipe.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Operators/Clip/PartitionedClipJoinPipe.cs
@@ -30,11 +30,11 @@ internal sealed class PartitionedClipJoinPipe> rightQueue = new FastDictionary2>();
[DataMember]
- private HashSet processQueue = new HashSet