Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 21 additions & 6 deletions EnterpriseIntegrationPlatform/rules/milestones.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,34 @@

## Completed Phases

✅ Phases 1–15 complete — see `rules/completion-log.md` for full history.
✅ Phases 1–18 complete — see `rules/completion-log.md` for full history.

**Current stats:** 1,472 UnitTests + 58 Contract + 29 Workflow + 17 Integration + 10 Load + 19 Vitest = **1,605 total tests**. 48 src projects.

## Next Chunk
---

All phases complete (including Phase 15 Tutorial Fixes Round 2). See `rules/completion-log.md` for full history.
### Phase 19 — Tutorial Audit as New Developer (Round 6)

---
✅ Phase 19 complete.

**Scope:** Approached the repo as a brand-new developer with no prior context. Read each tutorial, found every code snippet, and verified signatures, `required` keywords, default values, interface inheritance, and method completeness against actual source.

### Phase 15 — Tutorial Fixes Round 2
**Findings:** 8 tutorials had issues; 42 passed clean.

| Tutorial | Issue | Fix Applied |
|----------|-------|-------------|
| 03 — First Message | `IntegrationEnvelope<T>` missing `required` keyword on 6 properties (`MessageId`, `CorrelationId`, `Timestamp`, `Source`, `MessageType`, `Payload`); `Priority` and `Metadata` missing default values; property order didn't match source | Rewrote record with `required` keywords, defaults, and correct property order |
| 05 — Message Brokers | `IMessageBrokerConsumer` missing `: IAsyncDisposable` inheritance | Added `: IAsyncDisposable` |
| 06 — Messaging Channels | `IInvalidMessageChannel` missing `RouteRawInvalidAsync` method for handling unparseable raw data | Added method + explanatory note |
| 08 — Activities & Pipeline | `PipelineOrchestrator.ProcessAsync` shown as generic `<T>` but actual source uses `IntegrationEnvelope<JsonElement>` — class also not `sealed` | Fixed to `JsonElement`, added `sealed`, corrected param name |
| 32 — Multi-Tenancy | `ITenantOnboardingService` missing `GetStatusAsync` method | Added `GetStatusAsync` with nullable return |
| 42 — Configuration | `ConfigurationChangeNotifier` missing `: IDisposable` inheritance | Added `: IDisposable` |
| 48 — Notification Use Cases | `NotificationFeatureFlags.Enabled` constant doesn't exist — actual is `NotificationsEnabled`; `NotificationDecisionService` class presented as real code but doesn't exist in source | Fixed constant name; added pseudocode disclaimer comment |
| 49 — Testing Integrations | Test example used non-existent `NotificationDecisionService` class | Replaced with actual `XmlNotificationMapperTests` from source; updated exercise |

## Next Chunk

✅ Phase 15 complete — see completion-log.md
All phases complete (1–19). See `rules/completion-log.md` for full history.

---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace EnterpriseIntegrationPlatform.Processing.Transform;

/// <summary>
/// Enterprise Integration Pattern — Normalizer.
/// Detects the format of an incoming message (JSON, XML, CSV, flat-file) and converts
/// Detects the format of an incoming message (JSON, XML, CSV) and converts
/// it to the canonical JSON representation used throughout the platform.
/// </summary>
/// <remarks>
Expand Down
18 changes: 9 additions & 9 deletions EnterpriseIntegrationPlatform/tutorials/03-first-message.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@ Every message in the platform is wrapped in an `IntegrationEnvelope<T>`. This is

public record IntegrationEnvelope<T>
{
public Guid MessageId { get; init; }
public Guid CorrelationId { get; init; }
public required Guid MessageId { get; init; }
public required Guid CorrelationId { get; init; }
public Guid? CausationId { get; init; }
public DateTimeOffset Timestamp { get; init; }
public string Source { get; init; }
public string MessageType { get; init; }
public required DateTimeOffset Timestamp { get; init; }
public required string Source { get; init; }
public required string MessageType { get; init; }
public string SchemaVersion { get; init; } = "1.0";
public T Payload { get; init; }
public MessagePriority Priority { get; init; }
public MessagePriority Priority { get; init; } = MessagePriority.Normal;
public required T Payload { get; init; }
public Dictionary<string, string> Metadata { get; init; } = new();
public string? ReplyTo { get; init; }
public DateTimeOffset? ExpiresAt { get; init; }
public int? SequenceNumber { get; init; }
public int? TotalCount { get; init; }
public MessageIntent? Intent { get; init; }
public Dictionary<string, string> Metadata { get; init; }
}
```

Expand Down Expand Up @@ -152,7 +152,7 @@ The consumer side uses `IMessageBrokerConsumer`:
```csharp
// Location: src/Ingestion/IMessageBrokerConsumer.cs

