Skip to content

Commit d7005fd

Browse files
committed
pr feedback
1 parent 7ae4f80 commit d7005fd

9 files changed

Lines changed: 47 additions & 17 deletions

File tree

src/WorkflowCore.DSL/Services/DefinitionLoader.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ private WorkflowStepCollection ConvertSteps(ICollection<StepSourceV1> source, Ty
153153
}
154154
catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException)
155155
{
156-
throw new WorkflowDefinitionLoadException($"Error parsing cancel condition expression '{nextStep.CancelCondition}' for step '{nextStep.Id}': {ex.Message}");
156+
throw new WorkflowDefinitionLoadException($"Error parsing cancel condition expression '{nextStep.CancelCondition}' for step '{nextStep.Id}': {ex.Message}", ex);
157157
}
158158
}
159159

@@ -283,7 +283,7 @@ private void AttachOutputs(StepSourceV1 source, Type dataType, Type stepType, Wo
283283
}
284284
catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException)
285285
{
286-
throw new WorkflowDefinitionLoadException($"Error parsing output expression '{output.Value}': {ex.Message}");
286+
throw new WorkflowDefinitionLoadException($"Error parsing output expression '{output.Value}': {ex.Message}", ex);
287287
}
288288

289289
var dataParameter = Expression.Parameter(dataType, "data");
@@ -442,7 +442,7 @@ private void AttachOutcomes(StepSourceV1 source, Type dataType, WorkflowStep ste
442442
}
443443
catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException)
444444
{
445-
throw new WorkflowDefinitionLoadException($"Error parsing select next step expression '{nextStep.Value}': {ex.Message}");
445+
throw new WorkflowDefinitionLoadException($"Error parsing select next step expression '{nextStep.Value}': {ex.Message}", ex);
446446
}
447447
Expression<Func<object, object, bool>> sourceExpr = (data, outcome) => System.Convert.ToBoolean(sourceDelegate.DynamicInvoke(data, outcome));
448448
step.Outcomes.Add(new ExpressionOutcome<object>(sourceExpr)
@@ -467,7 +467,7 @@ private static Action<IStepBody, object, IStepExecutionContext> BuildScalarInput
467467
}
468468
catch (Exception ex) when (ex is System.Linq.Dynamic.Core.Exceptions.ParseException || ex is InvalidOperationException)
469469
{
470-
throw new WorkflowDefinitionLoadException($"Error parsing input expression '{expr}' for property '{input.Key}': {ex.Message}");
470+
throw new WorkflowDefinitionLoadException($"Error parsing input expression '{expr}' for property '{input.Key}': {ex.Message}", ex);
471471
}
472472

473473
void acn(IStepBody pStep, object pData, IStepExecutionContext pContext)

src/WorkflowCore/Exceptions/WorkflowDefinitionLoadException.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,10 @@ public WorkflowDefinitionLoadException(string message)
88
: base (message)
99
{
1010
}
11+
12+
public WorkflowDefinitionLoadException(string message, Exception innerException)
13+
: base(message, innerException)
14+
{
15+
}
1116
}
1217
}

src/WorkflowCore/Services/ActivityController.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public async Task<PendingActivity> GetPendingActivity(string activityName, strin
5555
Token = token.Encode(),
5656
ActivityName = subscription.EventKey,
5757
Parameters = subscription.SubscriptionData,
58-
TokenExpiry = DateTime.MaxValue
58+
TokenExpiry = DateTime.SpecifyKind(DateTime.MaxValue, DateTimeKind.Utc)
5959
};
6060

6161
if (!await _subscriptionRepository.SetSubscriptionToken(subscription.Id, result.Token, workerId, result.TokenExpiry))

