diff --git a/EnterpriseIntegrationPlatform/rules/milestones.md b/EnterpriseIntegrationPlatform/rules/milestones.md index 61f19e5..b8f580d 100644 --- a/EnterpriseIntegrationPlatform/rules/milestones.md +++ b/EnterpriseIntegrationPlatform/rules/milestones.md @@ -22,19 +22,34 @@ ## Completed Phases -✅ Phases 1–15 complete — see `rules/completion-log.md` for full history. +✅ Phases 1–18 complete — see `rules/completion-log.md` for full history. **Current stats:** 1,472 UnitTests + 58 Contract + 29 Workflow + 17 Integration + 10 Load + 19 Vitest = **1,605 total tests**. 48 src projects. -## Next Chunk +--- -All phases complete (including Phase 15 Tutorial Fixes Round 2). See `rules/completion-log.md` for full history. +### Phase 19 — Tutorial Audit as New Developer (Round 6) ---- +✅ Phase 19 complete. + +**Scope:** Approached the repo as a brand-new developer with no prior context. Read each tutorial, found every code snippet, and verified signatures, `required` keywords, default values, interface inheritance, and method completeness against actual source. -### Phase 15 — Tutorial Fixes Round 2 +**Findings:** 8 tutorials had issues; 42 passed clean. + +| Tutorial | Issue | Fix Applied | +|----------|-------|-------------| +| 03 — First Message | `IntegrationEnvelope` missing `required` keyword on 6 properties (`MessageId`, `CorrelationId`, `Timestamp`, `Source`, `MessageType`, `Payload`); `Priority` and `Metadata` missing default values; property order didn't match source | Rewrote record with `required` keywords, defaults, and correct property order | +| 05 — Message Brokers | `IMessageBrokerConsumer` missing `: IAsyncDisposable` inheritance | Added `: IAsyncDisposable` | +| 06 — Messaging Channels | `IInvalidMessageChannel` missing `RouteRawInvalidAsync` method for handling unparseable raw data | Added method + explanatory note | +| 08 — Activities & Pipeline | `PipelineOrchestrator.ProcessAsync` shown as generic `` but actual source uses `IntegrationEnvelope` — class also not `sealed` | Fixed to `JsonElement`, added `sealed`, corrected param name | +| 32 — Multi-Tenancy | `ITenantOnboardingService` missing `GetStatusAsync` method | Added `GetStatusAsync` with nullable return | +| 42 — Configuration | `ConfigurationChangeNotifier` missing `: IDisposable` inheritance | Added `: IDisposable` | +| 48 — Notification Use Cases | `NotificationFeatureFlags.Enabled` constant doesn't exist — actual is `NotificationsEnabled`; `NotificationDecisionService` class presented as real code but doesn't exist in source | Fixed constant name; added pseudocode disclaimer comment | +| 49 — Testing Integrations | Test example used non-existent `NotificationDecisionService` class | Replaced with actual `XmlNotificationMapperTests` from source; updated exercise | + +## Next Chunk -✅ Phase 15 complete — see completion-log.md +All phases complete (1–19). See `rules/completion-log.md` for full history. --- diff --git a/EnterpriseIntegrationPlatform/src/Processing.Transform/INormalizer.cs b/EnterpriseIntegrationPlatform/src/Processing.Transform/INormalizer.cs index 1694fbe..7b64654 100644 --- a/EnterpriseIntegrationPlatform/src/Processing.Transform/INormalizer.cs +++ b/EnterpriseIntegrationPlatform/src/Processing.Transform/INormalizer.cs @@ -2,7 +2,7 @@ namespace EnterpriseIntegrationPlatform.Processing.Transform; /// /// Enterprise Integration Pattern — Normalizer. -/// Detects the format of an incoming message (JSON, XML, CSV, flat-file) and converts +/// Detects the format of an incoming message (JSON, XML, CSV) and converts /// it to the canonical JSON representation used throughout the platform. /// /// diff --git a/EnterpriseIntegrationPlatform/tutorials/03-first-message.md b/EnterpriseIntegrationPlatform/tutorials/03-first-message.md index 2b37d83..e8f6d85 100644 --- a/EnterpriseIntegrationPlatform/tutorials/03-first-message.md +++ b/EnterpriseIntegrationPlatform/tutorials/03-first-message.md @@ -18,21 +18,21 @@ Every message in the platform is wrapped in an `IntegrationEnvelope`. This is public record IntegrationEnvelope { - public Guid MessageId { get; init; } - public Guid CorrelationId { get; init; } + public required Guid MessageId { get; init; } + public required Guid CorrelationId { get; init; } public Guid? CausationId { get; init; } - public DateTimeOffset Timestamp { get; init; } - public string Source { get; init; } - public string MessageType { get; init; } + public required DateTimeOffset Timestamp { get; init; } + public required string Source { get; init; } + public required string MessageType { get; init; } public string SchemaVersion { get; init; } = "1.0"; - public T Payload { get; init; } - public MessagePriority Priority { get; init; } + public MessagePriority Priority { get; init; } = MessagePriority.Normal; + public required T Payload { get; init; } + public Dictionary Metadata { get; init; } = new(); public string? ReplyTo { get; init; } public DateTimeOffset? ExpiresAt { get; init; } public int? SequenceNumber { get; init; } public int? TotalCount { get; init; } public MessageIntent? Intent { get; init; } - public Dictionary Metadata { get; init; } } ``` @@ -152,7 +152,7 @@ The consumer side uses `IMessageBrokerConsumer`: ```csharp // Location: src/Ingestion/IMessageBrokerConsumer.cs -public interface IMessageBrokerConsumer +public interface IMessageBrokerConsumer : IAsyncDisposable { Task SubscribeAsync( string topic, diff --git a/EnterpriseIntegrationPlatform/tutorials/05-message-brokers.md b/EnterpriseIntegrationPlatform/tutorials/05-message-brokers.md index 922c149..f5af7ea 100644 --- a/EnterpriseIntegrationPlatform/tutorials/05-message-brokers.md +++ b/EnterpriseIntegrationPlatform/tutorials/05-message-brokers.md @@ -105,7 +105,7 @@ public interface IMessageBrokerProducer } // src/Ingestion/IMessageBrokerConsumer.cs -public interface IMessageBrokerConsumer +public interface IMessageBrokerConsumer : IAsyncDisposable { Task SubscribeAsync( string topic, diff --git a/EnterpriseIntegrationPlatform/tutorials/06-messaging-channels.md b/EnterpriseIntegrationPlatform/tutorials/06-messaging-channels.md index 0fddae4..df2f487 100644 --- a/EnterpriseIntegrationPlatform/tutorials/06-messaging-channels.md +++ b/EnterpriseIntegrationPlatform/tutorials/06-messaging-channels.md @@ -160,9 +160,17 @@ public interface IInvalidMessageChannel IntegrationEnvelope envelope, string reason, CancellationToken cancellationToken = default); + + Task RouteRawInvalidAsync( + string rawData, + string sourceTopic, + string reason, + CancellationToken cancellationToken = default); } ``` +> `RouteInvalidAsync` handles messages that were parsed into an envelope but failed validation. `RouteRawInvalidAsync` handles raw data that could not be deserialized into an envelope at all. + ### How It Works ``` diff --git a/EnterpriseIntegrationPlatform/tutorials/07-temporal-workflows.md b/EnterpriseIntegrationPlatform/tutorials/07-temporal-workflows.md index 5faa34a..f4c07f4 100644 --- a/EnterpriseIntegrationPlatform/tutorials/07-temporal-workflows.md +++ b/EnterpriseIntegrationPlatform/tutorials/07-temporal-workflows.md @@ -58,32 +58,60 @@ public class IntegrationPipelineWorkflow { // Step 1: Persist the message (status: Pending) await Workflow.ExecuteActivityAsync( - (IntegrationActivities a) => a.PersistMessageAsync(input), - ActivityOptions); + (PipelineActivities act) => act.PersistMessageAsync(input), + PipelineActivityOptions); - // Step 2: Validate the message - var validationResult = await Workflow.ExecuteActivityAsync( - (IntegrationActivities a) => a.ValidateMessageAsync(input), - ActivityOptions); + // Step 2: Log Received lifecycle event + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.LogStageAsync(input.MessageId, input.MessageType, "Received"), + PipelineActivityOptions); + + // Step 3: Validate the message + var validation = await Workflow.ExecuteActivityAsync( + (IntegrationActivities act) => + act.ValidateMessageAsync(input.MessageType, input.PayloadJson), + ValidationActivityOptions); - if (!validationResult.IsValid) + if (!validation.IsValid) { - // Publish Nack and route to DLQ + // Publish Nack and update status to Failed await Workflow.ExecuteActivityAsync( - (IntegrationActivities a) => a.PublishNackAsync(input, validationResult), - ActivityOptions); - return new IntegrationPipelineResult(input.MessageId, false, string.Join("; ", validationResult.Errors)); - } + (PipelineActivities act) => + act.UpdateDeliveryStatusAsync( + input.MessageId, input.CorrelationId, + input.Timestamp, "Failed"), + PipelineActivityOptions); - // Step 3: Update status to InFlight - await Workflow.ExecuteActivityAsync( - (IntegrationActivities a) => a.UpdateStatusAsync(input, DeliveryStatus.InFlight), - ActivityOptions); + if (input.NotificationsEnabled) + { + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.PublishNackAsync( + input.MessageId, input.CorrelationId, + validation.Reason ?? "Validation failed", input.NackSubject), + PipelineActivityOptions); + } + + return new IntegrationPipelineResult(input.MessageId, false, validation.Reason); + } - // Step 4: Publish Ack + // Step 4: Update status to Delivered await Workflow.ExecuteActivityAsync( - (IntegrationActivities a) => a.PublishAckAsync(input), - ActivityOptions); + (PipelineActivities act) => + act.UpdateDeliveryStatusAsync( + input.MessageId, input.CorrelationId, + input.Timestamp, "Delivered"), + PipelineActivityOptions); + + // Step 5: Publish Ack + if (input.NotificationsEnabled) + { + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.PublishAckAsync(input.MessageId, input.CorrelationId, input.AckSubject), + PipelineActivityOptions); + } return new IntegrationPipelineResult(input.MessageId, true); } @@ -119,49 +147,53 @@ Publish Nack with compensation details ``` ```csharp -// Simplified saga compensation flow +// src/Workflow.Temporal/Workflows/AtomicPipelineWorkflow.cs (simplified) [Workflow] public class AtomicPipelineWorkflow { [WorkflowRun] - public async Task RunAsync( + public async Task RunAsync( IntegrationPipelineInput input) { - var completedSteps = new Stack(); + var completedSteps = new List(); - try - { - // Execute steps, tracking each completed one - await ExecuteStep("persist", input); - completedSteps.Push("persist"); - - await ExecuteStep("validate", input); - completedSteps.Push("validate"); - - await ExecuteStep("transform", input); - completedSteps.Push("transform"); + // Step 1: Persist message as Pending + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => act.PersistMessageAsync(input), + PipelineActivityOptions); + completedSteps.Add("PersistMessage"); - await ExecuteStep("deliver", input); - completedSteps.Push("deliver"); + // Step 2: Validate message + var validation = await Workflow.ExecuteActivityAsync( + (IntegrationActivities act) => + act.ValidateMessageAsync(input.MessageType, input.PayloadJson), + ValidationActivityOptions); - await PublishAck(input); - return new IntegrationPipelineResult(input.MessageId, true); - } - catch (Exception ex) + if (!validation.IsValid) { - // Compensate in reverse order - while (completedSteps.Count > 0) + // Compensate all previously completed steps in reverse order + foreach (var step in Enumerable.Reverse(completedSteps)) { - var step = completedSteps.Pop(); await Workflow.ExecuteActivityAsync( - (SagaCompensationActivities a) => - a.CompensateAsync(step, input), - CompensationOptions); + (SagaCompensationActivities act) => + act.CompensateStepAsync(input.CorrelationId, step), + CompensationActivityOptions); } - await PublishNack(input, ex); - return new IntegrationPipelineResult(input.MessageId, false, ex.Message); + // Save fault and publish Nack + return new AtomicPipelineResult( + input.MessageId, false, validation.Reason); } + + // Step 3: Update status to Delivered and Publish Ack + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.UpdateDeliveryStatusAsync( + input.MessageId, input.CorrelationId, + input.Timestamp, "Delivered"), + PipelineActivityOptions); + + return new AtomicPipelineResult(input.MessageId, true); } } ``` @@ -174,43 +206,60 @@ Activities are the building blocks that workflows orchestrate. Each activity is ```csharp // src/Workflow.Temporal/Activities/IntegrationActivities.cs (simplified) +// Handles validation and processing-stage logging [Activity] public class IntegrationActivities { - [ActivityMethod] - public async Task PersistMessageAsync(IntegrationPipelineInput input) + [Activity] + public async Task ValidateMessageAsync( + string messageType, string payloadJson) { - // Save message to Cassandra with status: Pending - await _persistence.SaveMessageAsync(input.Envelope, DeliveryStatus.Pending); + // Validate the message against schema and business rules + return await _validation.ValidateAsync(messageType, payloadJson); } - [ActivityMethod] - public async Task ValidateMessageAsync( - IntegrationPipelineInput input) + [Activity] + public async Task LogProcessingStageAsync( + Guid messageId, string messageType, string stage) { - // Validate the message against schema and business rules - return await _validation.ValidateAsync(input.Envelope); + // Record a lifecycle stage for observability + } +} + +// src/Workflow.Temporal/Activities/PipelineActivities.cs (simplified) +// Handles persistence, delivery status, acknowledgments, and faults + +[Activity] +public class PipelineActivities +{ + [Activity] + public async Task PersistMessageAsync(IntegrationPipelineInput input) + { + // Save message to Cassandra with status: Pending + await _persistence.SaveMessageAsync(input); } - [ActivityMethod] - public async Task PublishAckAsync(IntegrationPipelineInput input) + [Activity] + public async Task PublishAckAsync( + Guid messageId, Guid correlationId, string topic) { // Publish acknowledgment to Ack topic - await _notification.PublishAckAsync(input.Envelope); + await _notification.PublishAckAsync(messageId, correlationId, topic); } - [ActivityMethod] + [Activity] public async Task PublishNackAsync( - IntegrationPipelineInput input, - ValidationResult result) + Guid messageId, Guid correlationId, string reason, string topic) { // Publish negative acknowledgment to Nack topic - await _notification.PublishNackAsync(input.Envelope, result.Errors); + await _notification.PublishNackAsync(messageId, correlationId, reason, topic); } } ``` +> **Note:** Activities are split across two classes: `IntegrationActivities` (validation and logging) and `PipelineActivities` (persistence and notifications). A third class, `SagaCompensationActivities`, handles rollback (see Tutorial 47). + ### Activity Design Principles 1. **Stateless** — Activities don't hold state between executions diff --git a/EnterpriseIntegrationPlatform/tutorials/08-activities-pipeline.md b/EnterpriseIntegrationPlatform/tutorials/08-activities-pipeline.md index af9448b..64404b0 100644 --- a/EnterpriseIntegrationPlatform/tutorials/08-activities-pipeline.md +++ b/EnterpriseIntegrationPlatform/tutorials/08-activities-pipeline.md @@ -184,11 +184,11 @@ The orchestrator wraps the message in pipeline input and dispatches to Temporal: ```csharp // Simplified from src/Demo.Pipeline/PipelineOrchestrator.cs -public class PipelineOrchestrator : IPipelineOrchestrator +public sealed class PipelineOrchestrator : IPipelineOrchestrator { - public async Task ProcessAsync( - IntegrationEnvelope envelope, - CancellationToken ct) + public async Task ProcessAsync( + IntegrationEnvelope envelope, + CancellationToken cancellationToken = default) { var input = new IntegrationPipelineInput { @@ -197,7 +197,7 @@ public class PipelineOrchestrator : IPipelineOrchestrator // ... map from envelope to workflow input }; - await _dispatcher.DispatchAsync(input, ct); + await _dispatcher.DispatchAsync(input, cancellationToken); } } ``` diff --git a/EnterpriseIntegrationPlatform/tutorials/17-normalizer.md b/EnterpriseIntegrationPlatform/tutorials/17-normalizer.md index 8d5d7e1..f52eb39 100644 --- a/EnterpriseIntegrationPlatform/tutorials/17-normalizer.md +++ b/EnterpriseIntegrationPlatform/tutorials/17-normalizer.md @@ -3,7 +3,7 @@ ## What You'll Learn - The EIP Normalizer pattern for converting diverse formats to a canonical model -- How `INormalizer` / `MessageNormalizer` auto-detects JSON, XML, CSV, and flat-file +- How `INormalizer` / `MessageNormalizer` auto-detects JSON, XML, and CSV - The `NormalizationResult` with `DetectedFormat` and `WasTransformed` - `NormalizerOptions` for CSV delimiter, header mode, and strict content-type handling - Why a canonical model simplifies downstream processing @@ -18,8 +18,7 @@ ``` JSON ──────▶ ┌──────────────┐ XML ──────▶ │ Normalizer │──▶ Canonical JSON - CSV ──────▶ │ │ - Flat ──────▶ └──────────────┘ + CSV ──────▶ └──────────────┘ ``` External systems send data in many formats. The Normalizer detects the incoming format and converts it to the platform's canonical representation (JSON), so every downstream component only needs to understand one format. @@ -45,7 +44,7 @@ public interface INormalizer The `MessageNormalizer` class implements `INormalizer`. It: 1. Inspects the `contentType` parameter (or payload content if `StrictContentType = false`) -2. Detects whether the payload is JSON, XML, CSV, or flat-file +2. Detects whether the payload is JSON, XML, or CSV 3. Applies the appropriate conversion to produce canonical JSON 4. Returns the result with the detected format @@ -56,7 +55,7 @@ The `MessageNormalizer` class implements `INormalizer`. It: public sealed record NormalizationResult( string Payload, string OriginalContentType, - string DetectedFormat, // "JSON", "XML", "CSV", "FlatFile" + string DetectedFormat, // "JSON", "XML", "CSV" bool WasTransformed); ``` diff --git a/EnterpriseIntegrationPlatform/tutorials/26-message-replay.md b/EnterpriseIntegrationPlatform/tutorials/26-message-replay.md index 567f9ed..1f29df2 100644 --- a/EnterpriseIntegrationPlatform/tutorials/26-message-replay.md +++ b/EnterpriseIntegrationPlatform/tutorials/26-message-replay.md @@ -43,9 +43,7 @@ Messages are stored as they flow through the pipeline. When a replay is requeste // src/Processing.Replay/IMessageReplayer.cs public interface IMessageReplayer { - Task ReplayAsync( - ReplayFilter filter, - CancellationToken cancellationToken = default); + Task ReplayAsync(ReplayFilter filter, CancellationToken ct); } ``` @@ -64,7 +62,7 @@ public interface IMessageReplayStore ```csharp // src/Processing.Replay/ReplayFilter.cs -public sealed record ReplayFilter +public record ReplayFilter { public Guid? CorrelationId { get; init; } public string? MessageType { get; init; } diff --git a/EnterpriseIntegrationPlatform/tutorials/28-competing-consumers.md b/EnterpriseIntegrationPlatform/tutorials/28-competing-consumers.md index faf2a0f..ddc6781 100644 --- a/EnterpriseIntegrationPlatform/tutorials/28-competing-consumers.md +++ b/EnterpriseIntegrationPlatform/tutorials/28-competing-consumers.md @@ -51,10 +51,10 @@ public sealed class CompetingConsumerOrchestrator : BackgroundService var lagInfo = await _lagMonitor.GetLagAsync( _options.TargetTopic, _options.ConsumerGroup, stoppingToken); - if (lagInfo.TotalLag > _options.ScaleUpThreshold) + if (lagInfo.CurrentLag >= _options.ScaleUpThreshold) await _scaler.ScaleAsync( Math.Min(_scaler.CurrentCount + 1, _options.MaxConsumers), stoppingToken); - else if (lagInfo.TotalLag < _options.ScaleDownThreshold + else if (lagInfo.CurrentLag <= _options.ScaleDownThreshold && _scaler.CurrentCount > _options.MinConsumers) await _scaler.ScaleAsync(_scaler.CurrentCount - 1, stoppingToken); @@ -78,12 +78,15 @@ public interface IConsumerLagMonitor CancellationToken ct = default); } -public sealed record ConsumerLagInfo( - long TotalLag, - int ActiveConsumers, - DateTimeOffset MeasuredAt); +public record ConsumerLagInfo( + string ConsumerGroup, + string Topic, + long CurrentLag, + DateTimeOffset Timestamp); ``` +> **Note:** `ConsumerLagInfo` tracks lag per consumer group and topic. The active consumer count is available separately via `IConsumerScaler.CurrentCount`. + ### IConsumerScaler ```csharp diff --git a/EnterpriseIntegrationPlatform/tutorials/29-throttle-rate-limiting.md b/EnterpriseIntegrationPlatform/tutorials/29-throttle-rate-limiting.md index ec1ff09..5aa89ce 100644 --- a/EnterpriseIntegrationPlatform/tutorials/29-throttle-rate-limiting.md +++ b/EnterpriseIntegrationPlatform/tutorials/29-throttle-rate-limiting.md @@ -57,7 +57,7 @@ public interface IMessageThrottle ```csharp // src/Processing.Throttle/TokenBucketThrottle.cs -public sealed class TokenBucketThrottle : IMessageThrottle +public sealed class TokenBucketThrottle : IMessageThrottle, IDisposable { // Each partition key gets its own bucket // Tokens refill at policy.RefillRate per second @@ -69,21 +69,25 @@ public sealed class TokenBucketThrottle : IMessageThrottle ```csharp // src/Processing.Throttle/ThrottleResult.cs -public sealed record ThrottleResult( - bool Permitted, - TimeSpan WaitTime, - double RemainingTokens, - string? RejectionReason = null); +public sealed record ThrottleResult +{ + public required bool Permitted { get; init; } + public required TimeSpan WaitTime { get; init; } + public required int RemainingTokens { get; init; } + public string? RejectionReason { get; init; } +} ``` ### ThrottlePartitionKey ```csharp // src/Processing.Throttle/ThrottlePartitionKey.cs -public sealed record ThrottlePartitionKey( - string? TenantId = null, - string? Queue = null, - string? Endpoint = null); +public sealed record ThrottlePartitionKey +{ + public string? TenantId { get; init; } + public string? Queue { get; init; } + public string? Endpoint { get; init; } +} ``` | Key Property | Use Case | @@ -110,13 +114,15 @@ public interface IThrottleRegistry ```csharp // src/Processing.Throttle/ThrottleMetrics.cs -public sealed record ThrottleMetrics( - long TotalAcquired, - long TotalRejected, - double AvailableTokens, - double BurstCapacity, - double RefillRate, - TimeSpan TotalWaitTime); +public sealed record ThrottleMetrics +{ + public required long TotalAcquired { get; init; } + public required long TotalRejected { get; init; } + public required int AvailableTokens { get; init; } + public required int BurstCapacity { get; init; } + public required int RefillRate { get; init; } + public required TimeSpan TotalWaitTime { get; init; } +} ``` ### Rate Limiting vs Throttling diff --git a/EnterpriseIntegrationPlatform/tutorials/30-rule-engine.md b/EnterpriseIntegrationPlatform/tutorials/30-rule-engine.md index 290002f..f748d8e 100644 --- a/EnterpriseIntegrationPlatform/tutorials/30-rule-engine.md +++ b/EnterpriseIntegrationPlatform/tutorials/30-rule-engine.md @@ -38,8 +38,8 @@ Rules are evaluated in priority order. The first match determines the action, de // src/RuleEngine/IRuleEngine.cs public interface IRuleEngine { - Task EvaluateAsync( - IntegrationEnvelope envelope, + Task EvaluateAsync( + IntegrationEnvelope envelope, CancellationToken cancellationToken = default); } ``` diff --git a/EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md b/EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md index 02798b6..b92ea65 100644 --- a/EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md +++ b/EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md @@ -118,7 +118,7 @@ public static class TemporalQuery public interface ISnapshotStore { Task SaveAsync(string streamId, TState state, long version, CancellationToken ct = default); - Task<(TState? State, long Version)?> LoadAsync(string streamId, CancellationToken ct = default); + Task<(TState? State, long Version)> LoadAsync(string streamId, CancellationToken ct = default); } ``` diff --git a/EnterpriseIntegrationPlatform/tutorials/32-multi-tenancy.md b/EnterpriseIntegrationPlatform/tutorials/32-multi-tenancy.md index a38f038..39d16a2 100644 --- a/EnterpriseIntegrationPlatform/tutorials/32-multi-tenancy.md +++ b/EnterpriseIntegrationPlatform/tutorials/32-multi-tenancy.md @@ -106,6 +106,10 @@ public interface ITenantOnboardingService Task DeprovisionAsync( string tenantId, CancellationToken cancellationToken = default); + + Task GetStatusAsync( + string tenantId, + CancellationToken cancellationToken = default); } public sealed record TenantOnboardingRequest( diff --git a/EnterpriseIntegrationPlatform/tutorials/33-security.md b/EnterpriseIntegrationPlatform/tutorials/33-security.md index 1639c52..18c4bda 100644 --- a/EnterpriseIntegrationPlatform/tutorials/33-security.md +++ b/EnterpriseIntegrationPlatform/tutorials/33-security.md @@ -70,8 +70,8 @@ public interface IPayloadSizeGuard // src/Security/PayloadTooLargeException.cs public sealed class PayloadTooLargeException : Exception { - public long ActualSize { get; } - public long MaxAllowedSize { get; } + public int ActualBytes { get; } + public int MaxBytes { get; } } ``` @@ -100,21 +100,30 @@ JWT authentication is used at the Gateway API layer. Tokens carry tenant identit // src/Security.Secrets/ISecretProvider.cs public interface ISecretProvider { - Task GetSecretAsync(string key, CancellationToken ct = default); - Task SetSecretAsync(string key, string value, CancellationToken ct = default); + Task GetSecretAsync( + string key, string? version = null, CancellationToken ct = default); + Task SetSecretAsync( + string key, string value, + IReadOnlyDictionary? metadata = null, + CancellationToken ct = default); Task DeleteSecretAsync(string key, CancellationToken ct = default); - Task> ListSecretKeysAsync(CancellationToken ct = default); + Task> ListSecretKeysAsync( + string? prefix = null, CancellationToken ct = default); } public sealed record SecretEntry( string Key, string Value, - int Version, + string Version, DateTimeOffset CreatedAt, - Dictionary Metadata); + DateTimeOffset? ExpiresAt = null, + IReadOnlyDictionary? Metadata = null) +{ + public bool IsExpired => ExpiresAt.HasValue && ExpiresAt.Value <= DateTimeOffset.UtcNow; +} ``` -`GetSecretAsync` returns a `SecretEntry?` containing the value along with version, creation timestamp, and metadata — or `null` if the key does not exist. `DeleteSecretAsync` returns `true` if the key was deleted, `false` if it did not exist. `ListSecretKeysAsync` returns all known key names. +`GetSecretAsync` returns a `SecretEntry?` containing the value along with version, creation timestamp, and metadata — or `null` if the key does not exist. The optional `version` parameter allows retrieving a specific version of a secret. `SetSecretAsync` returns the newly created `SecretEntry` (with version and timestamp) and accepts optional metadata. `DeleteSecretAsync` returns `true` if the key was deleted, `false` if it did not exist. `ListSecretKeysAsync` returns all known key names, optionally filtered by prefix. Two implementations are provided: - `AzureKeyVaultSecretProvider` — integrates with Azure Key Vault using managed identity diff --git a/EnterpriseIntegrationPlatform/tutorials/42-configuration.md b/EnterpriseIntegrationPlatform/tutorials/42-configuration.md index dbcc929..b5d8639 100644 --- a/EnterpriseIntegrationPlatform/tutorials/42-configuration.md +++ b/EnterpriseIntegrationPlatform/tutorials/42-configuration.md @@ -76,13 +76,15 @@ public interface IFeatureFlagService ```csharp // src/Configuration/FeatureFlag.cs -public sealed record FeatureFlag +public sealed record FeatureFlag( + string Name, + bool IsEnabled = false, + Dictionary? Variants = null, + int RolloutPercentage = 100, + List? TargetTenants = null) { - public required string Name { get; init; } - public bool IsEnabled { get; init; } - public Dictionary Variants { get; init; } = new(); - public int RolloutPercentage { get; init; } = 100; - public IReadOnlyList TargetTenants { get; init; } = Array.Empty(); + public Dictionary Variants { get; init; } = Variants ?? new(); + public List TargetTenants { get; init; } = TargetTenants ?? []; } ``` @@ -97,15 +99,17 @@ public sealed record FeatureFlag ```csharp // src/Configuration/ConfigurationChangeNotifier.cs -public sealed class ConfigurationChangeNotifier : IObservable +public sealed class ConfigurationChangeNotifier : IObservable, IDisposable { public void Publish(ConfigurationChange change); public IDisposable Subscribe(IObserver observer); } public sealed record ConfigurationChange( - string Key, string? Value, string Environment, - ConfigurationChangeType ChangeType, DateTimeOffset Timestamp); + string Key, string Environment, + ConfigurationChangeType ChangeType, + string? OldValue, string? NewValue, + DateTimeOffset Timestamp); public enum ConfigurationChangeType { Created, Updated, Deleted } ``` @@ -119,10 +123,10 @@ The `EnvironmentOverrideProvider` reads environment variables using the conventi ### NotificationFeatureFlags ```csharp -// src/Configuration/NotificationFeatureFlags.cs +// src/Activities/NotificationFeatureFlags.cs public static class NotificationFeatureFlags { - public const string NotificationsEnabled = "notifications.enabled"; + public const string NotificationsEnabled = "Notifications.Enabled"; } ``` diff --git a/EnterpriseIntegrationPlatform/tutorials/45-performance-profiling.md b/EnterpriseIntegrationPlatform/tutorials/45-performance-profiling.md index 769a43f..126127c 100644 --- a/EnterpriseIntegrationPlatform/tutorials/45-performance-profiling.md +++ b/EnterpriseIntegrationPlatform/tutorials/45-performance-profiling.md @@ -40,7 +40,7 @@ Capture CPU and runtime profiling snapshots to identify hot paths: ```csharp -public sealed class ContinuousProfiler +public sealed class ContinuousProfiler : IContinuousProfiler { public ContinuousProfiler(ILogger logger, IOptions options) { /* ... */ } @@ -68,18 +68,18 @@ public sealed class ContinuousProfiler Track GC activity and detect memory issues: ```csharp -public sealed class GcMonitor +public sealed class GcMonitor : IGcMonitor { public GcSnapshot CaptureSnapshot() { return new GcSnapshot { - Gen0 = GC.CollectionCount(0), - Gen1 = GC.CollectionCount(1), - Gen2 = GC.CollectionCount(2), - TotalMemoryMb = GC.GetTotalMemory(forceFullCollection: false) / (1024.0 * 1024.0), + Gen0Collections = GC.CollectionCount(0), + Gen1Collections = GC.CollectionCount(1), + Gen2Collections = GC.CollectionCount(2), + TotalCommittedBytes = GC.GetTotalMemory(forceFullCollection: false), IsServerGc = GCSettings.IsServerGC, - Timestamp = DateTimeOffset.UtcNow + // ... additional properties: heap sizes, fragmentation, pause times }; } diff --git a/EnterpriseIntegrationPlatform/tutorials/46-complete-integration.md b/EnterpriseIntegrationPlatform/tutorials/46-complete-integration.md index 67359a4..7eb32b8 100644 --- a/EnterpriseIntegrationPlatform/tutorials/46-complete-integration.md +++ b/EnterpriseIntegrationPlatform/tutorials/46-complete-integration.md @@ -52,7 +52,7 @@ Gateway.Api wraps this in an `IntegrationEnvelope` and publishes to the broker. ## Step 2: Broker Receives and Queues -The message enters the configured broker (Kafka, RabbitMQ, or NATS): +The message enters the configured broker (Kafka, NATS JetStream, or Pulsar): ``` ┌─────────────────────────────────────────┐ @@ -74,21 +74,59 @@ The worker starts an `IntegrationPipelineWorkflow` (or `AtomicPipelineWorkflow` for saga compensation). Temporal manages retries and state. ```csharp +// src/Workflow.Temporal/Workflows/IntegrationPipelineWorkflow.cs (simplified) +[Workflow] public class IntegrationPipelineWorkflow { - public async Task RunAsync(IntegrationPipelineInput input) + [WorkflowRun] + public async Task RunAsync(IntegrationPipelineInput input) { - var validated = await ExecuteActivity(input); - var transformed = await ExecuteActivity(validated); - var routed = await ExecuteActivity(transformed); - var delivered = await ExecuteActivity(routed); + // Step 1: Persist message to storage + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => act.PersistMessageAsync(input), + PipelineActivityOptions); + + // Step 2: Validate message schema and content + var validation = await Workflow.ExecuteActivityAsync( + (IntegrationActivities act) => + act.ValidateMessageAsync(input.MessageType, input.PayloadJson), + ValidationActivityOptions); + + if (!validation.IsValid) + { + if (input.NotificationsEnabled) + { + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.PublishNackAsync(input.MessageId, input.CorrelationId, + validation.Reason ?? "Validation failed", input.NackSubject), + PipelineActivityOptions); + } + return new IntegrationPipelineResult(input.MessageId, false, validation.Reason); + } + + // Steps 3-4: Transform and Route are handled externally via + // the Normalizer and Content-Based Router patterns — the workflow + // publishes to the appropriate channel and downstream consumers + // handle format conversion and routing decisions. + + // Step 5: Publish success acknowledgment if (input.NotificationsEnabled) - await ExecuteActivity(delivered); - return delivered.Result; + { + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.PublishAckAsync(input.MessageId, input.CorrelationId, + input.AckSubject), + PipelineActivityOptions); + } + + return new IntegrationPipelineResult(input.MessageId, true); } } ``` +> **Note:** The workflow orchestrates three activity classes: `IntegrationActivities` (validation), `PipelineActivities` (persistence and notifications), and `SagaCompensationActivities` (rollback — see Tutorial 47). Transform and routing are handled by separate pipeline consumers, not by individual workflow activities. + ## Step 4: Validate The validation activity checks schema compliance, required fields, and diff --git a/EnterpriseIntegrationPlatform/tutorials/47-saga-compensation.md b/EnterpriseIntegrationPlatform/tutorials/47-saga-compensation.md index 176df34..b9f7f35 100644 --- a/EnterpriseIntegrationPlatform/tutorials/47-saga-compensation.md +++ b/EnterpriseIntegrationPlatform/tutorials/47-saga-compensation.md @@ -32,109 +32,153 @@ ## AtomicPipelineWorkflow Structure ```csharp +// src/Workflow.Temporal/Workflows/AtomicPipelineWorkflow.cs (simplified) +[Workflow] public class AtomicPipelineWorkflow { - private readonly List _completedSteps = new(); - - public async Task RunAsync(IntegrationPipelineInput input) + [WorkflowRun] + public async Task RunAsync(IntegrationPipelineInput input) { - try + var completedSteps = new List(); + + // Step 1: Persist message as Pending + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => act.PersistMessageAsync(input), + PipelineActivityOptions); + completedSteps.Add("PersistMessage"); + + // Step 2: Log Received lifecycle event + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.LogStageAsync(input.MessageId, input.MessageType, "Received"), + PipelineActivityOptions); + completedSteps.Add("LogReceived"); + + // Step 3: Validate message + var validation = await Workflow.ExecuteActivityAsync( + (IntegrationActivities act) => + act.ValidateMessageAsync(input.MessageType, input.PayloadJson), + ValidationActivityOptions); + + if (!validation.IsValid) { - var validated = await ExecuteTracked("Validate", input); - var transformed = await ExecuteTracked("Transform", validated); - var routed = await ExecuteTracked("Route", transformed); - var delivered = await ExecuteTracked("Deliver", routed); + // Nack path: compensate all completed steps, then Nack + return await HandleNackWithRollbackAsync( + input, completedSteps, validation.Reason ?? "Validation failed"); + } - if (input.NotificationsEnabled) - await PublishAck(delivered); + // Step 4: Update status to Delivered + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.UpdateDeliveryStatusAsync( + input.MessageId, input.CorrelationId, + input.Timestamp, "Delivered"), + PipelineActivityOptions); - return PipelineResult.Success(delivered); - } - catch (ActivityFailedException ex) + // Step 5: Publish Ack (only if notifications enabled) + if (input.NotificationsEnabled) { - if (input.NotificationsEnabled) - await PublishNack(ex); - - await CompensateAsync(); - return PipelineResult.Failed(ex); + await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.PublishAckAsync(input.MessageId, input.CorrelationId, input.AckSubject), + PipelineActivityOptions); } + + return new AtomicPipelineResult(input.MessageId, true); } } ``` ## Step Tracking -Each activity execution is tracked before proceeding: +Each successfully completed activity is recorded in a `List`. If a later step fails, the workflow knows exactly which steps need compensation: ```csharp -private async Task ExecuteTracked( - string stepName, object input) -{ - var result = await ExecuteActivity(stepName, input); - _completedSteps.Add(new CompletedStep - { - Name = stepName, - CompletedAt = DateTime.UtcNow, - CompensationData = result.CompensationData - }); - return result; -} +var completedSteps = new List(); + +// After each activity completes successfully: +await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => act.PersistMessageAsync(input), + PipelineActivityOptions); +completedSteps.Add("PersistMessage"); // Track it + +await Workflow.ExecuteActivityAsync( + (PipelineActivities act) => + act.LogStageAsync(input.MessageId, input.MessageType, "Received"), + PipelineActivityOptions); +completedSteps.Add("LogReceived"); // Track it ``` ``` -Completed Steps Stack (LIFO for compensation): +Completed Steps List (reversed for compensation): ┌─────────────────┐ -│ 3. Route │ ← compensate first -├─────────────────┤ -│ 2. Transform │ ← compensate second +│ 2. LogReceived │ ← compensate first ├─────────────────┤ -│ 1. Validate │ ← compensate last +│ 1. PersistMessage│ ← compensate second └─────────────────┘ ``` ## Reverse-Order Compensation -`SagaCompensationActivities` undo steps in reverse order: +`HandleNackWithRollbackAsync` compensates steps in reverse order via `SagaCompensationActivities`: ```csharp -private async Task CompensateAsync() +// src/Workflow.Temporal/Workflows/AtomicPipelineWorkflow.cs (simplified) +private async Task HandleNackWithRollbackAsync( + IntegrationPipelineInput input, + List completedSteps, + string failureReason) { - // Reverse order: last completed step compensated first - for (int i = _completedSteps.Count - 1; i >= 0; i--) + var compensatedSteps = new List(); + + // Compensate in reverse order (last committed step first) + foreach (var step in Enumerable.Reverse(completedSteps)) { - var step = _completedSteps[i]; - await ExecuteActivity( - $"Compensate{step.Name}", - step.CompensationData); + try + { + var success = await Workflow.ExecuteActivityAsync( + (SagaCompensationActivities act) => + act.CompensateStepAsync(input.CorrelationId, step), + CompensationActivityOptions); + + if (success) + compensatedSteps.Add(step); + } + catch (Exception) + { + // Log but continue — partial compensation is better than none + } } + + // Save fault, update status to Failed, publish Nack... + return new AtomicPipelineResult( + input.MessageId, false, failureReason, + compensatedSteps.AsReadOnly()); } ``` ``` - Forward execution: Compensation (on failure): + Forward execution: Compensation (on failure): - Validate ──────▶ ◀────── UndoValidate - Transform ─────▶ ◀────── UndoTransform - Route ─────────▶ ◀────── UndoRoute - Deliver ───✗ FAIL (not completed, skip) + PersistMessage ──────▶ ◀────── CompensateStepAsync("PersistMessage") + LogReceived ─────────▶ ◀────── CompensateStepAsync("LogReceived") + Validate ────────✗ FAIL (not completed, skip) ``` ## Partial Compensation -If compensation itself fails, the workflow records the partial state: +If compensation itself fails, the workflow catches the exception and continues with remaining steps: ```csharp -catch (Exception compensationEx) +catch (Exception) { - // Log and continue compensating remaining steps - _logger.LogError(compensationEx, - "Compensation failed for step {Step}", step.Name); - failedCompensations.Add(step.Name); - // Do NOT throw — attempt all remaining compensations + // Log but continue — partial compensation is better than none. + // The step is NOT added to compensatedSteps, so the result + // tracks exactly which steps were successfully rolled back. } ``` -Uncompensated steps are flagged for manual review via Admin.Api. +Uncompensated steps are visible in the `AtomicPipelineResult.CompensatedSteps` list — operators can compare it with the original `completedSteps` to identify gaps requiring manual review via Admin.Api. ## Real-World Example: E-Commerce Order @@ -154,16 +198,22 @@ Uncompensated steps are flagged for manual review via Admin.Api. ``` ```csharp -public class SagaCompensationActivities +// src/Workflow.Temporal/Activities/SagaCompensationActivities.cs +public sealed class SagaCompensationActivities { - public async Task CompensateChargePayment(PaymentData data) - { - await _paymentService.RefundAsync(data.TransactionId, data.Amount); - } + private readonly ICompensationActivityService _compensationService; + private readonly IMessageLoggingService _logging; - public async Task CompensateReserveInventory(InventoryData data) + [Activity] + public async Task CompensateStepAsync(Guid correlationId, string stepName) { - await _inventoryService.ReleaseAsync(data.Sku, data.Quantity); + await _logging.LogAsync(correlationId, stepName, $"CompensationStarted:{stepName}"); + var success = await _compensationService.CompensateAsync(correlationId, stepName); + var stage = success + ? $"CompensationSucceeded:{stepName}" + : $"CompensationFailed:{stepName}"; + await _logging.LogAsync(correlationId, stepName, stage); + return success; } } ``` @@ -200,11 +250,11 @@ available in a distributed system without two-phase commit. ## Exercises -1. What happens if `CompensateChargePayment` fails with a network timeout? +1. What happens if `CompensateStepAsync` fails with a network timeout? Design a retry policy that balances urgency (customer refund) with safety - (no double refund). + (no double refund). See `CompensationActivityOptions` in the source. -2. Add a fourth step "Send Confirmation Email" to the saga. What does its +2. Add a fourth step "SendConfirmation" to the saga. What does its compensation look like? Is email compensation even possible? 3. Compare the `IntegrationPipelineWorkflow` and `AtomicPipelineWorkflow`. diff --git a/EnterpriseIntegrationPlatform/tutorials/48-notification-use-cases.md b/EnterpriseIntegrationPlatform/tutorials/48-notification-use-cases.md index b09cc8d..ae6a9b9 100644 --- a/EnterpriseIntegrationPlatform/tutorials/48-notification-use-cases.md +++ b/EnterpriseIntegrationPlatform/tutorials/48-notification-use-cases.md @@ -16,7 +16,7 @@ │ Notification Stack │ │ │ │ IntegrationPipelineInput.NotificationsEnabled (per-msg) │ -│ NotificationFeatureFlags.Enabled (global) │ +│ NotificationFeatureFlags.NotificationsEnabled (global) │ │ IFeatureFlagService (toggle) │ │ INotificationMapper / XmlNotificationMapper (format) │ │ NatsNotificationActivityService (publish) │ @@ -30,7 +30,7 @@ public interface INotificationMapper string MapNack(Guid messageId, Guid correlationId, string errorMessage); } -public class XmlNotificationMapper : INotificationMapper +public sealed class XmlNotificationMapper : INotificationMapper { public string MapAck(Guid messageId, Guid correlationId) => "ok"; @@ -183,6 +183,9 @@ maintenance windows without changing individual integration configurations. ``` ```csharp +// Conceptual pseudocode — the actual notification logic lives inside +// IntegrationPipelineWorkflow and AtomicPipelineWorkflow (see Tutorials 46–47). +// This example illustrates the decision flow for reference: public class NotificationDecisionService { public async Task HandleDeliveryResultAsync( @@ -190,7 +193,7 @@ public class NotificationDecisionService { if (!input.NotificationsEnabled) return; // UC1 if (!await _featureFlags.IsEnabledAsync( - NotificationFeatureFlags.Enabled)) return; // UC4/UC5 + NotificationFeatureFlags.NotificationsEnabled)) return; // UC4/UC5 if (result.Success) await _notificationService.PublishAckAsync( // UC2 diff --git a/EnterpriseIntegrationPlatform/tutorials/49-testing-integrations.md b/EnterpriseIntegrationPlatform/tutorials/49-testing-integrations.md index 0442599..1033ac6 100644 --- a/EnterpriseIntegrationPlatform/tutorials/49-testing-integrations.md +++ b/EnterpriseIntegrationPlatform/tutorials/49-testing-integrations.md @@ -78,24 +78,33 @@ public class XmlNotificationMapperTests - **NSubstitute** for mocking interfaces: ```csharp -[SetUp] -public void SetUp() +// Example from tests/UnitTests/XmlNotificationMapperTests.cs +[TestFixture] +public class XmlNotificationMapperTests { - _featureFlags = Substitute.For(); - _notificationService = Substitute.For(); - _sut = new NotificationDecisionService(_featureFlags, _notificationService); -} + private XmlNotificationMapper _sut = null!; -[Test] -public async Task WhenFeatureFlagDisabled_SkipsNotification() -{ - _featureFlags.IsEnabledAsync(Arg.Any()) - .Returns(Task.FromResult(false)); + [SetUp] + public void SetUp() + { + _sut = new XmlNotificationMapper(); + } - await _sut.HandleDeliveryResultAsync(successResult, inputWithNotifications); + [Test] + public void MapAck_ReturnsXmlAckOk() + { + var result = _sut.MapAck(Guid.NewGuid(), Guid.NewGuid()); + + Assert.That(result, Is.EqualTo("ok")); + } - await _notificationService.DidNotReceive() - .PublishAckAsync(Arg.Any()); + [Test] + public void MapNack_ReturnsXmlNackWithErrorMessage() + { + var result = _sut.MapNack(Guid.NewGuid(), Guid.NewGuid(), "Connection timed out"); + + Assert.That(result, Is.EqualTo("not ok because of Connection timed out")); + } } ``` @@ -249,8 +258,10 @@ integration tests confirm that DLQ routing works with real brokers. ## Exercises -1. Write a unit test for the `NotificationDecisionService` that covers all 5 - use cases (UC1–UC5). Use NSubstitute to mock `IFeatureFlagService`. +1. Write a unit test for `XmlNotificationMapper` that verifies XML special + characters (e.g., `<`, `&`, `"`) are properly escaped in error messages. + Use the existing `MapNack_EscapesXmlSpecialCharactersInErrorMessage` test + in `tests/UnitTests/XmlNotificationMapperTests.cs` as a reference. 2. Create a Testcontainers integration test that verifies dead-letter queue routing when a consumer fails to process a message.