public interface IMessageBrokerConsumer
public interface IMessageBrokerConsumer : IAsyncDisposable
{
Task SubscribeAsync<T>(
string topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public interface IMessageBrokerProducer
}

// src/Ingestion/IMessageBrokerConsumer.cs
public interface IMessageBrokerConsumer
public interface IMessageBrokerConsumer : IAsyncDisposable
{
Task SubscribeAsync<T>(
string topic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,17 @@ public interface IInvalidMessageChannel
IntegrationEnvelope<T> envelope,
string reason,
CancellationToken cancellationToken = default);

Task RouteRawInvalidAsync(
string rawData,
string sourceTopic,
string reason,
CancellationToken cancellationToken = default);
}
```

> `RouteInvalidAsync` handles messages that were parsed into an envelope but failed validation. `RouteRawInvalidAsync` handles raw data that could not be deserialized into an envelope at all.

### How It Works

```
Expand Down
175 changes: 112 additions & 63 deletions EnterpriseIntegrationPlatform/tutorials/07-temporal-workflows.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,60 @@ public class IntegrationPipelineWorkflow
{
// Step 1: Persist the message (status: Pending)
await Workflow.ExecuteActivityAsync(
(IntegrationActivities a) => a.PersistMessageAsync(input),
ActivityOptions);
(PipelineActivities act) => act.PersistMessageAsync(input),
PipelineActivityOptions);

// Step 2: Validate the message
var validationResult = await Workflow.ExecuteActivityAsync(
(IntegrationActivities a) => a.ValidateMessageAsync(input),
ActivityOptions);
// Step 2: Log Received lifecycle event
await Workflow.ExecuteActivityAsync(
(PipelineActivities act) =>
act.LogStageAsync(input.MessageId, input.MessageType, "Received"),
PipelineActivityOptions);

// Step 3: Validate the message
var validation = await Workflow.ExecuteActivityAsync(
(IntegrationActivities act) =>
act.ValidateMessageAsync(input.MessageType, input.PayloadJson),
ValidationActivityOptions);

if (!validationResult.IsValid)
if (!validation.IsValid)
{
// Publish Nack and route to DLQ
// Publish Nack and update status to Failed
await Workflow.ExecuteActivityAsync(
(IntegrationActivities a) => a.PublishNackAsync(input, validationResult),
ActivityOptions);
return new IntegrationPipelineResult(input.MessageId, false, string.Join("; ", validationResult.Errors));
}
(PipelineActivities act) =>
act.UpdateDeliveryStatusAsync(
input.MessageId, input.CorrelationId,
input.Timestamp, "Failed"),
PipelineActivityOptions);

// Step 3: Update status to InFlight
await Workflow.ExecuteActivityAsync(
(IntegrationActivities a) => a.UpdateStatusAsync(input, DeliveryStatus.InFlight),
ActivityOptions);
if (input.NotificationsEnabled)
{
await Workflow.ExecuteActivityAsync(
(PipelineActivities act) =>
act.PublishNackAsync(
input.MessageId, input.CorrelationId,
validation.Reason ?? "Validation failed", input.NackSubject),
PipelineActivityOptions);
}

return new IntegrationPipelineResult(input.MessageId, false, validation.Reason);
}

// Step 4: Publish Ack
// Step 4: Update status to Delivered
await Workflow.ExecuteActivityAsync(
(IntegrationActivities a) => a.PublishAckAsync(input),
ActivityOptions);
(PipelineActivities act) =>
act.UpdateDeliveryStatusAsync(
input.MessageId, input.CorrelationId,
input.Timestamp, "Delivered"),
PipelineActivityOptions);

// Step 5: Publish Ack
if (input.NotificationsEnabled)
{
await Workflow.ExecuteActivityAsync(
(PipelineActivities act) =>
act.PublishAckAsync(input.MessageId, input.CorrelationId, input.AckSubject),
PipelineActivityOptions);
}