src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token,
178178
lock (_subscriptions)
179179
{
180180
var sub = _subscriptions.SingleOrDefault(x => x.Id == eventSubscriptionId);
181+
if (sub == null)
182+
return Task.FromResult(false);
181183
sub.ExternalToken = token;
182184
sub.ExternalWorkerId = workerId;
183185
sub.ExternalTokenExpiry = expiry;
@@ -191,6 +193,8 @@ public Task ClearSubscriptionToken(string eventSubscriptionId, string token, Can
191193
lock (_subscriptions)
192194
{
193195
var sub = _subscriptions.SingleOrDefault(x => x.Id == eventSubscriptionId);
196+
if (sub == null)
197+
throw new InvalidOperationException($"Subscription {eventSubscriptionId} not found.");
194198
if (sub.ExternalToken != token)
195199
throw new InvalidOperationException();
196200
sub.ExternalToken = null;

src/WorkflowCore/Services/DefaultProviders/SingleNodeEventHub.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ public SingleNodeEventHub(ILoggerFactory loggerFactory)
1818
_logger = loggerFactory.CreateLogger<SingleNodeEventHub>();
1919
}
2020

21-
public async Task PublishNotification(LifeCycleEvent evt)
21+
public Task PublishNotification(LifeCycleEvent evt)
2222
{
23-
await Task.Run(() =>
23+
Task.Run(() =>
2424
{
2525
foreach (var subscriber in _subscribers.ToArray())
2626
{
@@ -34,6 +34,7 @@ await Task.Run(() =>
3434
}
3535
}
3636
});
37+
return Task.CompletedTask;
3738
}
3839

3940
public void Subscribe(Action<LifeCycleEvent> action)

src/extensions/WorkflowCore.AI.AzureFoundry/Services/InMemoryConversationStore.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,22 @@ public Task<ConversationThread> GetOrCreateThreadAsync(string workflowInstanceId
3838
return thread.Id;
3939
});
4040

41-
return Task.FromResult(_threads[threadId]);
41+
if (_threads.TryGetValue(threadId, out var existingThread))
42+
{
43+
return Task.FromResult(existingThread);
44+
}
45+
46+
// The thread was removed or the mapping is stale; recreate and update the mapping.
47+
var newThread = new ConversationThread
48+
{
49+
WorkflowInstanceId = workflowInstanceId,
50+
ExecutionPointerId = executionPointerId
51+
};
52+
53+
_threads[newThread.Id] = newThread;
54+
_workflowThreadMap[key] = newThread.Id;
55+
56+
return Task.FromResult(newThread);
4257
}
4358

4459
public Task SaveThreadAsync(ConversationThread thread)

src/extensions/WorkflowCore.WebAPI/Controllers/WorkflowsController.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public async Task<IActionResult> Post(string id, int? version, string reference,
5656
string workflowId = null;
5757
var def = _registry.GetDefinition(id, version);
5858
if (def == null)
59-
return NotFound(String.Format("Workflow defintion of {0} not found", id));
59+
return NotFound(String.Format("Workflow definition of {0} not found", id));
6060
if ((data != null) && (def.DataType != null))
6161
{
6262
var dataStr = JsonConvert.SerializeObject(data);

src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public async Task PersistWorkflow(WorkflowInstance workflow, CancellationToken c
145145
.ThenInclude(ep => ep.ExtensionAttributes)
146146
.Include(wf => wf.ExecutionPointers)
147147
.AsTracking()
148-
.FirstOrDefaultAsync(cancellationToken);
148+
.FirstAsync(cancellationToken);
149149

150150
var persistable = workflow.ToPersistable(existingEntity);
151151
await db.SaveChangesAsync(cancellationToken);
@@ -163,7 +163,7 @@ public async Task PersistWorkflow(WorkflowInstance workflow, List<EventSubscript
163163
.ThenInclude(ep => ep.ExtensionAttributes)
164164
.Include(wf => wf.ExecutionPointers)
165165
.AsTracking()
166-
.FirstOrDefaultAsync(cancellationToken);
166+
.FirstAsync(cancellationToken);
167167

168168
var workflowPersistable = workflow.ToPersistable(existingEntity);
169169

@@ -183,7 +183,7 @@ public async Task TerminateSubscription(string eventSubscriptionId, Cancellation
183183
using (var db = ConstructDbContext())
184184
{
185185
var uid = new Guid(eventSubscriptionId);
186-
var existing = await db.Set<PersistedSubscription>().FirstOrDefaultAsync(x => x.SubscriptionId == uid, cancellationToken);
186+
var existing = await db.Set<PersistedSubscription>().FirstAsync(x => x.SubscriptionId == uid, cancellationToken);
187187
db.Set<PersistedSubscription>().Remove(existing);
188188
await db.SaveChangesAsync(cancellationToken);
189189
}
@@ -273,6 +273,9 @@ public async Task MarkEventProcessed(string id, CancellationToken cancellationTo
273273
.AsTracking()
274274
.FirstOrDefaultAsync(cancellationToken);
275275

276+
if (existingEntity == null)
277+
return;
278+
276279
existingEntity.IsProcessed = true;
277280
await db.SaveChangesAsync(cancellationToken);
278281
}
@@ -307,6 +310,9 @@ public async Task MarkEventUnprocessed(string id, CancellationToken cancellation
307310
.AsTracking()
308311
.FirstOrDefaultAsync(cancellationToken);
309312

313+
if (existingEntity == null)
314+
return;
315+
310316
existingEntity.IsProcessed = false;
311317
await db.SaveChangesAsync(cancellationToken);
312318
}
@@ -363,7 +369,7 @@ public async Task<bool> SetSubscriptionToken(string eventSubscriptionId, string
363369
var existingEntity = await db.Set<PersistedSubscription>()
364370
.Where(x => x.SubscriptionId == uid)
365371
.AsTracking()
366-
.FirstOrDefaultAsync(cancellationToken);
372+
.FirstAsync(cancellationToken);
367373

368374
existingEntity.ExternalToken = token;
369375
existingEntity.ExternalWorkerId = workerId;
@@ -382,7 +388,7 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke
382388
var existingEntity = await db.Set<PersistedSubscription>()
383389
.Where(x => x.SubscriptionId == uid)
384390
.AsTracking()
385-
.FirstOrDefaultAsync(cancellationToken);
391+
.FirstAsync(cancellationToken);
386392

387393
if (existingEntity.ExternalToken != token)
388394
throw new InvalidOperationException();

src/providers/WorkflowCore.QueueProviders.SqlServer/Services/SqlServerQueueProvider.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,8 @@ private static string SanitizeIdentifier(string name)
128128
{
129129
if (string.IsNullOrWhiteSpace(name))
130130
throw new ArgumentException("Queue name cannot be null or empty.", nameof(name));
131-
if (!System.Text.RegularExpressions.Regex.IsMatch(name, @"^[a-zA-Z_][a-zA-Z0-9_/]*$"))
132-
throw new ArgumentException($"Queue name '{name}' contains invalid characters.", nameof(name));
133-
return name;
131+
// Escape any ']' characters to prevent breaking out of the delimited identifier
132+
return name.Replace("]", "]]");
134133
}
135134
}
136135
}

0 commit comments

Comments
 (0)