return new IntegrationPipelineResult(input.MessageId, true);
}
Expand Down Expand Up @@ -119,49 +147,53 @@ Publish Nack with compensation details
```

```csharp
// Simplified saga compensation flow
// src/Workflow.Temporal/Workflows/AtomicPipelineWorkflow.cs (simplified)
[Workflow]
public class AtomicPipelineWorkflow
{
[WorkflowRun]
public async Task<IntegrationPipelineResult> RunAsync(
public async Task<AtomicPipelineResult> RunAsync(
IntegrationPipelineInput input)
{
var completedSteps = new Stack<string>();
var completedSteps = new List<string>();

try
{
// Execute steps, tracking each completed one
await ExecuteStep("persist", input);
completedSteps.Push("persist");

await ExecuteStep("validate", input);
completedSteps.Push("validate");

await ExecuteStep("transform", input);
completedSteps.Push("transform");
// Step 1: Persist message as Pending
await Workflow.ExecuteActivityAsync(
(PipelineActivities act) => act.PersistMessageAsync(input),
PipelineActivityOptions);
completedSteps.Add("PersistMessage");

await ExecuteStep("deliver", input);
completedSteps.Push("deliver");
// Step 2: Validate message
var validation = await Workflow.ExecuteActivityAsync(
(IntegrationActivities act) =>
act.ValidateMessageAsync(input.MessageType, input.PayloadJson),
ValidationActivityOptions);

await PublishAck(input);
return new IntegrationPipelineResult(input.MessageId, true);
}
catch (Exception ex)
if (!validation.IsValid)
{
// Compensate in reverse order
while (completedSteps.Count > 0)
// Compensate all previously completed steps in reverse order
foreach (var step in Enumerable.Reverse(completedSteps))
{
var step = completedSteps.Pop();
await Workflow.ExecuteActivityAsync(
(SagaCompensationActivities a) =>
a.CompensateAsync(step, input),
CompensationOptions);
(SagaCompensationActivities act) =>
act.CompensateStepAsync(input.CorrelationId, step),
CompensationActivityOptions);
}

await PublishNack(input, ex);
return new IntegrationPipelineResult(input.MessageId, false, ex.Message);
// Save fault and publish Nack
return new AtomicPipelineResult(
input.MessageId, false, validation.Reason);
}

// Step 3: Update status to Delivered and Publish Ack
await Workflow.ExecuteActivityAsync(
(PipelineActivities act) =>
act.UpdateDeliveryStatusAsync(
input.MessageId, input.CorrelationId,
input.Timestamp, "Delivered"),
PipelineActivityOptions);

return new AtomicPipelineResult(input.MessageId, true);
}
}
```
Expand All @@ -174,43 +206,60 @@ Activities are the building blocks that workflows orchestrate. Each activity is

```csharp
// src/Workflow.Temporal/Activities/IntegrationActivities.cs (simplified)
// Handles validation and processing-stage logging

[Activity]
public class IntegrationActivities
{
[ActivityMethod]
public async Task PersistMessageAsync(IntegrationPipelineInput input)
[Activity]
public async Task<MessageValidationResult> ValidateMessageAsync(
string messageType, string payloadJson)
{
// Save message to Cassandra with status: Pending
await _persistence.SaveMessageAsync(input.Envelope, DeliveryStatus.Pending);
// Validate the message against schema and business rules
return await _validation.ValidateAsync(messageType, payloadJson);
}

[ActivityMethod]
public async Task<ValidationResult> ValidateMessageAsync(
IntegrationPipelineInput input)
[Activity]
public async Task LogProcessingStageAsync(
Guid messageId, string messageType, string stage)
{
// Validate the message against schema and business rules
return await _validation.ValidateAsync(input.Envelope);
// Record a lifecycle stage for observability
}
}

// src/Workflow.Temporal/Activities/PipelineActivities.cs (simplified)
// Handles persistence, delivery status, acknowledgments, and faults

[Activity]
public class PipelineActivities
{
[Activity]
public async Task PersistMessageAsync(IntegrationPipelineInput input)
{
// Save message to Cassandra with status: Pending
await _persistence.SaveMessageAsync(input);
}

[ActivityMethod]
public async Task PublishAckAsync(IntegrationPipelineInput input)
[Activity]
public async Task PublishAckAsync(
Guid messageId, Guid correlationId, string topic)
{
// Publish acknowledgment to Ack topic
await _notification.PublishAckAsync(input.Envelope);
await _notification.PublishAckAsync(messageId, correlationId, topic);
}

[ActivityMethod]
[Activity]
public async Task PublishNackAsync(
IntegrationPipelineInput input,
ValidationResult result)
Guid messageId, Guid correlationId, string reason, string topic)
{
// Publish negative acknowledgment to Nack topic
await _notification.PublishNackAsync(input.Envelope, result.Errors);
await _notification.PublishNackAsync(messageId, correlationId, reason, topic);
}
}
```

> **Note:** Activities are split across two classes: `IntegrationActivities` (validation and logging) and `PipelineActivities` (persistence and notifications). A third class, `SagaCompensationActivities`, handles rollback (see Tutorial 47).

### Activity Design Principles

1. **Stateless** — Activities don't hold state between executions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ The orchestrator wraps the message in pipeline input and dispatches to Temporal:

```csharp
// Simplified from src/Demo.Pipeline/PipelineOrchestrator.cs
public class PipelineOrchestrator : IPipelineOrchestrator
public sealed class PipelineOrchestrator : IPipelineOrchestrator
{
public async Task ProcessAsync<T>(
IntegrationEnvelope<T> envelope,
CancellationToken ct)
public async Task ProcessAsync(
IntegrationEnvelope<JsonElement> envelope,
CancellationToken cancellationToken = default)
{
var input = new IntegrationPipelineInput
{
Expand All @@ -197,7 +197,7 @@ public class PipelineOrchestrator : IPipelineOrchestrator
// ... map from envelope to workflow input
};

await _dispatcher.DispatchAsync(input, ct);
await _dispatcher.DispatchAsync(input, cancellationToken);
}
}
```
Expand Down
Loading
Loading