diff --git a/EnterpriseIntegrationPlatform/rules/milestones.md b/EnterpriseIntegrationPlatform/rules/milestones.md index 08538dd..c2a6d6c 100644 --- a/EnterpriseIntegrationPlatform/rules/milestones.md +++ b/EnterpriseIntegrationPlatform/rules/milestones.md @@ -22,57 +22,17 @@ ## Completed Phases -✅ Phases 1–21 complete — see `rules/completion-log.md` for full history. +✅ Phases 1–24 complete — see `rules/completion-log.md` for full history. -**Current stats:** 1,518 UnitTests + 58 Contract + 29 Workflow + 17 Integration + 10 Load + 19 Vitest = **1,651 total tests**. 48 src projects. +48 src projects. All 50 tutorials rewritten with BizTalk-style Lab + Exam exercises focused on EIP patterns, scalability, and atomicity. -**Next chunk:** Phase 22 complete — all 13 chunks (080-092) done. +**Next chunk:** (none — all current work complete) --- -### Phase 19 — Tutorial Audit as New Developer (Round 6) - -✅ Phase 19 complete — see `rules/completion-log.md`. - -### Phase 20 — Tutorial Audit as New Developer (Round 7) - -✅ Phase 20 complete — fixed 7 tutorials (03, 17, 26, 28, 29, 45, 48) plus INormalizer.cs xmldoc. - -### Phase 21 — Tutorial Code Snippet Accuracy Audit - -✅ Phase 21 complete — fixed 4 tutorials (26, 31, 35, 38) with code snippets mismatched against actual source code. - ---- - -### Phase 22 — Implement Unfulfilled Tutorial Promises - -**Scope:** Audit of all 50 tutorials against source code found 13 features that tutorials promise but are not implemented. These chunks implement the missing features so that every tutorial claim is backed by working code. - -#### Chunk 090 — EnvironmentOverrideProvider: EIP__ Environment Variable Convention - -| Field | Value | -|-------|-------| -| Status | `not-started` | -| Tutorial | 42 — Configuration (line 121) | -| Claim | "The `EnvironmentOverrideProvider` reads environment variables using the convention `EIP__Key__SubKey` (double underscore as separator). Environment variables take precedence over store values." | -| Current State | `EnvironmentOverrideProvider` only does cascading resolution from the `IConfigurationStore`. It never reads `System.Environment.GetEnvironmentVariable()`. | -| Implementation | In `ResolveAsync`, before falling back to the store, check `Environment.GetEnvironmentVariable($"EIP__{key.Replace(":", "__")}")`. If found, return a synthetic `ConfigurationEntry` with that value. Add `ResolveManyAsync` override similarly. Add unit tests using environment variable injection. | -| Files | `src/Configuration/EnvironmentOverrideProvider.cs`, `tests/UnitTests/EnvironmentOverrideProviderTests.cs` | - -#### Chunk 092 — Kustomize Base Directory Structure - -| Field | Value | -|-------|-------| -| Status | `done` | -| Tutorial | 43 — Kubernetes Deployment (lines 91-104) | -| Claim | Tutorial shows flat `base/` with `deployment.yaml` and `service.yaml`. | -| Current State | Actual structure has `base/admin-api/` and `base/openclaw-web/` subdirectories. | -| Implementation | Updated tutorial 43 to match the actual directory structure (service-specific subdirectories, namespace.yaml, prod PDB files). | -| Files | `tutorials/43-kubernetes-deployment.md` | - ## Next Chunk -Phase 22 complete — all 13 chunks (080-092) done. 
+(none) --- diff --git a/EnterpriseIntegrationPlatform/tests/UnitTests/ScatterGathererTests.cs b/EnterpriseIntegrationPlatform/tests/UnitTests/ScatterGathererTests.cs new file mode 100644 index 0000000..3a9d17a --- /dev/null +++ b/EnterpriseIntegrationPlatform/tests/UnitTests/ScatterGathererTests.cs @@ -0,0 +1,376 @@ +using EnterpriseIntegrationPlatform.Contracts; +using EnterpriseIntegrationPlatform.Ingestion; +using EnterpriseIntegrationPlatform.Processing.ScatterGather; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NSubstitute; +using NUnit.Framework; + +namespace EnterpriseIntegrationPlatform.Tests.Unit; + +[TestFixture] +public class ScatterGathererTests +{ + private IMessageBrokerProducer _producer = null!; + private ILogger> _logger = null!; + + private ScatterGatherer CreateSut(int timeoutMs = 5_000, int maxRecipients = 50) + { + var options = Options.Create(new ScatterGatherOptions + { + TimeoutMs = timeoutMs, + MaxRecipients = maxRecipients, + }); + + return new ScatterGatherer(_producer, options, _logger); + } + + [SetUp] + public void SetUp() + { + _producer = Substitute.For(); + _logger = Substitute.For>>(); + } + + // ── Constructor guards ─────────────────────────────────────────── + + [Test] + public void Ctor_NullProducer_ThrowsArgumentNullException() + { + Assert.Throws(() => + new ScatterGatherer( + null!, + Options.Create(new ScatterGatherOptions()), + _logger)); + } + + [Test] + public void Ctor_NullOptions_ThrowsArgumentNullException() + { + Assert.Throws(() => + new ScatterGatherer( + _producer, + null!, + _logger)); + } + + [Test] + public void Ctor_NullLogger_ThrowsArgumentNullException() + { + Assert.Throws(() => + new ScatterGatherer( + _producer, + Options.Create(new ScatterGatherOptions()), + null!)); + } + + // ── ScatterGatherAsync ─────────────────────────────────────────── + + [Test] + public void ScatterGatherAsync_NullRequest_ThrowsArgumentNullException() + { + var sut = CreateSut(); + + Assert.ThrowsAsync(() => + sut.ScatterGatherAsync(null!)); + } + + [Test] + public async Task ScatterGatherAsync_EmptyRecipients_ReturnsEmptyResultNotTimedOut() + { + var sut = CreateSut(); + var request = new ScatterRequest(Guid.NewGuid(), "payload", []); + + var result = await sut.ScatterGatherAsync(request); + + Assert.That(result.Responses, Is.Empty); + Assert.That(result.TimedOut, Is.False); + Assert.That(result.Duration, Is.EqualTo(TimeSpan.Zero)); + Assert.That(result.CorrelationId, Is.EqualTo(request.CorrelationId)); + } + + [Test] + public async Task ScatterGatherAsync_NullRecipients_ReturnsEmptyResultNotTimedOut() + { + var sut = CreateSut(); + var request = new ScatterRequest(Guid.NewGuid(), "payload", null!); + + var result = await sut.ScatterGatherAsync(request); + + Assert.That(result.Responses, Is.Empty); + Assert.That(result.TimedOut, Is.False); + } + + [Test] + public void ScatterGatherAsync_ExceedsMaxRecipients_ThrowsArgumentException() + { + var sut = CreateSut(maxRecipients: 2); + var request = new ScatterRequest( + Guid.NewGuid(), + "payload", + ["topic-a", "topic-b", "topic-c"]); + + var ex = Assert.ThrowsAsync(() => + sut.ScatterGatherAsync(request)); + + Assert.That(ex!.Message, Does.Contain("3")); + Assert.That(ex.Message, Does.Contain("2")); + } + + [Test] + public void ScatterGatherAsync_DuplicateCorrelationId_ThrowsInvalidOperationException() + { + var sut = CreateSut(timeoutMs: 60_000); + var correlationId = 
Guid.NewGuid(); + + // First request: block the producer so the operation stays active + _producer.PublishAsync(Arg.Any>(), Arg.Any(), Arg.Any()) + .Returns(new TaskCompletionSource().Task); // never completes + + var cts = new CancellationTokenSource(); + var request1 = new ScatterRequest(correlationId, "p1", ["topic-a"]); + var firstTask = sut.ScatterGatherAsync(request1, cts.Token); + + // Second request with same correlationId + var request2 = new ScatterRequest(correlationId, "p2", ["topic-b"]); + + Assert.ThrowsAsync(() => + sut.ScatterGatherAsync(request2)); + + cts.Cancel(); + } + + [Test] + public async Task ScatterGatherAsync_AllRecipientsRespond_ReturnsAllResponsesNotTimedOut() + { + var sut = CreateSut(timeoutMs: 5_000); + var correlationId = Guid.NewGuid(); + var recipients = new[] { "topic-a", "topic-b" }; + var request = new ScatterRequest(correlationId, "payload", recipients); + + // When scatter publishes, submit responses immediately + _producer + .When(p => p.PublishAsync(Arg.Any>(), Arg.Any(), Arg.Any())) + .Do(_ => + { + // Submit both responses after scatter completes + Task.Run(async () => + { + await Task.Delay(50); + await sut.SubmitResponseAsync(correlationId, + new GatherResponse("topic-a", "resp-a", DateTimeOffset.UtcNow, true, null)); + await sut.SubmitResponseAsync(correlationId, + new GatherResponse("topic-b", "resp-b", DateTimeOffset.UtcNow, true, null)); + }); + }); + + var result = await sut.ScatterGatherAsync(request); + + Assert.That(result.Responses, Has.Count.EqualTo(2)); + Assert.That(result.TimedOut, Is.False); + Assert.That(result.CorrelationId, Is.EqualTo(correlationId)); + Assert.That(result.Duration, Is.GreaterThan(TimeSpan.Zero)); + } + + [Test] + public async Task ScatterGatherAsync_PublishesToAllRecipients_VerifiesPublishCalls() + { + var sut = CreateSut(timeoutMs: 200); + var correlationId = Guid.NewGuid(); + var recipients = new[] { "topic-a", "topic-b", "topic-c" }; + var request = new ScatterRequest(correlationId, "payload", recipients); + + _producer.PublishAsync(Arg.Any>(), Arg.Any(), Arg.Any()) + .Returns(Task.CompletedTask); + + await sut.ScatterGatherAsync(request); + + await _producer.Received(3) + .PublishAsync(Arg.Any>(), Arg.Any(), Arg.Any()); + + foreach (var topic in recipients) + { + await _producer.Received(1) + .PublishAsync(Arg.Any>(), topic, Arg.Any()); + } + } + + [Test] + public async Task ScatterGatherAsync_Timeout_ReturnsPartialResultsWithTimedOutTrue() + { + var sut = CreateSut(timeoutMs: 200); + var correlationId = Guid.NewGuid(); + var request = new ScatterRequest(correlationId, "payload", ["topic-a", "topic-b"]); + + // Submit only one response (on the first publish only) + var submitted = 0; + _producer + .When(p => p.PublishAsync(Arg.Any>(), Arg.Any(), Arg.Any())) + .Do(_ => + { + if (Interlocked.Increment(ref submitted) == 1) + { + Task.Run(async () => + { + await Task.Delay(50); + await sut.SubmitResponseAsync(correlationId, + new GatherResponse("topic-a", "resp-a", DateTimeOffset.UtcNow, true, null)); + }); + } + }); + + var result = await sut.ScatterGatherAsync(request); + + Assert.That(result.Responses, Has.Count.EqualTo(1)); + Assert.That(result.TimedOut, Is.True); + } + + // ── SubmitResponseAsync ────────────────────────────────────────── + + [Test] + public void SubmitResponseAsync_NullResponse_ThrowsArgumentNullException() + { + var sut = CreateSut(); + + Assert.ThrowsAsync(() => + sut.SubmitResponseAsync(Guid.NewGuid(), null!)); + } + + [Test] + public async Task 
SubmitResponseAsync_UnknownCorrelationId_ReturnsFalse() + { + var sut = CreateSut(); + var response = new GatherResponse("topic-a", "resp", DateTimeOffset.UtcNow, true, null); + + var accepted = await sut.SubmitResponseAsync(Guid.NewGuid(), response); + + Assert.That(accepted, Is.False); + } + + // ── ScatterGatherResult ────────────────────────────────────────── + + [Test] + public void ScatterGatherResult_RecordProperties_RetainValues() + { + var id = Guid.NewGuid(); + var responses = new List> + { + new("topic-a", "resp-a", DateTimeOffset.UtcNow, true, null), + }; + var duration = TimeSpan.FromMilliseconds(123); + + var result = new ScatterGatherResult(id, responses, TimedOut: true, duration); + + Assert.That(result.CorrelationId, Is.EqualTo(id)); + Assert.That(result.Responses, Has.Count.EqualTo(1)); + Assert.That(result.TimedOut, Is.True); + Assert.That(result.Duration, Is.EqualTo(duration)); + } + + // ── GatherResponse ─────────────────────────────────────────────── + + [Test] + public void GatherResponse_RecordProperties_RetainValues() + { + var now = DateTimeOffset.UtcNow; + var response = new GatherResponse("topic-a", "resp", now, false, "error msg"); + + Assert.That(response.Recipient, Is.EqualTo("topic-a")); + Assert.That(response.Payload, Is.EqualTo("resp")); + Assert.That(response.ReceivedAt, Is.EqualTo(now)); + Assert.That(response.IsSuccess, Is.False); + Assert.That(response.ErrorMessage, Is.EqualTo("error msg")); + } + + // ── ScatterRequest ─────────────────────────────────────────────── + + [Test] + public void ScatterRequest_RecordProperties_RetainValues() + { + var id = Guid.NewGuid(); + var recipients = new[] { "a", "b" }; + var request = new ScatterRequest(id, "payload", recipients); + + Assert.That(request.CorrelationId, Is.EqualTo(id)); + Assert.That(request.Payload, Is.EqualTo("payload")); + Assert.That(request.Recipients, Is.EquivalentTo(recipients)); + } +} + +[TestFixture] +public class ScatterGatherOptionsTests +{ + [Test] + public void TimeoutMs_Default_Is30000() + { + var options = new ScatterGatherOptions(); + Assert.That(options.TimeoutMs, Is.EqualTo(30_000)); + } + + [Test] + public void MaxRecipients_Default_Is50() + { + var options = new ScatterGatherOptions(); + Assert.That(options.MaxRecipients, Is.EqualTo(50)); + } + + [Test] + public void Properties_WhenSet_RetainValues() + { + var options = new ScatterGatherOptions + { + TimeoutMs = 5_000, + MaxRecipients = 10, + }; + + Assert.That(options.TimeoutMs, Is.EqualTo(5_000)); + Assert.That(options.MaxRecipients, Is.EqualTo(10)); + } +} + +[TestFixture] +public class ScatterGatherServiceExtensionsTests +{ + [Test] + public void AddScatterGather_NullServices_ThrowsArgumentNullException() + { + var config = new ConfigurationBuilder().Build(); + + Assert.Throws(() => + ScatterGatherServiceExtensions.AddScatterGather(null!, config)); + } + + [Test] + public void AddScatterGather_NullConfiguration_ThrowsArgumentNullException() + { + var services = new ServiceCollection(); + + Assert.Throws(() => + services.AddScatterGather(null!)); + } + + [Test] + public void AddScatterGather_ValidArgs_RegistersServices() + { + var services = new ServiceCollection(); + var config = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["ScatterGather:TimeoutMs"] = "10000", + ["ScatterGather:MaxRecipients"] = "25", + }) + .Build(); + + services.AddSingleton(Substitute.For()); + services.AddLogging(); + services.AddScatterGather(config); + + var sp = services.BuildServiceProvider(); + var instance = 
sp.GetService>(); + + Assert.That(instance, Is.Not.Null); + Assert.That(instance, Is.InstanceOf>()); + } +} diff --git a/EnterpriseIntegrationPlatform/tutorials/01-introduction.md b/EnterpriseIntegrationPlatform/tutorials/01-introduction.md index e00561a..f62f59b 100644 --- a/EnterpriseIntegrationPlatform/tutorials/01-introduction.md +++ b/EnterpriseIntegrationPlatform/tutorials/01-introduction.md @@ -152,13 +152,49 @@ By the end of this course, you'll understand how to: --- -## Exercises +## Lab -1. **Explore the EIP website**: Visit [enterpriseintegrationpatterns.com](https://www.enterpriseintegrationpatterns.com/patterns/messaging/toc.html) and browse the pattern catalog. Pick three patterns and think about where you've seen them (or could use them) in your own work. +**Objective:** Map EIP pattern categories to concrete platform components and trace how the Pipes and Filters architecture enables scalable message processing. -2. **Review the architecture**: Read [`docs/architecture-overview.md`](../docs/architecture-overview.md) and identify how the platform's data flow maps to the Pipes and Filters pattern. +### Step 1: Map Patterns to Projects -3. **Count the patterns**: Look at [`docs/eip-mapping.md`](../docs/eip-mapping.md) and count how many of the 65 EIP patterns are implemented in the platform. +Open [`docs/eip-mapping.md`](../docs/eip-mapping.md). For each of the following EIP categories, identify the `src/` project that implements it and the primary interface it exposes: + +| Category | Project | Interface | +|----------|---------|-----------| +| Message Construction | `src/Contracts/` | ? | +| Content-Based Router | `src/Processing.Routing/` | ? | +| Message Translator | `src/Processing.Translator/` | ? | +| Splitter | `src/Processing.Splitter/` | ? | +| Dead Letter Channel | `src/Processing.DeadLetter/` | ? | + +### Step 2: Trace the Pipes and Filters Chain + +Open [`docs/architecture-overview.md`](../docs/architecture-overview.md) and trace how a single message flows through the platform: Ingress → Broker → Workflow → Activities → Connectors. For each stage, write down which EIP pattern it implements and how the platform guarantees **atomicity** (hint: look at Temporal workflows and Ack/Nack). + +### Step 3: Evaluate Scalability Points + +Identify three places in the architecture where **horizontal scaling** is possible without code changes. Consider: broker partitions, Competing Consumers (`src/Processing.CompetingConsumers/`), and workflow workers. For each, explain what happens to in-flight messages when a new instance is added. + +## Exam + +1. Which integration style does the EIP book recommend for loosely coupled, asynchronous communication between systems? + - A) File Transfer + - B) Shared Database + - C) Messaging + - D) Remote Procedure Invocation + +2. In the Pipes and Filters pattern, what property must each filter maintain to allow independent scaling? + - A) Global mutable state shared across filters + - B) Stateless processing with all context carried in the message envelope + - C) Direct method calls to the next filter in the chain + - D) A persistent database connection for every filter + +3. How does the platform guarantee **zero message loss** when a processing step fails mid-pipeline? 
+ - A) Messages are stored in memory and retried indefinitely + - B) Temporal workflows provide durable execution with saga compensation — either all steps complete or compensating actions roll back committed work + - C) The broker automatically resends messages every 5 seconds + - D) Failed messages are silently discarded to avoid blocking the pipeline --- diff --git a/EnterpriseIntegrationPlatform/tutorials/02-environment-setup.md b/EnterpriseIntegrationPlatform/tutorials/02-environment-setup.md index 831f898..3aaf44d 100644 --- a/EnterpriseIntegrationPlatform/tutorials/02-environment-setup.md +++ b/EnterpriseIntegrationPlatform/tutorials/02-environment-setup.md @@ -76,7 +76,7 @@ This downloads all NuGet packages defined in `Directory.Packages.props` (central dotnet build EnterpriseIntegrationPlatform.sln ``` -A clean build should complete with **0 errors**. The solution contains 44+ projects — this takes 30–60 seconds on first build. +A clean build should complete with **0 errors**. The solution contains many projects — this takes 30–60 seconds on first build. ### Step 4: Run the Tests @@ -86,14 +86,14 @@ dotnet test EnterpriseIntegrationPlatform.sln The test suite includes: -| Test Project | Count | Description | -|-------------|-------|-------------| -| UnitTests | 1,100+ | Fast, isolated tests for every component | -| ContractTests | 58 | Contract verification between services | -| WorkflowTests | 29 | Temporal workflow behavior tests | -| IntegrationTests | 17 | Testcontainers-based tests with real infrastructure | -| PlaywrightTests | 13 | End-to-end browser tests for OpenClaw UI | -| LoadTests | 10 | Performance and throughput benchmarks | +| Test Project | Description | +|-------------|-------------| +| UnitTests | Fast, isolated tests for every component (most numerous) | +| ContractTests | Contract verification between services | +| WorkflowTests | Temporal workflow behavior tests | +| IntegrationTests | Testcontainers-based tests with real infrastructure | +| PlaywrightTests | End-to-end browser tests for OpenClaw UI | +| LoadTests | Performance and throughput benchmarks | > **Note:** IntegrationTests and PlaywrightTests require Docker to be running. @@ -144,7 +144,7 @@ The dashboard at `https://localhost:15888` (or the URL shown in console output) ``` EnterpriseIntegrationPlatform/ -├── src/ # Source code (44+ projects) +├── src/ # Source code │ ├── AppHost/ # .NET Aspire orchestrator │ ├── ServiceDefaults/ # Shared OpenTelemetry & health checks │ ├── Contracts/ # IntegrationEnvelope & shared interfaces @@ -246,13 +246,63 @@ You need `Microsoft.NETCore.App 10.x.x` and `Microsoft.AspNetCore.App 10.x.x`. --- -## Exercises +## Lab -1. **Explore the solution**: Open the `.sln` file in your IDE and browse the project list. Count how many `Processing.*` projects exist. +**Objective:** Build the solution, launch the Aspire orchestrator, and explore how the platform's service topology implements the EIP Messaging Gateway and Control Bus patterns. -2. **Read the tests**: Open `tests/UnitTests/` and browse the test namespaces. Each namespace corresponds to a `src/` project. +### Step 1: Build and Launch -3. **Explore Aspire**: Launch the AppHost and click through the Aspire dashboard. Find the health check endpoints for each service. +Open a terminal in the repository root and execute: + +```bash +dotnet restore EnterpriseIntegrationPlatform.sln +dotnet build EnterpriseIntegrationPlatform.sln +``` + +Confirm the build succeeds with zero errors and zero warnings. 
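+
+Before moving on, it can help to sanity-check the toolchain and run the fastest slice of the test suite. The commands below are a minimal optional sketch, not a required lab step: `dotnet --list-runtimes` is the standard .NET CLI way to list installed runtimes, and the `tests/UnitTests/` path is assumed from the repository layout shown earlier — adjust it if your checkout differs.
+
+```bash
+# List installed runtimes — you should see Microsoft.NETCore.App 10.x
+# and Microsoft.AspNetCore.App 10.x among the entries
+dotnet --list-runtimes
+
+# Run only the unit tests: fast, isolated, and no Docker required
+dotnet test tests/UnitTests
+```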
+ +### Step 2: Explore the Aspire Service Topology + +Start the orchestrator: + +```bash +cd src/AppHost +dotnet run +``` + +Open the Aspire dashboard URL printed in the console. Identify each service and classify it by EIP role: + +| Service | EIP Role | +|---------|----------| +| Gateway.Api | Messaging Gateway — single entry point for external systems | +| Admin.Api | Control Bus — runtime administration and monitoring | +| OpenClaw.Web | ? (identify its role) | + +Click each resource's health endpoint. Explain why health checks are essential for **scalability** — what happens when a load balancer cannot determine service health? + +### Step 3: Trace a Message Path Through Services + +Using the Aspire dashboard's **Traces** tab, identify the OpenTelemetry spans created when a message enters the Gateway. Draw the message flow: Gateway → Broker → Workflow → Activities → Connector. For each hop, note which EIP pattern is being applied (e.g., Gateway = Messaging Gateway, Broker = Message Channel, Workflow = Process Manager). + +## Exam + +1. In the EIP Messaging Gateway pattern, what is the gateway's primary responsibility? + - A) Transform message payloads between formats + - B) Provide a single entry point that encapsulates messaging-specific logic and shields external systems from internal broker details + - C) Store messages permanently in a database + - D) Route messages based on content inspection + +2. Why does the platform use .NET Aspire to orchestrate services rather than starting each service manually? + - A) Aspire encrypts all inter-service communication automatically + - B) Aspire ensures services start in dependency order with shared configuration, health checks, and observability — critical for a distributed integration platform's operational reliability + - C) Manual startup is not supported by .NET 10 + - D) Aspire compiles all services into a single executable + +3. How does the Control Bus pattern (implemented by Admin.Api) support **operational scalability**? + - A) It routes business messages to faster consumers + - B) It provides centralized runtime management — feature flags, DLQ resubmission, and health monitoring — without modifying or redeploying processing pipelines + - C) It increases the number of broker partitions automatically + - D) It caches all messages in memory for faster retrieval --- diff --git a/EnterpriseIntegrationPlatform/tutorials/03-first-message.md b/EnterpriseIntegrationPlatform/tutorials/03-first-message.md index e8f6d85..96951f4 100644 --- a/EnterpriseIntegrationPlatform/tutorials/03-first-message.md +++ b/EnterpriseIntegrationPlatform/tutorials/03-first-message.md @@ -266,16 +266,62 @@ public class IntegrationEnvelopeTests --- -## Exercises +## Lab -1. **Trace a CorrelationId**: Imagine you publish a message, it gets split into 5 parts, each part gets transformed, and then they're aggregated back together. Which field ensures they all stay linked? What would `CausationId` be set to on each split message? +**Objective:** Create an `IntegrationEnvelope`, publish it to a Message Channel, and trace the Correlation Identifier through a publish-subscribe round-trip. -2. **Choose the Intent**: For each scenario, pick the correct `MessageIntent`: - - "Process this payment" → ? - - "Here is the quarterly report" → ? - - "A new customer registered" → ? +### Step 1: Create and Inspect an Integration Envelope -3. **Broker independence**: Why does the platform use `IMessageBrokerProducer` instead of calling Kafka/NATS directly? 
What happens when you switch from NATS to Pulsar? +Using the static factory method, create an envelope and inspect the EIP Message pattern fields it populates automatically: + +```csharp +var envelope = IntegrationEnvelope.Create( + payload: "{\"orderId\": 42, \"amount\": 99.95}", + source: "OrderService", + messageType: "order.created"); +``` + +Verify: `MessageId` is a non-empty `Guid` (Message Identity), `CorrelationId` is generated (Correlation Identifier pattern), `Timestamp` is UTC (for ordering and expiration), and `Priority` defaults to `Normal`. + +### Step 2: Trace the Message Lifecycle + +Draw the 8-step message lifecycle from the tutorial on paper or whiteboard: + +``` +CREATE → PUBLISH → PERSIST → CONSUME → WORKFLOW → ACTIVITIES → ACK/NACK → OBSERVE +``` + +For each step, identify: (a) which EIP pattern applies, (b) where **atomicity** is enforced (hint: PERSIST ensures durability, WORKFLOW ensures all-or-nothing), and (c) which step enables **scalability** through parallel processing (hint: CONSUME with consumer groups). + +### Step 3: Design a Multi-Consumer Topology + +Imagine you need both an **analytics service** and a **billing service** to receive `order.created` messages. Design the consumer group configuration: + +- Analytics: consumer group = `"analytics-processors"` (receives every message) +- Billing: consumer group = `"billing-processors"` (receives every message) +- Within billing, 3 instances share the load + +Explain which EIP patterns are at play: **Publish-Subscribe Channel** (different groups) vs. **Competing Consumers** (same group, multiple instances). Why does this design scale without code changes? + +## Exam + +1. What is the purpose of the `CorrelationId` field on `IntegrationEnvelope`? + - A) It uniquely identifies a single message in the broker's storage + - B) It links all messages that belong to the same logical business transaction, even across splits, transformations, and aggregations + - C) It stores the consumer group name for load balancing + - D) It provides the encryption key for message payloads + +2. Which `MessageIntent` value should be assigned to a message that instructs a downstream service to perform an action (e.g., "process this payment")? + - A) `MessageIntent.Event` + - B) `MessageIntent.Document` + - C) `MessageIntent.Command` + - D) There is no distinction — all messages are treated identically + +3. How does the broker abstraction (`IMessageBrokerProducer` / `IMessageBrokerConsumer`) support **atomic processing** in the message lifecycle? + - A) It encrypts every message before publishing + - B) It ensures the message is durably persisted in the broker before returning from `PublishAsync`, so the message survives producer crashes + - C) It compresses the payload to reduce latency + - D) It creates a database transaction around the publish call --- diff --git a/EnterpriseIntegrationPlatform/tutorials/04-integration-envelope.md b/EnterpriseIntegrationPlatform/tutorials/04-integration-envelope.md index f38d256..e119054 100644 --- a/EnterpriseIntegrationPlatform/tutorials/04-integration-envelope.md +++ b/EnterpriseIntegrationPlatform/tutorials/04-integration-envelope.md @@ -215,13 +215,45 @@ All five envelopes share the same `CorrelationId`. This lets you: --- -## Exercises +## Lab -1. **Design an envelope**: You receive an XML invoice from a partner via SFTP. Design the `IntegrationEnvelope` fields — what would `Source`, `MessageType`, `Intent`, and key metadata entries be? 
+**Objective:** Build causation chains and sequenced message sets that demonstrate how the Envelope Wrapper pattern preserves **atomicity** and **traceability** across a multi-step integration pipeline. -2. **Trace the chain**: A message arrives, gets validated, split into 3 parts, each part is transformed, and all 3 are aggregated. How many envelopes are created total? Draw the `CausationId` chain. +### Step 1: Build a Causation Chain (Message Lineage) -3. **Expiration scenario**: A message has `ExpiresAt = now + 5 minutes`. Processing takes 6 minutes. What happens? Which component handles this? +Write code that simulates a three-step processing pipeline. Create an original envelope with `IntegrationEnvelope.Create()`. Then create a second envelope (transformation result) using a `with` expression — set its `CausationId` to the first envelope's `MessageId` and keep the same `CorrelationId`. Create a third envelope whose `CausationId` is the second. Verify all three share the same `CorrelationId` but have distinct `MessageId` values. + +This lineage is essential for **atomicity**: if step 3 fails, the saga compensation engine uses the `CausationId` chain to identify and roll back exactly the right upstream steps. + +### Step 2: Model a Splitter Output with Sequencing + +Create three envelopes representing a Splitter's output. Use `with` expressions to set `SequenceNumber` (0, 1, 2) and `TotalCount` (3). Also set `ExpiresAt` on one envelope to 5 minutes from now, and on another to a time in the past. Verify `IsExpired` returns the correct value. + +Explain why the **Message Expiration** pattern is critical for scalability: in a high-throughput system, stale messages must be routed to the Dead Letter Queue rather than consuming resources processing outdated data. + +### Step 3: Design an Atomicity Scenario + +Imagine an order message is split into 3 line-item messages. Line-item 2 fails delivery. Using the envelope fields (`CorrelationId`, `CausationId`, `SequenceNumber`, `TotalCount`), describe how the platform can: (a) identify all 3 messages as belonging to the same operation, (b) determine which specific message failed, and (c) trigger compensation for line-items 1 and 3 that already succeeded. + +## Exam + +1. Why is `IntegrationEnvelope` defined as a C# `record` rather than a `class`? + - A) Records are faster to serialize than classes + - B) Records provide immutability via `with` expressions, ensuring envelopes are never accidentally mutated during concurrent processing — critical for thread-safe scalability + - C) The .NET runtime requires records for generic types + - D) Records automatically encrypt their properties + +2. In a causation chain where message A is split into messages B₁, B₂, and B₃, what value should the `CausationId` of each split message contain? + - A) Its own `MessageId` + - B) The `CorrelationId` of message A + - C) The `MessageId` of message A — the parent that caused the split + - D) A new randomly generated `Guid` + +3. How does the `IsExpired` check contribute to the platform's **zero message loss** guarantee? 
+ - A) Expired messages are silently dropped to save resources + - B) Expired messages are routed to the Dead Letter Queue with reason "expired", ensuring they are never silently lost but also don't consume processing capacity for stale data + - C) The broker automatically deletes expired messages + - D) `IsExpired` prevents messages from being published in the first place --- diff --git a/EnterpriseIntegrationPlatform/tutorials/05-message-brokers.md b/EnterpriseIntegrationPlatform/tutorials/05-message-brokers.md index f5af7ea..8439690 100644 --- a/EnterpriseIntegrationPlatform/tutorials/05-message-brokers.md +++ b/EnterpriseIntegrationPlatform/tutorials/05-message-brokers.md @@ -232,16 +232,59 @@ Is this task delivery (process and acknowledge)? --- -## Exercises +## Lab -1. **HOL blocking scenario**: You have 1,000 recipients. Recipient 500 has a message that takes 60 seconds to process. With Kafka (10 partitions), how many other recipients are potentially blocked? With NATS queue groups? +**Objective:** Design a broker topic hierarchy for a multi-tenant system and analyze how different broker architectures affect **scalability** and **message ordering guarantees**. -2. **Topic design**: Design the topic hierarchy for a system that processes invoices, payments, and refunds across 3 regions (US, EU, APAC). Use NATS subject hierarchy. +### Step 1: Design a Multi-Region Topic Hierarchy -3. **Broker selection**: For each scenario, choose the best broker: - - Real-time analytics dashboard consuming all order events - - Processing customer onboarding requests (each takes 5-30 seconds) - - Audit trail that must be retained for 7 years +Design a NATS subject hierarchy for a multi-region e-commerce system with: orders, payments, and refunds across three regions (US, EU, APAC). Use NATS conventions (`.` for levels, `*` for single-level wildcard, `>` for multi-level wildcard): + +``` +eip.{region}.{domain}.{event} +Example: eip.us.orders.created +``` + +Write subscriber patterns for: (a) all events in EU: `eip.eu.>`, (b) all order events globally: `eip.*.orders.*`, (c) only payment completions in APAC: `eip.apac.payments.completed`. + +Explain how this hierarchy enables **horizontal scalability** — new regions can be added without changing existing subscribers. + +### Step 2: Compare Broker Scalability Characteristics + +Create a comparison table for Kafka, NATS JetStream, and Pulsar: + +| Characteristic | Kafka | NATS JetStream | Pulsar | +|---------------|-------|----------------|--------| +| Ordering guarantee | Per-partition | Per-subject | Per-key (Key_Shared) | +| HOL blocking risk | ? | ? | ? | +| Multi-tenant isolation | ? | ? | ? | +| Scale-out mechanism | ? | ? | ? | + +For each cell, explain the implication for a platform processing 10,000 messages/second from 50 tenants. + +### Step 3: Design for Atomicity Across Broker Switches + +The platform uses `IMessageBrokerProducer` / `IMessageBrokerConsumer` to abstract the broker. Describe a scenario where switching from NATS to Kafka for a specific message type would change the **atomicity** guarantees (hint: Kafka's transactional producer vs. NATS at-least-once). What compensating design would the platform need? + +## Exam + +1. What is head-of-line (HOL) blocking and why is it a **scalability** problem? 
+ - A) HOL blocking occurs when a slow message in a partition delays all subsequent messages; NATS queue groups avoid it because any available consumer can pick up any message + - B) HOL blocking is a network-layer issue that all brokers handle identically + - C) HOL blocking only affects messages with `MessagePriority.Low` + - D) HOL blocking means messages are delivered out of order + +2. Why does the platform define `IMessageBrokerProducer` and `IMessageBrokerConsumer` as abstractions rather than coding directly against a specific broker SDK? + - A) The broker SDKs do not support .NET 10 + - B) It allows the broker implementation to be swapped at deployment time without changing application code — enabling different scalability and atomicity trade-offs per workload + - C) Abstractions are required by the C# compiler for async methods + - D) Each broker uses a different serialization format + +3. When would you choose Apache Pulsar's Key_Shared subscription over Kafka's partition-based consumption for **multi-tenant scalability**? + - A) When you need strict global order across all keys + - B) When you want per-key ordering without cross-key head-of-line blocking — one tenant's slow processing should not affect others + - C) When your messages do not have any key + - D) When you require messages to be stored for less than 24 hours --- diff --git a/EnterpriseIntegrationPlatform/tutorials/06-messaging-channels.md b/EnterpriseIntegrationPlatform/tutorials/06-messaging-channels.md index df2f487..ae841b6 100644 --- a/EnterpriseIntegrationPlatform/tutorials/06-messaging-channels.md +++ b/EnterpriseIntegrationPlatform/tutorials/06-messaging-channels.md @@ -246,17 +246,61 @@ Here's how a typical message flow uses multiple channel types: --- -## Exercises +## Lab -1. **Channel selection**: For each scenario, choose the channel type: - - Processing incoming purchase orders (one processor per order) - - Notifying 5 different systems when a shipment is dispatched - - Handling messages that arrive as XML or JSON from different partners - - Quarantining messages with missing required fields +**Objective:** Classify messaging scenarios by channel type and design a channel topology that ensures **atomic delivery** and **scalable fan-out**. -2. **Bridge design**: Your company uses Kafka for everything but wants to add NATS for new microservices. Design a Messaging Bridge that keeps both systems in sync. +### Step 1: Map Scenarios to Channel Types -3. **Dead Letter Channel**: How does the Invalid Message Channel relate to the Dead Letter Channel? When would you use each? +For each scenario, identify the correct EIP channel pattern and the platform class that implements it: + +| Scenario | Channel Pattern | Platform Class | +|----------|----------------|----------------| +| Processing purchase orders (one processor per order) | ? | `PointToPointChannel` or `PublishSubscribeChannel`? | +| Notifying 5 systems when a shipment is dispatched | ? | ? | +| Handling messages in XML or JSON from different partners | ? | ? | +| Quarantining messages with missing required fields | ? | ? | + +Open `src/Processing.Channels/` and verify your answers against the actual implementations. + +### Step 2: Design a Messaging Bridge for Broker Migration + +Your company uses Kafka for all integrations but wants to add NATS for new microservices. 
Using the `MessagingBridge` class in `src/Processing.Channels/`, design a bridge configuration that: + +- Reads from Kafka topic `legacy.orders.created` +- Publishes to NATS subject `eip.orders.created` +- Preserves the `CorrelationId` and all `Metadata` across the bridge + +Draw the message flow and identify where **atomicity** could be lost (hint: what if the bridge crashes after reading from Kafka but before publishing to NATS?). How does the platform's Ack/Nack pattern mitigate this? + +### Step 3: Evaluate Scalability of Channel Patterns + +Compare Point-to-Point and Publish-Subscribe channels under high load: + +- Point-to-Point with 3 competing consumers processing 10,000 messages/second +- Pub-Sub with 5 subscriber groups, each with 2 consumers + +For each, explain: How does adding more consumers affect throughput? What happens to in-flight messages? Where is the bottleneck? + +## Exam + +1. In the EIP Messaging Bridge pattern, what is the bridge's primary responsibility? + - A) Transform message payloads between XML and JSON + - B) Connect two separate messaging systems while preserving message identity and metadata, enabling gradual broker migration without changing producers or consumers + - C) Compress messages to reduce broker storage requirements + - D) Route messages based on their content type header + +2. How does the Invalid Message Channel pattern contribute to **zero message loss**? + - A) Invalid messages are silently discarded to avoid poisoning downstream consumers + - B) Messages that cannot be parsed or violate schema rules are routed to a dedicated channel for inspection and reprocessing, ensuring they are never lost + - C) Invalid messages are automatically reformatted and retried + - D) The broker rejects invalid messages at the protocol level + +3. What is the key **scalability** difference between a Point-to-Point channel and a Publish-Subscribe channel? + - A) Point-to-Point channels cannot have multiple consumers + - B) In Point-to-Point, adding consumers distributes load (Competing Consumers); in Pub-Sub, adding subscriber groups creates independent copies of every message for parallel processing + - C) Publish-Subscribe channels are always faster than Point-to-Point + - D) Point-to-Point channels require Kafka while Pub-Sub requires NATS --- diff --git a/EnterpriseIntegrationPlatform/tutorials/07-temporal-workflows.md b/EnterpriseIntegrationPlatform/tutorials/07-temporal-workflows.md index f4c07f4..15b6a71 100644 --- a/EnterpriseIntegrationPlatform/tutorials/07-temporal-workflows.md +++ b/EnterpriseIntegrationPlatform/tutorials/07-temporal-workflows.md @@ -345,13 +345,60 @@ public class IntegrationPipelineWorkflowTests --- -## Exercises +## Lab -1. **Failure scenario**: A workflow has 4 steps. Step 3 fails. What does Temporal do? What happens if the worker crashes during Step 3's retry? +**Objective:** Trace how Temporal workflows enforce **atomic processing** with saga compensation, and design a failure recovery strategy for a multi-step integration pipeline. -2. **Saga compensation**: Design compensation for: (1) create customer record, (2) provision email account, (3) send welcome email. What compensates each step? +### Step 1: Trace a Failure Recovery Path -3. **Ack/Nack design**: An order processing workflow has 5 steps. Step 4 (warehouse check) says "out of stock." Should this be a Nack? What information should the Nack carry? +A workflow has 4 steps: Validate → Transform → Route → Deliver. 
Step 3 (Route) fails after Step 2 has already committed its result. Open `src/Workflow.Temporal/` and trace the code path: + +1. What does Temporal do when Step 3 throws an exception? (hint: retry policy) +2. If all retries are exhausted, how does the `AtomicPipelineWorkflow` trigger saga compensation? +3. What does `SagaCompensationActivities.CompensateStepAsync` do for Steps 1 and 2? + +Draw the timeline showing: original steps executed, failure point, compensation steps in reverse order. + +### Step 2: Design Compensation for a Business Scenario + +Design saga compensation for an order fulfilment workflow: + +| Step | Action | Compensation | +|------|--------|-------------| +| 1 | Create customer record in CRM | ? | +| 2 | Reserve inventory in warehouse | ? | +| 3 | Charge payment via gateway | ? | +| 4 | Send confirmation email | ? | + +For each compensation, identify: Is it idempotent? What happens if the compensation itself fails? How does the `CorrelationId` link the original action to its compensation? + +### Step 3: Evaluate Scalability of Workflow Workers + +Temporal workers poll task queues for workflow and activity tasks. Consider a scenario with 100 concurrent integrations: + +- How many workflow workers should you run? What happens when you add more? +- What is the relationship between worker count and **throughput**? +- Why does Temporal's durable execution model prevent duplicate processing even when workers scale horizontally? + +## Exam + +1. What happens when a Temporal workflow worker crashes in the middle of executing an activity? + - A) The message is lost permanently + - B) Another worker picks up the activity from the last checkpoint — Temporal's event history ensures exactly-once execution semantics with durable state + - C) The entire workflow restarts from Step 1 + - D) The broker automatically retries the message + +2. In the Saga Compensation pattern, why must compensation steps execute in **reverse order**? + - A) Reverse order is faster for the runtime to schedule + - B) Later steps may depend on earlier steps' state — compensating in reverse ensures each rollback sees a consistent state from the steps that preceded it + - C) The EIP book mandates reverse order for all patterns + - D) Temporal only supports reverse-order execution + +3. How does Temporal's durable execution model ensure **atomicity** across a multi-step integration pipeline? + - A) It wraps all steps in a database transaction + - B) It persists each step's completion in an event history — if a worker fails, another worker replays the history and resumes from the exact point of failure, never re-executing completed steps + - C) It locks the message broker partition until all steps complete + - D) It copies messages to a backup queue before processing --- diff --git a/EnterpriseIntegrationPlatform/tutorials/08-activities-pipeline.md b/EnterpriseIntegrationPlatform/tutorials/08-activities-pipeline.md index 6755cec..89280fb 100644 --- a/EnterpriseIntegrationPlatform/tutorials/08-activities-pipeline.md +++ b/EnterpriseIntegrationPlatform/tutorials/08-activities-pipeline.md @@ -296,13 +296,64 @@ public class PersistenceActivityTests --- -## Exercises +## Lab -1. **Design a pipeline**: You receive XML invoices via SFTP. Design the activity sequence: what activities do you need? In what order? +**Objective:** Design an activity pipeline for a real integration scenario, analyze failure modes, and identify where the Pipes and Filters pattern enables **independent scaling** of each stage. -2. 
**Failure handling**: Activity 3 (Transform) fails with a transient error. What happens? What if it fails with a permanent error (invalid schema)? +### Step 1: Design a Pipeline for XML Invoice Processing -3. **Extend the pipeline**: You need to add a "content enrichment" step that looks up customer data from a CRM API. Where in the activity chain would you add it? What interface would the activity use? +You receive XML invoices via SFTP. Design the complete activity sequence using the platform's activity classes: + +| Step | Activity | Class | Purpose | +|------|----------|-------|---------| +| 1 | Validate | `IntegrationActivities.ValidateMessageAsync` | Schema + payload checks | +| 2 | ? | ? | Sanitize input (XSS, SQL injection) | +| 3 | ? | ? | Transform XML → canonical JSON | +| 4 | ? | ? | Enrich with customer data from CRM | +| 5 | ? | ? | Route to correct downstream system | +| 6 | ? | ? | Deliver via HTTP connector | +| 7 | ? | ? | Persist to Cassandra | +| 8 | ? | ? | Send Ack/Nack notification | + +Open `src/Activities/` and `src/Workflow.Temporal/Activities/` to find the actual activity classes. + +### Step 2: Analyze Failure Modes and Atomicity + +For your pipeline above, analyze what happens at each failure point: + +- Step 3 fails with a **transient** error (network timeout) — what retry policy applies? +- Step 3 fails with a **permanent** error (invalid XML schema) — where does the message go? +- Step 6 fails after Step 7 already persisted — what compensation is needed? + +Explain how the Ack/Nack pattern at Step 8 ensures the originating system knows the final outcome, preserving **end-to-end atomicity**. + +### Step 3: Evaluate Per-Stage Scalability + +The Pipes and Filters pattern allows each activity to scale independently. For your pipeline: + +- Which step is likely the bottleneck under high load? (hint: external API calls) +- How would you scale Step 4 (CRM enrichment) without affecting Steps 1-3? +- What is the advantage of Temporal's activity-level retry over retrying the entire pipeline? + +## Exam + +1. In the Pipes and Filters pattern, what property must each filter (activity) maintain to allow **independent scaling**? + - A) All filters must share a single database connection + - B) Each filter processes the message using only the data in the envelope — no shared mutable state between filters — so multiple instances can run in parallel + - C) Filters must execute in a single thread to ensure ordering + - D) Each filter must cache results for the next filter + +2. Why does the platform split processing into separate activities (Validate, Transform, Route, Deliver) rather than a single monolithic handler? + - A) .NET requires separate classes for each async operation + - B) Separate activities enable independent retry policies, individual scaling, and granular saga compensation — a failure in Transform doesn't require re-running Validate + - C) Temporal cannot execute more than one method per workflow + - D) Separate activities reduce the total number of code lines + +3. What happens when an activity fails with a permanent error (e.g., invalid schema) in this platform? 
+ - A) The workflow retries indefinitely until the message becomes valid + - B) The message is routed to the Dead Letter Queue with the failure reason, a Nack notification is sent to the originating system, and the workflow terminates cleanly + - C) The activity silently drops the message + - D) The Temporal worker crashes and restarts --- diff --git a/EnterpriseIntegrationPlatform/tutorials/09-content-based-router.md b/EnterpriseIntegrationPlatform/tutorials/09-content-based-router.md index 896cb93..071b25c 100644 --- a/EnterpriseIntegrationPlatform/tutorials/09-content-based-router.md +++ b/EnterpriseIntegrationPlatform/tutorials/09-content-based-router.md @@ -97,13 +97,58 @@ The router publishes to the selected topic via the broker producer **before** ac --- -## Exercises +## Lab -1. You have three routing rules with priorities 10, 5, and 1. A message matches rules at priorities 5 and 1. Which topic does the message go to and why? +**Objective:** Configure routing rules with priorities, trace how the Content-Based Router dispatches messages, and analyze routing **scalability** under high-throughput conditions. -2. A new requirement says messages with `Payload.customer.tier = "platinum"` must go to `priority-processing`. Write the `RoutingRule` record for this requirement. +### Step 1: Configure a Multi-Rule Routing Table -3. Why is pre-compiling regex patterns important for a high-throughput router? What happens if you skip compilation? +Open `src/Processing.Routing/ContentBasedRouter.cs`. Create a routing configuration for an e-commerce platform: + +| Priority | Field | Operator | Value | Output Topic | +|----------|-------|----------|-------|-------------| +| 1 | `Payload.customer.tier` | Equals | `"platinum"` | `priority-processing` | +| 5 | `MessageType` | Equals | `"OrderCreated"` | `orders.standard` | +| 10 | `MessageType` | Matches | `"Return.*"` | `returns.processing` | +| 100 | (default) | — | — | `general.inbox` | + +A message arrives with `MessageType = "OrderCreated"` and `Payload.customer.tier = "platinum"`. Which topic does it route to? Explain how priority ordering ensures deterministic routing. + +### Step 2: Trace the Routing Decision Path + +Using the `RoutingDecision` record, trace the router's decision path for a message that matches rules at priorities 1 and 5. Open the router implementation and identify: + +- How does the router evaluate rules? (sequential scan vs. sorted by priority?) +- Does evaluation stop at the first match, or are all rules evaluated? +- What `RoutingDecision` is returned — does it include the matched rule for auditing? + +### Step 3: Design for Routing Scalability + +Consider a Content-Based Router processing 50,000 messages/second with 200 routing rules: + +- What is the computational cost per message? (hint: O(n) for n rules) +- How does pre-compiling regex patterns (`RoutingOperator.Matches`) improve throughput? +- If you need to route to different brokers (Kafka for audit, NATS for real-time), how would the router's output topic abstraction enable this without code changes? + +## Exam + +1. You have routing rules with priorities 10, 5, and 1. A message matches rules at priorities 5 and 1. Which topic receives the message? + - A) Both topics receive the message (fan-out) + - B) Priority 1 — the router selects the lowest priority number (highest precedence) among matches + - C) Priority 10 — the router always uses the first rule defined + - D) Priority 5 — the router stops at the first match in definition order + +2. 
How does the Content-Based Router pattern support **atomic message routing**? + - A) It copies the message to all matching topics simultaneously + - B) Each message is routed to exactly one output topic — the routing decision is deterministic and idempotent, so replaying the same message always produces the same routing outcome + - C) It wraps the routing decision in a database transaction + - D) The router buffers messages until a batch is complete + +3. Why is pre-compiling regex patterns critical for **routing scalability** at high throughput? + - A) Pre-compilation reduces memory allocation per evaluation — without it, each message creates and discards regex objects, causing GC pressure that degrades throughput under load + - B) Pre-compilation is required by the .NET regex API + - C) Pre-compilation allows patterns to match across multiple lines + - D) Pre-compilation enables case-insensitive matching --- diff --git a/EnterpriseIntegrationPlatform/tutorials/10-message-filter.md b/EnterpriseIntegrationPlatform/tutorials/10-message-filter.md index d6e25ee..0d4ac42 100644 --- a/EnterpriseIntegrationPlatform/tutorials/10-message-filter.md +++ b/EnterpriseIntegrationPlatform/tutorials/10-message-filter.md @@ -64,6 +64,7 @@ public sealed class MessageFilterOptions public RuleLogicOperator Logic { get; init; } = RuleLogicOperator.And; public required string OutputTopic { get; init; } public string? DiscardTopic { get; init; } + public bool RequireDiscardTopic { get; init; } } ``` @@ -95,13 +96,71 @@ The platform enforces **no silent drops** in production deployments. When a `Dis --- -## Exercises +## Lab -1. Write a `MessageFilterOptions` configuration that passes only messages where `MessageType = "OrderCreated"` AND `Payload.total > 100`. Specify a `DiscardTopic`. +**Objective:** Configure message filter rules, analyze the no-silent-drop guarantee with `RequireDiscardTopic`, and design a filter topology for **scalable** multi-stage message processing. -2. A message fails all conditions but no `DiscardTopic` is configured. What happens? How would you change the design to prevent silent drops entirely? +### Step 1: Configure a Filter with Discard Routing -3. Compare the Message Filter to the Content-Based Router. When would you use a filter instead of a router? +Write a `MessageFilterOptions` configuration that passes only messages where `MessageType = "OrderCreated"` AND `Payload.total > 100`: + +```csharp +var options = new MessageFilterOptions +{ + Conditions = [ + new RuleCondition { FieldName = "MessageType", Operator = RuleConditionOperator.Equals, Value = "OrderCreated" }, + new RuleCondition { FieldName = "Payload.total", Operator = RuleConditionOperator.GreaterThan, Value = "100" } + ], + Logic = RuleLogicOperator.And, + OutputTopic = "high-value-orders", + DiscardTopic = "filtered-out.orders", + RequireDiscardTopic = true +}; +``` + +Explain what happens when `RequireDiscardTopic = true` and no `DiscardTopic` is configured — how does this enforce **zero message loss**? + +### Step 2: Trace the Filter's Atomicity Guarantee + +Open `src/Processing.Routing/MessageFilter.cs`. Trace the code path for a message that fails all conditions: + +1. The filter evaluates conditions → all fail → `MessageFilterResult.Passed = false` +2. With `DiscardTopic` set → message is published to the discard topic +3. With `DiscardTopic` null and `RequireDiscardTopic = true` → what exception is thrown? 
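+
+As a rough mental model of the three branches traced above, the pass/discard decision can be sketched as follows. This is an illustrative simplification, not the actual `MessageFilter` source — the real behaviour, including the exact exception type in the third branch, is what Step 2 asks you to confirm in `src/Processing.Routing/MessageFilter.cs`. The `PublishAsync(envelope, topic, cancellationToken)` shape mirrors how `IMessageBrokerProducer` is used elsewhere in this course.
+
+```csharp
+// Illustrative sketch only — simplified from the behaviour described in this tutorial,
+// not the platform's actual implementation.
+public static class FilterDecisionSketch
+{
+    public static async Task RouteAsync(
+        IntegrationEnvelope envelope,   // the message being filtered
+        bool passed,                    // result of evaluating options.Conditions with options.Logic
+        MessageFilterOptions options,
+        IMessageBrokerProducer producer,
+        CancellationToken ct)
+    {
+        if (passed)
+        {
+            // 1. Conditions pass → deliver to the configured output topic.
+            await producer.PublishAsync(envelope, options.OutputTopic, ct);
+        }
+        else if (options.DiscardTopic is not null)
+        {
+            // 2. Conditions fail, discard topic configured → route explicitly, never drop silently.
+            await producer.PublishAsync(envelope, options.DiscardTopic, ct);
+        }
+        else if (options.RequireDiscardTopic)
+        {
+            // 3. Conditions fail, no discard topic, silent drops forbidden →
+            //    a configuration exception is thrown (Step 2 asks you to find the exact type).
+            throw new InvalidOperationException(
+                "A DiscardTopic must be configured when RequireDiscardTopic is true.");
+        }
+        // else: conditions fail, no discard topic, RequireDiscardTopic = false →
+        //       the message is logged and dropped (see Exam question 1).
+    }
+}
+```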
+ +Draw the decision tree and explain how this guarantees every message is either delivered to `OutputTopic` or explicitly routed to `DiscardTopic` — never silently dropped. + +### Step 3: Design a Multi-Stage Filter Pipeline + +Design a pipeline with three cascading filters for an insurance claims system: + +| Stage | Filter Criteria | Output | Discard | +|-------|----------------|--------|---------| +| 1 | Claim amount > $0 and valid policy number | `claims.validated` | `claims.invalid` | +| 2 | Claim type is "auto" or "home" | `claims.supported` | `claims.unsupported` | +| 3 | Claim amount < $50,000 (auto-approve threshold) | `claims.auto-approve` | `claims.manual-review` | + +How does each filter's **discard topic** become a different team's input? How does this design scale — can each filter stage run independently with its own consumer group? + +## Exam + +1. A message fails all filter conditions but no `DiscardTopic` is configured and `RequireDiscardTopic = false`. What happens? + - A) The filter throws an `InvalidOperationException` + - B) The message is silently dropped — the filter logs a warning but takes no further action + - C) The message is automatically routed to the Dead Letter Queue + - D) The filter retries evaluation with relaxed conditions + +2. How does the Message Filter differ from the Content-Based Router in the EIP pattern catalog? + - A) They are identical patterns with different names + - B) The Router selects one of many output channels based on content; the Filter has a binary decision — pass or discard — making it simpler and more efficient for yes/no criteria + - C) The Filter can route to multiple topics simultaneously + - D) The Router only works with XML messages + +3. Why is `RequireDiscardTopic` essential for **production atomicity** in enterprise integration? + - A) It improves message throughput by forcing batch processing + - B) It prevents silent message loss — in production, every message must be accounted for, and throwing an exception forces the team to configure a discard destination before deployment + - C) It enables faster regex evaluation + - D) It is required by the NATS JetStream protocol --- diff --git a/EnterpriseIntegrationPlatform/tutorials/11-dynamic-router.md b/EnterpriseIntegrationPlatform/tutorials/11-dynamic-router.md index fa01011..5c00a0f 100644 --- a/EnterpriseIntegrationPlatform/tutorials/11-dynamic-router.md +++ b/EnterpriseIntegrationPlatform/tutorials/11-dynamic-router.md @@ -104,13 +104,59 @@ Routing decisions are deterministic for a given routing-table snapshot. If the p --- -## Exercises +## Lab -1. Participant D registers `conditionKey = "invoices"` with destination `"invoice-processing"`. A message arrives with `MessageType = "invoices"`. Trace the routing path. +**Objective:** Trace how the Dynamic Router updates its routing table at runtime, analyze the EIP pattern's role in **scalable** integration topologies, and design a consistent routing strategy for distributed deployments. -2. What happens if Participant D unregisters and a new message with `conditionKey = "invoices"` arrives before any other participant registers for that key? +### Step 1: Trace a Dynamic Registration Flow -3. How would you make the routing table consistent across 5 router replicas? Describe a broker-based approach. +Open `src/Processing.Routing/DynamicRouter.cs`. A new participant registers with `conditionKey = "invoices"` and destination `"invoice-processing"`. Then a message arrives with `MessageType = "invoices"`. 
Trace the code path: + +1. How does `RegisterAsync` store the mapping? +2. How does `RouteAsync` look up the destination? +3. What `RoutingDecision` is returned — does it include the matched condition for auditing? + +Now: Participant unregisters. A new message with the same key arrives. What happens? Where does the message go? + +### Step 2: Design for Multi-Replica Consistency + +You have 5 Dynamic Router replicas behind a load balancer. Participant D registers on Replica 1, but Replica 3 doesn't know about it. Design a solution using the platform's broker infrastructure: + +- Publish registration events to a `routing.registrations` topic +- Each replica subscribes and updates its local table +- How does this use the **Publish-Subscribe Channel** pattern to keep all replicas consistent? +- What happens to messages during the propagation delay? Is this an **atomicity** concern? + +### Step 3: Compare Dynamic Router Scalability vs. Content-Based Router + +| Aspect | Content-Based Router | Dynamic Router | +|--------|---------------------|---------------| +| Rule source | Static configuration | Runtime registrations | +| Adding new routes | ? | ? | +| Scalability model | ? | ? | +| Consistency across replicas | ? | ? | + +When would you choose a Dynamic Router over a Content-Based Router in a multi-tenant SaaS platform? + +## Exam + +1. What EIP pattern does the Dynamic Router implement that the Content-Based Router does not? + - A) Message Filter with discard + - B) A self-updating routing table where downstream participants register and unregister their interests at runtime, enabling topology changes without redeploying the router + - C) Priority-based message queuing + - D) Batch message processing + +2. In a horizontally scaled deployment with multiple router instances, what is the main **consistency** challenge? + - A) All routers must share a single-threaded execution context + - B) Registration changes on one instance must propagate to all others — during propagation, different instances may route the same message to different destinations + - C) Dynamic routers cannot be scaled horizontally + - D) Each router instance requires its own broker connection + +3. How does the Dynamic Router pattern support **scalable** integration topology changes? + - A) It requires a full system restart to add new routes + - B) New services register their routing interests at startup — the router begins directing matching messages to them immediately, with no configuration changes or redeployments needed + - C) It pre-allocates routes for all possible message types + - D) It uses a database trigger to detect new services --- diff --git a/EnterpriseIntegrationPlatform/tutorials/12-recipient-list.md b/EnterpriseIntegrationPlatform/tutorials/12-recipient-list.md index d7ff7e6..eabc382 100644 --- a/EnterpriseIntegrationPlatform/tutorials/12-recipient-list.md +++ b/EnterpriseIntegrationPlatform/tutorials/12-recipient-list.md @@ -92,13 +92,62 @@ This ensures either all recipients get the message or the source is redelivered. --- -## Exercises +## Lab -1. A message matches two rules contributing destinations `["audit", "billing", "audit"]`. What does `RecipientListResult` report for `ResolvedCount` and `DuplicatesRemoved`? +**Objective:** Analyze how the Recipient List pattern enables **scalable fan-out** to multiple destinations, design duplicate-safe publishing, and measure the performance impact of parallel vs. sequential delivery. -2. 
Design a metadata-based recipient list where the sender specifies destinations in `Metadata["recipients"] = "topic-a,topic-b"`. What are the trade-offs vs. rule-based resolution? +### Step 1: Trace a Recipient List Resolution -3. With 10 recipients and one slow destination (3 s latency), how does parallel publishing help compared to sequential publishing? +A message matches two routing rules that produce destinations `["audit", "billing", "audit"]`. Open `src/Processing.Routing/RecipientListRouter.cs` and trace: + +1. How are duplicate destinations handled? What does `RecipientListResult.DuplicatesRemoved` report? +2. What is the final `ResolvedCount`? +3. How does the router publish to each destination — sequentially or in parallel? + +### Step 2: Design a Metadata-Driven Recipient List + +Some integration scenarios require the **sender** to specify recipients dynamically via envelope metadata: + +```csharp +envelope.Metadata["recipients"] = "audit,billing,compliance"; +``` + +Design this approach and compare trade-offs: + +| Approach | Pros | Cons | +|----------|------|------| +| Rule-based (server-side) | Centralized control, auditable | ? | +| Metadata-based (sender-specified) | ? | Sender must know all destinations | + +Which approach provides better **atomicity** guarantees? (hint: what if the sender specifies a non-existent topic?) + +### Step 3: Analyze Fan-Out Scalability + +With 10 recipients and one slow destination (3-second latency): + +- How does parallel publishing (platform's default) compare to sequential publishing? +- What is the total latency for parallel vs. sequential? (hint: parallel ≈ max latency, sequential ≈ sum) +- If the slow destination fails, should the message be Ack'd or Nack'd for the other 9 successful deliveries? Design your atomicity strategy. + +## Exam + +1. A Recipient List resolves 5 destinations. Publishing to destination 3 fails. What should the platform do to maintain **atomicity**? + - A) Silently skip destination 3 and Ack the remaining 4 + - B) Log the failure and track partial delivery — the message enters a compensable state where the failed destination can be retried independently without re-publishing to the successful 4 + - C) Retry all 5 destinations from the beginning + - D) Route the entire message to the Dead Letter Queue + +2. Why does the Recipient List remove duplicate destinations before publishing? + - A) Duplicates are not supported by the NATS protocol + - B) Publishing the same message to the same topic multiple times creates duplicate processing downstream — de-duplication ensures **idempotent fan-out** at the routing layer + - C) Duplicate topics cause build errors + - D) The broker ignores duplicate publishes automatically + +3. How does parallel publishing to multiple recipients improve **throughput scalability**? 
+ - A) It reduces the total message size + - B) Total fan-out latency equals the slowest recipient (not the sum of all) — this is critical when scaling to dozens of recipients, as sequential publishing would create unacceptable pipeline latency + - C) Parallel publishing uses less memory than sequential + - D) The broker handles parallelism internally regardless of how the producer publishes --- diff --git a/EnterpriseIntegrationPlatform/tutorials/13-routing-slip.md b/EnterpriseIntegrationPlatform/tutorials/13-routing-slip.md index 849f939..56be40e 100644 --- a/EnterpriseIntegrationPlatform/tutorials/13-routing-slip.md +++ b/EnterpriseIntegrationPlatform/tutorials/13-routing-slip.md @@ -111,13 +111,72 @@ The routing slip is stored in the envelope's `Metadata` dictionary as serialised --- -## Exercises +## Lab -1. Build a `RoutingSlip` with steps: Validate → Transform → Deliver. The Transform step needs a parameter `"targetFormat" = "XML"`. Write the C# construction code. +**Objective:** Build a Routing Slip, trace failure recovery with partial completion, and compare the Routing Slip pattern's **scalability** against Process Manager workflows. -2. A message has completed Validate and Transform but crashes during Deliver. What does the `RemainingSlip` look like when the message is redelivered? +### Step 1: Build a Routing Slip with Parameters -3. Compare a routing slip to a Temporal workflow pipeline. When would you choose a slip over a workflow? +Write C# code to construct a `RoutingSlip` with three steps: + +```csharp +var slip = new RoutingSlip([ + new RoutingSlipStep("Validate", new Dictionary()), + new RoutingSlipStep("Transform", new Dictionary + { + ["targetFormat"] = "XML", + ["schemaVersion"] = "2.0" + }), + new RoutingSlipStep("Deliver", new Dictionary + { + ["endpoint"] = "https://partner.example.com/api/orders" + }) +]); +``` + +Open `src/Processing.Routing/RoutingSlip.cs` and verify the record structure. How does each step carry its own parameters? Why is this important for **atomicity** — each step is self-contained with all the data it needs. + +### Step 2: Trace a Partial-Completion Recovery + +A message has completed Validate and Transform but the worker crashes during Deliver. The message is redelivered with the slip attached: + +1. What does `RemainingSlip` contain? (hint: only Deliver remains) +2. How does the platform know which steps already completed? +3. Are Validate and Transform re-executed? Why or why not? + +Draw the recovery timeline and explain how the Routing Slip pattern achieves **idempotent resume** — crashed messages resume from exactly where they left off. + +### Step 3: Compare Routing Slip vs. Temporal Workflow + +| Aspect | Routing Slip | Temporal Workflow (Process Manager) | +|--------|-------------|-------------------------------------| +| State persistence | In the message itself | In Temporal's event history | +| Dynamic step addition | ? | ? | +| Compensation support | ? | ? | +| Scalability | ? | ? | +| Best for | ? | ? | + +When would you choose a Routing Slip over a full Temporal workflow? Consider: simple linear pipelines vs. complex branching logic. + +## Exam + +1. A Routing Slip message has completed steps 1-3 of 5. The worker crashes. What happens on redelivery? 
+ - A) All 5 steps execute from the beginning + - B) The slip indicates steps 1-3 are complete — only steps 4-5 are in `RemainingSlip`, so processing resumes from step 4 without re-executing completed work + - C) The message is routed to the Dead Letter Queue + - D) A new slip is created with all 5 steps + +2. Why does the Routing Slip pattern carry processing state **inside the message** rather than in an external store? + - A) External stores are too slow for message processing + - B) The message is self-contained — any processor can pick it up and resume, enabling **horizontal scaling** without shared state coordination between consumers + - C) The message broker requires all state in the payload + - D) External stores don't support key-value parameters + +3. What is the key **scalability** advantage of a Routing Slip over a centralized Process Manager? + - A) Routing slips are faster to serialize + - B) No central coordinator is needed — each step independently reads the slip and forwards to the next, so the pattern scales linearly with more processors and has no single-point-of-failure bottleneck + - C) Process Managers cannot run on multiple machines + - D) Routing slips support more data formats --- diff --git a/EnterpriseIntegrationPlatform/tutorials/14-process-manager.md b/EnterpriseIntegrationPlatform/tutorials/14-process-manager.md index 5e2c041..d91233e 100644 --- a/EnterpriseIntegrationPlatform/tutorials/14-process-manager.md +++ b/EnterpriseIntegrationPlatform/tutorials/14-process-manager.md @@ -103,11 +103,10 @@ public sealed class SagaCompensationActivities public async Task CompensateStepAsync(Guid correlationId, string stepName) { await _logging.LogAsync(correlationId, stepName, $"CompensationStarted:{stepName}"); - var result = await _compensation.CompensateAsync(correlationId, stepName); - await _logging.LogAsync(correlationId, stepName, result - ? $"CompensationSucceeded:{stepName}" - : $"CompensationFailed:{stepName}"); - return result; + var success = await _compensationService.CompensateAsync(correlationId, stepName); + var stage = success ? $"CompensationSucceeded:{stepName}" : $"CompensationFailed:{stepName}"; + await _logging.LogAsync(correlationId, stepName, stage); + return success; } } ``` @@ -126,13 +125,67 @@ The `AtomicPipelineWorkflow` implements full **saga compensation**. Completed st --- -## Exercises +## Lab -1. A workflow has steps: Persist → Validate → Transform → Deliver. Transform succeeds but Deliver fails. List the compensation steps in execution order. +**Objective:** Trace the Process Manager's orchestration of multi-step workflows with saga compensation, and analyze how centralized coordination enables **atomic** all-or-nothing processing. -2. What is the key difference between a Process Manager and a Routing Slip? When would you choose one over the other? +### Step 1: Trace a Compensation Sequence -3. A compensation activity (`CompensateStepAsync`) itself fails. What does Temporal do? What does the platform log? +A workflow has steps: Persist → Validate → Transform → Deliver. Transform succeeds but Deliver fails after all retries. Open `src/Workflow.Temporal/AtomicPipelineWorkflow.cs` and trace: + +1. Which steps need compensation? (only steps that committed work) +2. In what order do compensation steps execute? (hint: reverse) +3. What does `SagaCompensationActivities.CompensateStepAsync` do for each step? 
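+
+Before filling in the compensation table below, here is a minimal sketch of the reverse-order unwind — illustrative only, not the actual `AtomicPipelineWorkflow` (which drives these calls through Temporal activities); `ExecuteStepAsync` and the field names are assumptions:
+
+```csharp
+// Sketch only — why compensation runs in reverse: only committed steps are pushed,
+// and the most recently committed step is undone first.
+var compensations = new Stack<string>();
+
+try
+{
+    foreach (var step in new[] { "Persist", "Validate", "Transform", "Deliver" })
+    {
+        await ExecuteStepAsync(correlationId, step); // hypothetical activity call
+        compensations.Push(step);                    // only steps that committed work are compensable
+    }
+}
+catch (Exception)
+{
+    // Deliver failed before being pushed, so the stack unwinds Transform → Validate → Persist.
+    while (compensations.Count > 0)
+    {
+        var step = compensations.Pop();
+        var success = await _compensationActivities.CompensateStepAsync(correlationId, step);
+        if (!success)
+        {
+            _logger.LogError("Compensation for {Step} failed; saga is only partially compensated.", step);
+        }
+    }
+    throw;
+}
+```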
+ +List the compensation sequence: + +| Order | Compensating | Original Step | +|-------|-------------|---------------| +| 1 | Undo Transform | Transform | +| 2 | ? | ? | +| 3 | ? | ? | + +### Step 2: Handle Compensation Failures + +The compensation for "Persist" itself fails. Open `src/Workflow.Temporal/Activities/SagaCompensationActivities.cs` and answer: + +- What does the `CompensateStepAsync` method return when compensation fails? +- Does Temporal retry the compensation? With what policy? +- What is logged? How does the operations team know that manual intervention is required? + +Design an alerting strategy for compensation failures — this is the **atomicity boundary** of the system. + +### Step 3: Compare Process Manager vs. Routing Slip + +| Aspect | Process Manager | Routing Slip | +|--------|----------------|-------------| +| Coordination | Centralized (Temporal) | Decentralized (in-message) | +| Compensation | Full saga support | Limited / manual | +| Visibility | Full execution history | ? | +| Scalability bottleneck | Temporal server | ? | +| Best for | Complex branching, compensation | ? | + +When would a Process Manager's centralized coordination be worth the **scalability** trade-off vs. a Routing Slip? + +## Exam + +1. In a Process Manager with saga compensation, why must compensation steps execute in **reverse order**? + - A) It's a convention with no technical reason + - B) Later steps may depend on earlier steps' committed state — reverse-order compensation ensures each rollback sees the state from the steps that preceded it, maintaining consistency + - C) Temporal only supports reverse execution + - D) Reverse order is faster for the scheduler + +2. A compensation step itself fails. What is the correct platform behavior for maintaining **atomicity**? + - A) Silently ignore the failure and mark the saga as complete + - B) Log the failure, mark the saga as partially compensated, and alert the operations team — some atomicity violations require human intervention when automatic compensation is impossible + - C) Restart the entire original workflow from Step 1 + - D) Route the compensation failure to the Dead Letter Queue and retry indefinitely + +3. What is the key advantage of the Process Manager pattern over the Routing Slip for **enterprise-grade atomicity**? + - A) Process Managers are faster for simple linear pipelines + - B) The Process Manager maintains a durable execution history with full saga compensation — if any step fails, all committed work can be rolled back to restore consistency + - C) Process Managers don't require a message broker + - D) Routing Slips cannot carry parameters --- diff --git a/EnterpriseIntegrationPlatform/tutorials/15-message-translator.md b/EnterpriseIntegrationPlatform/tutorials/15-message-translator.md index f23ac2c..7ad6dd3 100644 --- a/EnterpriseIntegrationPlatform/tutorials/15-message-translator.md +++ b/EnterpriseIntegrationPlatform/tutorials/15-message-translator.md @@ -97,13 +97,67 @@ The translator publishes the translated envelope to the target topic **before** --- -## Exercises +## Lab -1. Write a `FieldMapping` list that maps `{ "first_name": "Alice", "last_name": "Smith" }` to `{ "fullName": "Alice Smith", "source": "CRM" }`. Hint: one mapping uses `StaticValue`. +**Objective:** Build field mappings for cross-system data transformation, analyze how the Message Translator pattern preserves message **atomicity** through immutable transformations, and design a multi-format translation strategy. -2. 
When would you use `FuncPayloadTransform` vs `JsonFieldMappingTransform`? Give an example of each. +### Step 1: Build a Field Mapping Configuration -3. A translator receives a JSON message but the target system expects XML. Which platform components would you combine to achieve this? +Write a `FieldMapping` list that transforms this input: + +```json +{ "first_name": "Alice", "last_name": "Smith", "email": "alice@example.com" } +``` + +Into this output: + +```json +{ "fullName": "Alice Smith", "contactEmail": "alice@example.com", "source": "CRM" } +``` + +Identify: which mapping uses `SourceField`, which uses `StaticValue`, and how would you combine `first_name` + `last_name` into `fullName`? Open `src/Processing.Translator/JsonFieldMappingTransform.cs` to verify the mapping mechanics. + +### Step 2: Trace Immutability Through Translation + +Open `src/Processing.Translator/MessageTranslator.cs`. When a message is translated: + +1. Is the original `IntegrationEnvelope` mutated, or is a new envelope created? +2. How does the `CausationId` of the translated message link back to the original? +3. If translation fails (e.g., missing required field), what happens to the original message? + +Explain why **immutable transformation** is critical for atomicity: if translation fails, the original message is untouched and can be retried or routed to the DLQ. + +### Step 3: Design a Multi-Format Translation Pipeline + +A partner sends data in XML, but your downstream systems expect JSON. Another partner sends CSV. Design a translation strategy: + +| Source Format | Translator Step | Output | +|--------------|----------------|--------| +| XML → JSON | `XmlToJsonStep` | Canonical JSON | +| CSV → JSON | Custom `IPayloadTransform` | Canonical JSON | +| JSON → Canonical | `JsonFieldMappingTransform` | Normalized envelope | + +How does the **Canonical Data Model** (Tutorial 17 — Normalizer) relate to the Message Translator? Why is normalizing to a canonical format essential for **scalability** — what happens when you add a 5th source format? + +## Exam + +1. Why does the Message Translator create a **new envelope** rather than modifying the original? + - A) .NET records are always immutable + - B) Immutable transformation preserves the original for retry, DLQ routing, and audit — if translation fails, the untouched original maintains atomicity of the processing pipeline + - C) The broker rejects modified messages + - D) Creating new envelopes uses less memory + +2. When would you use `FuncPayloadTransform` (code-based) vs. `JsonFieldMappingTransform` (configuration-based)? + - A) They are interchangeable + - B) `JsonFieldMappingTransform` for simple field renaming/mapping that non-developers can configure; `FuncPayloadTransform` for complex logic like format conversion, calculations, or API enrichment that requires code + - C) `FuncPayloadTransform` is faster in all cases + - D) `JsonFieldMappingTransform` only works with XML + +3. How does the Canonical Data Model concept support **integration scalability**? 
+ - A) It reduces message size for faster transport + - B) All message sources translate to one canonical format — adding a new source system requires only one new translator, not N translators for N downstream consumers + - C) Canonical models encrypt data for security + - D) It eliminates the need for a message broker --- diff --git a/EnterpriseIntegrationPlatform/tutorials/16-transform-pipeline.md b/EnterpriseIntegrationPlatform/tutorials/16-transform-pipeline.md index cddecea..945eb55 100644 --- a/EnterpriseIntegrationPlatform/tutorials/16-transform-pipeline.md +++ b/EnterpriseIntegrationPlatform/tutorials/16-transform-pipeline.md @@ -108,13 +108,59 @@ The pipeline is **all-or-nothing** within a single invocation. If any step throw --- -## Exercises +## Lab -1. Design a 3-step pipeline that: (a) converts XML to JSON, (b) applies a regex to redact email addresses, (c) filters to keep only `$.order.id` and `$.order.total`. List the steps in order. +**Objective:** Design a multi-step transform pipeline, trace how immutable `TransformContext` preserves **atomicity** through each stage, and analyze pipeline **scalability** under failure conditions. -2. After step 2 of 4 the pipeline fails. What is `StepsApplied`? What happens to the source message? +### Step 1: Design a Transform Pipeline -3. Why does `TransformContext` use `WithPayload` instead of mutable setters? What concurrency benefit does this provide? +Design a 3-step pipeline for PCI-compliant order processing: + +| Step | Transform | Class | Purpose | +|------|-----------|-------|---------| +| 1 | XML → JSON | `XmlToJsonStep` | Convert partner XML to canonical JSON | +| 2 | Redact PII | `RegexReplaceStep` | Mask email addresses with `***@***` | +| 3 | Filter fields | `JsonPathFilterStep` | Keep only `$.order.id` and `$.order.total` | + +Open `src/Processing.Transformer/` and verify each step class exists. Write the `TransformOptions` configuration for this pipeline. + +### Step 2: Trace Failure Recovery with StepsApplied + +After step 2 of 4, the pipeline fails (e.g., `JsonPathFilterStep` encounters malformed JSON): + +1. What is `TransformPipelineResult.StepsApplied`? (answer: 2) +2. Is the original source message modified? (hint: `TransformContext.WithPayload` creates copies) +3. How does the pipeline decide whether to retry vs. route to DLQ? + +Explain why `TransformContext` uses `WithPayload` (immutable updates) instead of mutable setters — what **concurrency** benefit does this provide when multiple messages are being transformed in parallel? + +### Step 3: Evaluate Pipeline Scalability + +A pipeline processes 10,000 messages/second. Step 2 (regex redaction) is 5x slower than the other steps: + +- Can you scale Step 2 independently? (hint: in Temporal, each step is an activity) +- What happens to pipeline throughput if you add a 4th step? +- How does the Pipes and Filters architecture prevent a slow step from blocking the entire system? + +## Exam + +1. Why does `TransformContext` use `WithPayload` (immutable copy) instead of mutating the payload in place? + - A) Mutable payloads are not supported by .NET records + - B) Immutable context ensures that if a later step fails, earlier step results are preserved — enabling safe retry and parallel processing without data corruption from shared mutable state + - C) `WithPayload` is faster than direct mutation + - D) The broker requires immutable messages + +2. A transform pipeline has 5 steps. Step 3 fails permanently. What should happen for **atomic** message processing? 
+ - A) Steps 1-2 results are discarded and the original message is routed to the DLQ with failure context, preserving full traceability + - B) Steps 4-5 execute with partial data + - C) The pipeline retries all 5 steps from the beginning + - D) The message is silently dropped + +3. How does the Transform Pipeline pattern support **horizontal scalability**? + - A) All steps must run on the same machine + - B) Each step is an independent filter — Temporal can distribute steps across workers, and slow steps can be scaled by adding more activity workers without affecting other steps + - C) The pipeline pre-allocates resources for all steps + - D) Scalability is limited by the fastest step --- diff --git a/EnterpriseIntegrationPlatform/tutorials/17-normalizer.md b/EnterpriseIntegrationPlatform/tutorials/17-normalizer.md index f52eb39..3afca55 100644 --- a/EnterpriseIntegrationPlatform/tutorials/17-normalizer.md +++ b/EnterpriseIntegrationPlatform/tutorials/17-normalizer.md @@ -95,13 +95,66 @@ Normalization happens **before** any downstream processing. If normalization fai --- -## Exercises +## Lab -1. A partner sends CSV with `|` as delimiter and no header row. Write the `NormalizerOptions` configuration for this case. +**Objective:** Configure the Normalizer for multi-format input handling, analyze how the Canonical Data Model pattern enables **scalable** integration with diverse source systems, and design normalization strategies for edge cases. -2. A payload arrives with `contentType = "application/json"` but contains invalid JSON. What happens when `StrictContentType = true`? What about `false`? +### Step 1: Configure a CSV Normalizer -3. Why does the platform choose JSON as the canonical format rather than XML or a binary format like Protocol Buffers? +A partner sends CSV files with `|` as delimiter and no header row. Open `src/Processing.Normalizer/` and configure `NormalizerOptions`: + +```csharp +var options = new NormalizerOptions +{ + CsvDelimiter = '|', + CsvHasHeader = false, + CsvColumnNames = ["orderId", "customerId", "amount", "currency"], + StrictContentType = true +}; +``` + +Trace what happens when: (a) a valid CSV arrives, (b) JSON arrives with `contentType = "text/csv"` but `StrictContentType = true`. + +### Step 2: Map the Canonical Data Model + +The platform normalizes all inputs to JSON. Draw a diagram showing 4 source systems and how they funnel through the Normalizer: + +``` +Partner A (XML) ─────┐ +Partner B (CSV) ─────┤ +Partner C (JSON) ────┼──→ Normalizer ──→ Canonical JSON ──→ Router ──→ N consumers +Internal API (JSON) ─┘ +``` + +How many translators are needed for 4 sources and 6 consumers? With a canonical model: **4** (one per source). Without: **24** (4×6). This is the **scalability** argument for normalization. + +### Step 3: Handle Format Detection Failures + +A payload arrives with `contentType = "application/json"` but contains invalid JSON. Analyze: + +- What happens when `StrictContentType = true`? (exception → DLQ) +- What happens when `StrictContentType = false`? (format sniffing attempt) +- Why is strict mode recommended for production **atomicity** — what risks does lenient mode introduce? + +## Exam + +1. Why does the platform normalize all messages to a **Canonical Data Model** (JSON)? 
+ - A) JSON is faster to parse than all other formats + - B) A single canonical format means adding a new source system requires only one new translator — not one for every downstream consumer — making the integration platform scale linearly with the number of systems + - C) JSON is required by the NATS protocol + - D) The .NET runtime only supports JSON serialization + +2. What is the risk of setting `StrictContentType = false` in a production environment? + - A) No risk — lenient mode is always preferred + - B) A message could be misinterpreted — e.g., XML interpreted as JSON due to format sniffing — leading to corrupt data flowing through the pipeline undetected, violating **data atomicity** + - C) Lenient mode disables all content validation + - D) Strict mode is slower than lenient mode + +3. How does the Normalizer pattern reduce **integration complexity** when scaling from 5 to 50 connected systems? + - A) It doesn't — complexity grows equally regardless + - B) Without normalization, N sources × M consumers = N×M translators; with normalization, only N + M translators are needed — this is the difference between O(N²) and O(N) scaling + - C) The Normalizer caches all messages, reducing duplicate processing + - D) The Normalizer compresses messages to reduce broker storage --- diff --git a/EnterpriseIntegrationPlatform/tutorials/18-content-enricher.md b/EnterpriseIntegrationPlatform/tutorials/18-content-enricher.md index 01ec82c..c4e2f1a 100644 --- a/EnterpriseIntegrationPlatform/tutorials/18-content-enricher.md +++ b/EnterpriseIntegrationPlatform/tutorials/18-content-enricher.md @@ -88,13 +88,60 @@ Enrichment is **not idempotent by default** if the external data changes between --- -## Exercises +## Lab -1. An order message `{ "orderId": 42 }` needs enrichment with customer data from `GET /api/customers/{customerId}`. But the order message doesn't contain `customerId` — only `orderId`. How would you design a two-step enrichment? +**Objective:** Design enrichment strategies using external data sources, analyze **atomicity** when enrichment depends on external service availability, and evaluate caching for **scalable** enrichment. -2. The external HTTP service is down. What happens to messages waiting for enrichment? How does the retry policy interact with the enricher? +### Step 1: Design a Two-Step Enrichment -3. Compare the Content Enricher to the Content Filter (Tutorial 19). How are they complementary? +An order message `{ "orderId": 42 }` needs customer data, but only contains `orderId` — not `customerId`. Design the enrichment flow: + +1. Step 1: Look up `customerId` from `GET /api/orders/42` → returns `{ "customerId": "CUST-7" }` +2. Step 2: Enrich with customer data from `GET /api/customers/CUST-7` → returns `{ "name": "Alice", "tier": "gold" }` + +Open `src/Processing.Enricher/ContentEnricher.cs` and identify how the enricher merges external data into the envelope. Does it mutate the original or create a new enriched envelope? + +### Step 2: Analyze Enrichment Failure Atomicity + +The external HTTP service is down during enrichment. Trace what happens: + +1. Does the enricher retry? What retry policy applies? +2. If all retries fail, where does the message go? +3. Is the original message preserved untouched for retry later? + +Now consider: the enricher calls two services. Service A succeeds but Service B fails. Is the partial enrichment from Service A committed? How does this affect **atomicity**? Design a strategy: should partial enrichment be discarded or preserved? 
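+
+A minimal sketch of one way to keep enrichment all-or-nothing: gather every external lookup first, and only then build a new envelope. The DTO shapes, the CRM routes, and the `Metadata` merge below are assumptions for illustration, not the platform's actual `ContentEnricher` code:
+
+```csharp
+// Sketch only — partial enrichment is never committed: any failed lookup throws
+// before a new envelope is created, leaving the original intact for retry or DLQ routing.
+private sealed record OrderLookup(string CustomerId);
+private sealed record CustomerLookup(string Name, string Tier);
+
+public async Task<IntegrationEnvelope> EnrichAsync(IntegrationEnvelope original, int orderId, CancellationToken ct)
+{
+    var order = await _crmClient.GetFromJsonAsync<OrderLookup>($"/api/orders/{orderId}", ct)
+                ?? throw new InvalidOperationException($"Order {orderId} not found.");
+    var customer = await _crmClient.GetFromJsonAsync<CustomerLookup>($"/api/customers/{order.CustomerId}", ct)
+                   ?? throw new InvalidOperationException($"Customer {order.CustomerId} not found.");
+
+    // Both lookups succeeded — now, and only now, produce the enriched copy.
+    return original with
+    {
+        Metadata = new Dictionary<string, string>(original.Metadata)
+        {
+            ["customerName"] = customer.Name,
+            ["customerTier"] = customer.Tier,
+        },
+    };
+}
+```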
+ +### Step 3: Design a Caching Strategy for Scalability + +At 10,000 messages/second, each enrichment requires an HTTP call to an external CRM. Without caching, that's 10,000 HTTP calls/second. Design a caching strategy: + +| Cache Level | TTL | Hit Rate | Scalability Impact | +|-------------|-----|----------|-------------------| +| In-memory (per-worker) | 60s | ~80% | Reduces to 2,000 calls/second | +| Distributed (Redis) | 5min | ~95% | Reduces to 500 calls/second | +| Database fallback | 1hr | ~99% | ? | + +Open `src/Processing.Enricher/` and check if the platform implements caching. How does cache invalidation interact with message **consistency**? + +## Exam + +1. The Content Enricher calls an external service that is temporarily unavailable. What is the correct **atomic** behavior? + - A) Skip enrichment and forward the message without the additional data + - B) Preserve the original message, retry according to policy, and if all retries fail route to the DLQ — the message is never forwarded with missing enrichment data + - C) Cache the last known good response and use it + - D) Block all messages until the external service recovers + +2. How does caching in the Content Enricher improve **scalability** without sacrificing data accuracy? + - A) Caching eliminates the need for external services entirely + - B) Frequently accessed enrichment data (e.g., customer records) is cached with a TTL — this reduces external API calls by 80-95% while ensuring data freshness through time-based expiration + - C) The cache stores messages, not enrichment data + - D) Caching is only useful for batch processing + +3. How are the Content Enricher and Content Filter (Tutorial 19) **complementary** in a pipeline? + - A) They do the same thing in reverse order + - B) The Enricher adds data from external sources, then the Filter removes fields not needed downstream — together they ensure each consumer receives exactly the data it needs, no more and no less + - C) The Filter must always run before the Enricher + - D) They cannot be used in the same pipeline --- diff --git a/EnterpriseIntegrationPlatform/tutorials/19-content-filter.md b/EnterpriseIntegrationPlatform/tutorials/19-content-filter.md index a43a1bd..f0f7260 100644 --- a/EnterpriseIntegrationPlatform/tutorials/19-content-filter.md +++ b/EnterpriseIntegrationPlatform/tutorials/19-content-filter.md @@ -79,13 +79,61 @@ Filtering is a **pure, deterministic function** — the same input and keep-path --- -## Exercises +## Lab -1. A message has fields `order.id`, `order.items[]`, `customer.email`, `customer.phone`, `audit.createdBy`. You only need `order.id` and `customer.email`. Write the `keepPaths` list and describe the resulting JSON structure. +**Objective:** Apply the Content Filter pattern to remove unnecessary data, analyze data minimization for **security** and **scalability**, and design a filter-then-route pipeline. -2. A keep-path `customer.address.zipCode` is specified but the message doesn't have an `address` field. What happens? +### Step 1: Configure a Content Filter -3. Design a pipeline that first enriches a message (Tutorial 18) and then filters it. Why is this order important? +A message has fields: `order.id`, `order.items[]`, `customer.email`, `customer.phone`, `customer.ssn`, `audit.createdBy`. You need only `order.id` and `customer.email` for the downstream billing system. 
Write the `keepPaths` configuration: + +```csharp +var keepPaths = new[] { "order.id", "customer.email" }; +``` + +Open `src/Processing.Transformer/JsonPathFilterStep.cs` and trace: What happens to `customer.ssn`? What happens if `keepPaths` references a field that doesn't exist in the message (e.g., `customer.address.zipCode`)? + +### Step 2: Design for Security and Data Minimization + +The Content Filter is a key tool for **data minimization** (GDPR, PCI-DSS). Design a pipeline: + +| Consumer | Allowed Fields | Filtered Fields | +|----------|---------------|----------------| +| Billing | `order.id`, `customer.email`, `order.total` | PII, items, audit | +| Analytics | `order.id`, `order.items[]`, `order.total` | All customer PII | +| Audit | All fields | None (full record) | + +How does the Content Filter ensure that the billing system **never** receives `customer.ssn`? Why is this an **atomicity** concern — what happens if the filter is accidentally misconfigured? + +### Step 3: Design an Enrich-Then-Filter Pipeline + +Explain why the order matters: first Enrich (Tutorial 18) then Filter. Draw a pipeline: + +``` +Raw message → Content Enricher (add customer data) → Content Filter (remove sensitive fields) → Route to consumer +``` + +If you reverse the order (filter first, then enrich), what goes wrong? How does the pipeline order preserve both data completeness and data minimization? + +## Exam + +1. A keep-path references a field that doesn't exist in the message. What should the Content Filter do? + - A) Throw an exception and route to DLQ + - B) Silently omit the missing field from the output — the filter operates on what's present, producing a valid subset without failing, which supports graceful handling of schema variations + - C) Add the field with a null value + - D) Block the message until the field is available + +2. Why is the Content Filter critical for **PCI-DSS and GDPR compliance** in enterprise integration? + - A) It encrypts sensitive fields automatically + - B) It ensures each downstream consumer receives only the data it needs — preventing over-exposure of PII and cardholder data by stripping unauthorized fields before routing + - C) It logs all sensitive data access for audit + - D) It replaces sensitive data with synthetic values + +3. In a high-throughput pipeline, how does content filtering improve **scalability**? + - A) Filtering doesn't affect performance + - B) Removing unnecessary fields reduces message size — smaller messages mean lower broker storage costs, faster serialization, and reduced network bandwidth across the entire downstream processing chain + - C) Filtering enables parallel processing + - D) Filtered messages skip the routing step --- diff --git a/EnterpriseIntegrationPlatform/tutorials/20-splitter.md b/EnterpriseIntegrationPlatform/tutorials/20-splitter.md index d968a4a..849888a 100644 --- a/EnterpriseIntegrationPlatform/tutorials/20-splitter.md +++ b/EnterpriseIntegrationPlatform/tutorials/20-splitter.md @@ -98,13 +98,61 @@ All split items are published to the target topic before the source message is A --- -## Exercises +## Lab -1. A message `{ "orders": [{ "id": 1 }, { "id": 2 }, { "id": 3 }] }` is split using `JsonArraySplitStrategy` with `ArrayPropertyName = "orders"`. How many envelopes are in `SplitResult.SplitEnvelopes`? What is `ItemCount`? 
+**Objective:** Split composite messages into individual items, trace how `SequenceNumber` and `TotalCount` enable the Aggregator to reassemble split messages, and analyze **atomicity** when a split item fails. -2. After splitting, item at sequence 1 is lost due to a downstream failure. How does the Aggregator (Tutorial 21) detect this? +### Step 1: Split a Composite Message -3. Why does `JsonArraySplitStrategy` clone each element with `JsonSerializer.SerializeToElement`? What would happen if it didn't? +A message `{ "orders": [{ "id": 1, "total": 50 }, { "id": 2, "total": 150 }, { "id": 3, "total": 75 }] }` is split using `JsonArraySplitStrategy` with `ArrayPropertyName = "orders"`. Open `src/Processing.Splitter/` and trace: + +1. How many envelopes are in `SplitResult.SplitEnvelopes`? +2. What is `ItemCount`? +3. What `SequenceNumber` and `TotalCount` does each split envelope carry? +4. Do all split envelopes share the same `CorrelationId` as the original? + +### Step 2: Trace Atomicity When a Split Item Fails + +After splitting, the 3 items are processed independently. Item 2 (sequence 1) fails delivery: + +| Item | SequenceNumber | Status | +|------|---------------|--------| +| `{ "id": 1 }` | 0 | ✅ Delivered | +| `{ "id": 2 }` | 1 | ❌ Failed | +| `{ "id": 3 }` | 2 | ✅ Delivered | + +Questions: +- How does the Aggregator (Tutorial 21) detect that item 2 is missing? (hint: `TotalCount = 3` but only 2 arrived) +- Should the Aggregator wait indefinitely or timeout? What timeout strategy preserves **atomicity**? +- Should items 1 and 3 be rolled back (saga compensation), or should only item 2 be retried? + +### Step 3: Evaluate Splitter Scalability + +Splitting a message with 1,000 items creates 1,000 individual messages. Analyze: + +- Each split message is independently processed — what parallelism level is achievable? +- What is the memory impact of cloning 1,000 JSON elements? (hint: `JsonSerializer.SerializeToElement` creates deep copies) +- Why does `JsonArraySplitStrategy` clone each element rather than using references? What **concurrency** bug would occur without cloning? + +## Exam + +1. After splitting, why does each split envelope carry `SequenceNumber` and `TotalCount`? + - A) For sorting messages alphabetically + - B) These fields enable the downstream Aggregator to detect missing items and reassemble the complete set — without them, the Aggregator cannot determine when all pieces have arrived or which pieces are missing + - C) The broker requires sequence numbers for storage + - D) They are used for message deduplication + +2. Why does the Splitter clone each array element rather than using references to the original? + - A) .NET doesn't support object references in records + - B) Cloning ensures each split message is independently serializable and processable — without cloning, concurrent modifications by downstream consumers could corrupt the shared source data, violating processing **atomicity** + - C) Cloning is faster than referencing + - D) The broker serializer requires cloned objects + +3. A batch message with 100 items is split. Item 47 fails after items 1-46 and 48-100 succeed. What is the **scalable** recovery strategy? 
+ - A) Retry all 100 items from the beginning + - B) Retry only item 47 using its `CorrelationId` and `SequenceNumber` — the other 99 items are already committed and don't need reprocessing, enabling efficient partial recovery + - C) Route all 100 items to the Dead Letter Queue + - D) Wait for item 47 to auto-heal --- diff --git a/EnterpriseIntegrationPlatform/tutorials/21-aggregator.md b/EnterpriseIntegrationPlatform/tutorials/21-aggregator.md index 59edcb1..98cd381 100644 --- a/EnterpriseIntegrationPlatform/tutorials/21-aggregator.md +++ b/EnterpriseIntegrationPlatform/tutorials/21-aggregator.md @@ -113,13 +113,56 @@ Each `AggregateAsync` call atomically adds the item to the store and checks comp --- -## Exercises +## Lab -1. A Splitter produces 5 items with `TotalCount = 5`. After receiving items 0, 1, 2, 3, what does `AggregateResult.ReceivedCount` return? What is `IsComplete`? +**Objective:** Trace the Aggregator's completion logic, design timeout strategies, and analyze how **idempotent** aggregation ensures **atomic** reassembly of split messages. -2. Design a `TimeoutCompletionStrategy` that completes a group if 30 seconds pass since the first item arrived. What challenges does this introduce? +### Step 1: Trace Aggregation Completion -3. Why must the `IMessageAggregateStore` be idempotent on `MessageId`? What happens without idempotency if a message is redelivered? +A Splitter produces 5 items with `TotalCount = 5`. Items arrive out of order: 3, 0, 4, 1, 2. Open `src/Processing.Aggregator/MessageAggregator.cs` and trace: + +1. After receiving items 0, 1, 2, 3 — what does `AggregateResult.ReceivedCount` return? What is `IsComplete`? +2. When item 4 arrives, how does the Aggregator know the group is complete? +3. What `CorrelationId` links all 5 items to the same aggregate group? + +### Step 2: Design a Timeout Completion Strategy + +Not all split items may arrive (e.g., item 2 fails permanently). Design a timeout strategy: + +- After 30 seconds from the first item, complete the aggregate with whatever has arrived +- Mark the result as `IsPartial = true` +- Route the partial aggregate to a `review.incomplete-batches` topic + +What **atomicity** decision must you make: should a partial aggregate be considered "successful" or should it trigger compensation for already-delivered items? + +### Step 3: Analyze Idempotent Aggregation + +A message with `SequenceNumber = 2` is delivered twice (broker redelivery). Without idempotency: + +- The aggregate would count 6 items instead of 5 +- `IsComplete` would never be true (6 > 5) or would fire prematurely + +Open `src/Processing.Aggregator/` and verify: How does `IMessageAggregateStore` handle duplicate `MessageId`s? Why is idempotency critical for **scalable** at-least-once delivery systems? + +## Exam + +1. A Splitter produces 5 items. The Aggregator receives items 0, 1, 3, 4 but item 2 never arrives. What should happen after the timeout? + - A) Wait indefinitely — the aggregate must be complete + - B) Complete with 4 items, mark as partial, and route for manual review — a timeout prevents indefinite resource consumption while preserving the received work for inspection + - C) Discard all 4 received items + - D) Re-request item 2 from the Splitter + +2. Why must the Aggregator's store be **idempotent** on `MessageId`? 
+ - A) Idempotency is required by the NUnit testing framework + - B) In at-least-once delivery systems, duplicate messages are expected — without idempotency, the aggregate count would be corrupted, potentially triggering premature completion or preventing completion entirely + - C) Idempotency improves serialization performance + - D) The broker guarantees exactly-once delivery, so idempotency is unnecessary + +3. How does the Splitter-Aggregator pair maintain **end-to-end atomicity** for a batch message? + - A) The Splitter and Aggregator share a database transaction + - B) The `CorrelationId` links all split items; `SequenceNumber` and `TotalCount` enable the Aggregator to verify completeness — only when all items succeed (or timeout triggers) is the aggregate result committed or compensated + - C) The broker ensures all items are delivered simultaneously + - D) Each split item is independently atomic — there is no end-to-end guarantee --- diff --git a/EnterpriseIntegrationPlatform/tutorials/22-scatter-gather.md b/EnterpriseIntegrationPlatform/tutorials/22-scatter-gather.md index 36160f6..e7493f4 100644 --- a/EnterpriseIntegrationPlatform/tutorials/22-scatter-gather.md +++ b/EnterpriseIntegrationPlatform/tutorials/22-scatter-gather.md @@ -95,13 +95,67 @@ Scatter-Gather has **best-effort semantics** within the timeout window. If a rec --- -## Exercises +## Lab -1. You scatter a pricing request to 3 suppliers with a 5-second timeout. Supplier A responds in 1 s, Supplier B in 3 s, Supplier C never responds. What does `ScatterGatherResult` look like? +**Objective:** Trace the Scatter-Gather pattern's parallel request-response flow, analyze timeout behavior for **partial results**, and design a "best-of-N" selection strategy. -2. How would you implement a "best of N" strategy where you take the lowest price from all responses received within the timeout? +### Step 1: Trace a Scatter-Gather with Timeout -3. Compare Scatter-Gather to calling each service sequentially. What is the latency difference with 3 services averaging 2 seconds each? +You scatter a pricing request to 3 suppliers with `TimeoutMs = 5000`: + +| Supplier | Response Time | Price | +|----------|--------------|-------| +| A | 1 second | $120 | +| B | 3 seconds | $95 | +| C | Never responds | — | + +Open `src/Processing.ScatterGather/ScatterGatherer.cs` and trace: + +1. How does `ScatterGatherResult.Responses` look? (2 responses) +2. Is `TimedOut = true`? (yes — only 2 of 3 responded) +3. What is `Duration`? (≈5 seconds — the timeout) + +### Step 2: Design a "Best-of-N" Selection Strategy + +Using the partial results above, implement a selection strategy that picks the lowest price: + +``` +1. Scatter to all suppliers (parallel) +2. Gather responses until timeout +3. From gathered responses, select the one with lowest price +4. If no responses arrived, route to DLQ with reason "no-supplier-response" +``` + +What is the **atomicity** guarantee? The selected best price must be committed as a single decision — if the commit fails, no supplier should be charged. + +### Step 3: Compare Scatter-Gather Latency vs. Sequential Calls + +| Approach | 3 services × 2s avg | 10 services × 2s avg | +|----------|---------------------|----------------------| +| Sequential | 6 seconds total | 20 seconds total | +| Scatter-Gather | ~2 seconds (parallel) | ~2 seconds (parallel) | + +How does the Scatter-Gather pattern enable **scalable** multi-supplier/multi-service integration? What happens to latency as you add more recipients? + +## Exam + +1. 
A Scatter-Gather operation sends to 5 recipients with a 3-second timeout. Only 3 respond in time. What does the result indicate? + - A) Failure — all recipients must respond + - B) `TimedOut = true` with 3 responses — the caller receives partial results and can decide how to proceed based on business logic (e.g., select best from available) + - C) The operation retries the 2 missing recipients + - D) The 3 responses are discarded and the operation fails + +2. How does the Scatter-Gather pattern improve **integration scalability** compared to sequential service calls? + - A) It uses less memory per request + - B) Latency equals the slowest responder (or timeout), not the sum of all — adding more recipients doesn't increase total latency, enabling efficient multi-source integration at scale + - C) It reduces the number of network connections + - D) Sequential calls are always faster for small numbers of recipients + +3. What **atomicity** consideration arises when the Scatter-Gather selects one response from many? + - A) All responses must be stored permanently + - B) The selected response must be committed atomically — if the downstream commit fails, no side effects from the selection (e.g., supplier charges) should be applied, requiring compensation for any tentative reservations + - C) Non-selected responses are automatically compensated + - D) The broker handles selection atomicity --- diff --git a/EnterpriseIntegrationPlatform/tutorials/23-request-reply.md b/EnterpriseIntegrationPlatform/tutorials/23-request-reply.md index 92d9016..bf94df3 100644 --- a/EnterpriseIntegrationPlatform/tutorials/23-request-reply.md +++ b/EnterpriseIntegrationPlatform/tutorials/23-request-reply.md @@ -95,13 +95,57 @@ The request is published to the request topic and the correlator subscribes to t --- -## Exercises +## Lab -1. A request is sent with `TimeoutMs = 5000`. The responder takes 7 seconds. What does `RequestReplyResult` look like? +**Objective:** Trace the Request-Reply correlation mechanism, analyze timeout behavior, and design for **scalable** request-reply across distributed services. -2. Two requesters send requests with different `CorrelationId` values to the same request topic. How does each requester get the correct reply? +### Step 1: Trace Request-Reply Correlation -3. Why does the correlator subscribe to the reply topic **before** publishing the request? What race condition does this prevent? +A request is sent with `TimeoutMs = 5000`. The responder takes 7 seconds. Open `src/Processing.RequestReply/RequestReplyCorrelator.cs` and trace: + +1. What does `RequestReplyResult` look like? (`TimedOut = true`, no response) +2. If the responder takes 3 seconds, what does the result contain? +3. How does the `CorrelationId` in the request envelope match to the response? + +Now: Two requesters send requests with different `CorrelationId` values to the same request topic. How does each requester receive its own correct reply? + +### Step 2: Prevent the Subscribe-Before-Publish Race Condition + +The correlator subscribes to the reply topic **before** publishing the request. Explain: + +1. What race condition occurs if you publish first, then subscribe? +2. How does pre-subscribing ensure the reply is never lost? +3. Draw the timeline: Subscribe → Publish → Responder processes → Reply arrives → Correlator matches + +This is an **atomicity** concern: without pre-subscription, fast responders could publish replies before the requester is listening, causing permanent message loss. 
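+
+A minimal sketch of the subscribe-before-publish ordering with `CorrelationId` matching — illustrative only; the broker subscription API, the `RequestReplyResult` shape, and the disposal pattern here are assumptions rather than the actual `RequestReplyCorrelator` code:
+
+```csharp
+// Sketch only — subscribing before publishing closes the race window described above.
+public async Task<RequestReplyResult> SendAsync(IntegrationEnvelope request, TimeSpan timeout, CancellationToken ct)
+{
+    var tcs = new TaskCompletionSource<IntegrationEnvelope>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+    // 1. Subscribe FIRST, so even a sub-millisecond responder cannot reply before we are listening.
+    await using var subscription = await _broker.SubscribeAsync(_options.ReplyTopic, reply =>
+    {
+        if (reply.CorrelationId == request.CorrelationId)   // accept only our own reply
+        {
+            tcs.TrySetResult(reply);
+        }
+    }, ct);
+
+    // 2. Only then publish the request.
+    await _producer.PublishAsync(_options.RequestTopic, request, ct);
+
+    // 3. Wait for the correlated reply, or give up at the timeout so resources are released.
+    var completed = await Task.WhenAny(tcs.Task, Task.Delay(timeout, ct));
+    return completed == tcs.Task
+        ? new RequestReplyResult { Response = await tcs.Task, TimedOut = false }
+        : new RequestReplyResult { Response = null, TimedOut = true };
+}
+```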
+ +### Step 3: Design for Request-Reply Scalability + +At high throughput, many concurrent request-reply operations share the same reply topic: + +- How does the correlator isolate concurrent requests? (hint: `CorrelationId` matching) +- What happens if 1,000 requests are in flight simultaneously? Memory implications? +- How does the `TimeoutMs` prevent resource leaks from requests that never receive replies? + +## Exam + +1. Why does the Request-Reply correlator subscribe to the reply topic **before** publishing the request? + - A) Subscribing is faster than publishing + - B) A fast responder could publish the reply before the requester is listening — pre-subscribing eliminates this race condition, ensuring the reply is never lost even with sub-millisecond response times + - C) The broker requires subscriptions before publishes + - D) Pre-subscribing reduces network latency + +2. How does the `CorrelationId` enable **scalable** request-reply with many concurrent requests on the same topic? + - A) The broker routes replies based on `CorrelationId` automatically + - B) Each requester filters incoming replies by `CorrelationId` — only the matching reply is accepted, allowing thousands of concurrent request-reply operations to share a single reply topic without interference + - C) `CorrelationId` is used for message encryption + - D) Each request must use a unique reply topic + +3. What resource **scalability** concern does the timeout address in request-reply? + - A) Timeouts improve message throughput + - B) Without timeouts, requests that never receive replies would hold resources (memory, channel subscriptions) indefinitely — the timeout ensures cleanup even when responders fail, preventing memory leaks under sustained load + - C) Timeouts are only needed for testing + - D) The broker automatically cleans up timed-out requests --- diff --git a/EnterpriseIntegrationPlatform/tutorials/24-retry-framework.md b/EnterpriseIntegrationPlatform/tutorials/24-retry-framework.md index e5e5b9d..33d35cb 100644 --- a/EnterpriseIntegrationPlatform/tutorials/24-retry-framework.md +++ b/EnterpriseIntegrationPlatform/tutorials/24-retry-framework.md @@ -122,13 +122,71 @@ When all retry attempts are exhausted (`IsSucceeded = false`), the message shoul --- -## Exercises +## Lab -1. With `MaxAttempts = 4`, `InitialDelayMs = 500`, `BackoffMultiplier = 2.0`, and `UseJitter = false`, calculate the delay before each retry attempt. +**Objective:** Calculate exponential backoff delays, analyze why jitter is critical for **scalable** retry under thundering-herd conditions, and design a retry classification strategy. -2. Why is jitter important? Describe a scenario where 100 consumers without jitter cause problems for a recovering database. +### Step 1: Calculate Backoff Delays -3. A `JsonException` during deserialization is not retryable. How would you detect this and short-circuit to the DLQ? +With `MaxAttempts = 4`, `InitialDelayMs = 500`, `BackoffMultiplier = 2.0`, and `UseJitter = false`, calculate the delay before each retry: + +| Attempt | Delay Formula | Delay | +|---------|--------------|-------| +| 1 (first retry) | 500 × 2⁰ | 500ms | +| 2 | 500 × 2¹ | ? | +| 3 | 500 × 2² | ? | +| 4 | 500 × 2³ | ? | + +What is the total maximum wait time across all retries? Open `src/Processing.Retry/ExponentialBackoffRetryPolicy.cs` to verify the formula. + +### Step 2: Analyze the Thundering Herd Problem + +100 consumers lose connection to a database. All retry at the same exponential intervals (no jitter). 
Draw what happens: + +``` +t=0s: [100 consumers all fail] +t=500ms: [100 consumers all retry simultaneously] → database overwhelmed again +t=1000ms: [100 consumers all retry simultaneously] → database overwhelmed again +``` + +Now add jitter: each consumer randomizes its delay within ±50%. Explain: +- How does jitter spread the retry load over time? +- Why is this critical for **system-level scalability** during recovery? +- What is the relationship between jitter and the database's recovery time? + +### Step 3: Design Retry Classification + +Not all errors are retryable. Design a classification strategy: + +| Error Type | Retryable? | Action | +|-----------|-----------|--------| +| HTTP 503 (Service Unavailable) | Yes | Exponential backoff | +| HTTP 400 (Bad Request) | No | Immediate DLQ | +| `JsonException` (deserialization) | No | Immediate DLQ | +| `TimeoutException` (network) | Yes | ? | +| Schema validation failure | No | ? | + +Why is fast-failing non-retryable errors critical for **pipeline throughput**? What happens if you retry a `JsonException` 4 times before giving up? + +## Exam + +1. With `InitialDelayMs = 1000` and `BackoffMultiplier = 2.0`, what is the delay before the 4th retry attempt? + - A) 4000ms + - B) 8000ms — the delay doubles each attempt: 1000, 2000, 4000, 8000 + - C) 3000ms + - D) 16000ms + +2. Why is jitter critical for **scalable** retry strategies in distributed systems? + - A) Jitter makes retries faster + - B) Without jitter, all consumers retry at identical intervals — creating synchronized spikes that can overwhelm the recovering service; jitter spreads retries over time, enabling gradual recovery + - C) Jitter is only needed for testing + - D) The broker requires jitter in retry delays + +3. Why should non-retryable errors (e.g., `JsonException`) be routed to the DLQ immediately instead of retried? + - A) Non-retryable errors are rare and don't matter + - B) Retrying a permanent error wastes processing capacity and delays handling of valid messages — fast-failing to DLQ preserves pipeline **throughput** and enables rapid human intervention + - C) The DLQ can fix the error automatically + - D) Non-retryable errors eventually succeed after enough retries --- diff --git a/EnterpriseIntegrationPlatform/tutorials/25-dead-letter-queue.md b/EnterpriseIntegrationPlatform/tutorials/25-dead-letter-queue.md index 53be478..1db53c0 100644 --- a/EnterpriseIntegrationPlatform/tutorials/25-dead-letter-queue.md +++ b/EnterpriseIntegrationPlatform/tutorials/25-dead-letter-queue.md @@ -108,7 +108,8 @@ public sealed class MessageExpirationChecker : IMessageExpirationChecker await _deadLetterPublisher.PublishAsync( envelope, DeadLetterReason.MessageExpired, - $"Message expired at {envelope.ExpiresAt.Value:O}.", 0, cancellationToken); + $"Message expired at {envelope.ExpiresAt.Value:O}. Current time: {now:O}.", + 0, cancellationToken); return true; } } @@ -130,13 +131,64 @@ Dead-lettering is the **last resort** — it runs only after all retries are exh --- -## Exercises +## Lab -1. A message fails validation (`DeadLetterReason.ValidationFailed`). An operator fixes the schema and wants to reprocess it. Describe the replay flow through the Admin API. +**Objective:** Trace the Dead Letter Queue lifecycle from failure to replay, analyze how the DLQ preserves **zero message loss atomicity**, and design an operational replay workflow. -2. A message has `ExpiresAt = 2024-01-15T10:00:00Z` and the current time is `2024-01-15T10:00:01Z`. 
Trace the path through `MessageExpirationChecker` and `IDeadLetterPublisher`. +### Step 1: Trace an Expired Message to the DLQ -3. Why does the platform preserve the **complete original envelope** in `DeadLetterEnvelope` rather than just the error details? What operational benefit does this provide? +A message has `ExpiresAt = 2024-01-15T10:00:00Z` and the current time is `2024-01-15T10:00:01Z`. Open `src/Processing.DeadLetter/MessageExpirationChecker.cs` and trace: + +1. `CheckAndRouteIfExpiredAsync` detects expiration — what `DeadLetterReason` is used? +2. What information is logged? (hint: expiry time and current time) +3. Where does the complete original envelope end up? + +Verify that the **entire original envelope** is preserved in `DeadLetterEnvelope` — not just error details. + +### Step 2: Design an Operational Replay Workflow + +A message fails validation (`DeadLetterReason.ValidationFailed`). An operator fixes the downstream schema. Design the replay flow: + +``` +1. Operator queries DLQ via Admin API: GET /api/deadletter?reason=ValidationFailed +2. Operator reviews the original envelope and error details +3. Operator triggers replay: POST /api/deadletter/{id}/replay +4. Platform re-publishes the original envelope to its original topic +5. Message re-enters the pipeline from the beginning +``` + +What **atomicity** guarantees must the replay provide? (hint: replay must either fully re-publish or fail cleanly — no partial replays) + +### Step 3: Categorize DLQ Reasons and Operational Response + +| DLQ Reason | Cause | Operational Response | Can Auto-Replay? | +|-----------|-------|---------------------|-------------------| +| `MessageExpired` | TTL exceeded | Review TTL settings | No — stale data | +| `ValidationFailed` | Schema mismatch | Fix schema → replay | Yes | +| `MaxRetriesExceeded` | Transient failures | Investigate root cause → replay | Maybe | +| `PermanentFailure` | Non-retryable error | Manual intervention | No | + +Why is preserving the complete original envelope critical for DLQ operations? What would an operator lose if only the error message was stored? + +## Exam + +1. Why does the platform preserve the **complete original envelope** in the Dead Letter Queue? + - A) It's a storage requirement of the broker + - B) The original envelope enables accurate replay — operators can inspect the exact payload, metadata, and headers that caused the failure, and re-publish it unchanged for reprocessing after fixing the root cause + - C) The envelope is needed for deduplication + - D) Only the error details are stored + +2. How does the DLQ pattern ensure **zero message loss** in the integration platform? + - A) The DLQ stores messages in memory for fast retrieval + - B) Every message that cannot be processed successfully — whether due to expiration, validation failure, or exhausted retries — is routed to the DLQ rather than being silently dropped, ensuring nothing is ever lost + - C) The broker prevents message deletion + - D) Messages are automatically retried from the DLQ every minute + +3. What **atomicity** guarantee must a DLQ replay operation provide? 
+ - A) The replay can be partial — some fields are replayed while others are skipped + - B) The replay must either fully re-publish the original message to its target topic or fail cleanly — partial replays could cause duplicate processing or data corruption + - C) The DLQ entry must be deleted before replay + - D) Replay is only possible within 24 hours of the original failure --- diff --git a/EnterpriseIntegrationPlatform/tutorials/26-message-replay.md b/EnterpriseIntegrationPlatform/tutorials/26-message-replay.md index 842ad39..da9eb9c 100644 --- a/EnterpriseIntegrationPlatform/tutorials/26-message-replay.md +++ b/EnterpriseIntegrationPlatform/tutorials/26-message-replay.md @@ -107,13 +107,68 @@ Replay re-publishes messages to the **same ingress topic** they originally enter --- -## Exercises +## Lab -1. An operator discovers a bug in the content enricher that corrupted messages between 09:00 and 09:30 UTC. Write the `ReplayFilter` to target only those messages using `FromTimestamp` and `ToTimestamp`. +**Objective:** Design a message replay operation for a production incident, analyze how the `ReplayId` header prevents duplicate processing, and evaluate replay store **scalability** requirements. -2. Why does the platform inject a `ReplayId` header instead of simply re-publishing the original message unchanged? What problems could occur without it? +### Step 1: Design a Time-Window Replay -3. Describe what a production `IMessageReplayStore` implementation would need to handle 10 million messages per day efficiently. +An operator discovers a bug in the content enricher that corrupted messages between 09:00 and 09:30 UTC on January 15th. Write the `ReplayFilter`: + +```csharp +var filter = new ReplayFilter +{ + FromTimestamp = DateTimeOffset.Parse("2024-01-15T09:00:00Z"), + ToTimestamp = DateTimeOffset.Parse("2024-01-15T09:30:00Z"), + Topic = "eip.orders.enriched" +}; +``` + +Open `src/Processing.Replay/MessageReplayer.cs` and trace: How does the replayer iterate over stored messages? What happens to messages that don't match the filter? + +### Step 2: Analyze the ReplayId Header for Atomicity + +The platform injects a `ReplayId` header into replayed messages. Explain why: + +1. Without `ReplayId` — downstream consumers process the message as if it's new → **duplicate side effects** (e.g., double billing) +2. With `ReplayId` — consumers can detect replays and apply **idempotent** processing +3. How does `ReplayId` interact with `MessageId`? (the original `MessageId` is preserved for correlation) + +Design a consumer that checks for `ReplayId` and skips already-processed messages using a deduplication store. + +### Step 3: Evaluate Replay Store Scalability + +A production system processes 10 million messages/day. Design the replay store requirements: + +| Requirement | Value | Justification | +|------------|-------|---------------| +| Storage per message | ~2KB (envelope) | Full envelope for accurate replay | +| Daily storage | ~20GB | 10M × 2KB | +| Retention period | 30 days | Regulatory and operational needs | +| Total storage | ~600GB | 30 × 20GB | +| Query performance | < 100ms for time-range | Fast incident response | + +What storage technology would you recommend? (hint: time-series databases, object storage with indexing) + +## Exam + +1. Why does the platform inject a `ReplayId` header instead of re-publishing the original message unchanged? 
+ - A) `ReplayId` improves serialization performance + - B) Without `ReplayId`, downstream consumers cannot distinguish replayed messages from new ones — leading to duplicate side effects like double billing; the header enables idempotent replay processing + - C) The broker requires unique headers for each publish + - D) `ReplayId` replaces the original `MessageId` + +2. What **atomicity** guarantee must a replay operation provide? + - A) All replayed messages must succeed or the entire replay is rolled back + - B) Each replayed message is independently atomic — if message 500 of 1000 fails, the first 499 are committed and 500+ can be retried; the `ReplayId` prevents duplicates from the successful ones + - C) Replay operations are fire-and-forget with no guarantees + - D) The entire replay must complete within a single database transaction + +3. How does time-range filtering in replay operations support **operational scalability**? + - A) Time filtering is faster than content filtering + - B) Operators can target a precise incident window instead of replaying all messages — this minimizes unnecessary reprocessing and downstream load during recovery + - C) Time ranges are required by the message broker + - D) Filtering has no impact on replay performance --- diff --git a/EnterpriseIntegrationPlatform/tutorials/27-resequencer.md b/EnterpriseIntegrationPlatform/tutorials/27-resequencer.md index ae511a8..e350754 100644 --- a/EnterpriseIntegrationPlatform/tutorials/27-resequencer.md +++ b/EnterpriseIntegrationPlatform/tutorials/27-resequencer.md @@ -91,13 +91,59 @@ Messages are **Acked only after successful release** to the downstream topic. If --- -## Exercises +## Lab -1. Three messages arrive for `CorrelationId = "order-42"` in this order: #3, #1, #2. Trace the calls to `Accept` and describe the return value for each call. +**Objective:** Trace the Resequencer's buffering and release logic, analyze ordering guarantees for **atomic** batch processing, and design for partition-aware scaling. -2. A sequence has messages #1, #2, #4 buffered and `ReleaseTimeout` fires. Describe what `ReleaseOnTimeout` returns and what happens to the gap at #3. +### Step 1: Trace Out-of-Order Arrival -3. Why must all messages for a `CorrelationId` be routed to the same resequencer instance? What broker feature enables this? +Three messages arrive for `CorrelationId = "order-42"` in this order: #3, #1, #2. Open `src/Processing.Resequencer/` and trace each `Accept` call: + +| Arrival | SequenceNumber | Buffered? | Released? | Why? | +|---------|---------------|-----------|-----------|------| +| 1st | 3 | Yes | No | Waiting for #1 | +| 2nd | 1 | — | Released: #1 | Next expected | +| 3rd | 2 | — | Released: #2, then #3 | Completes the sequence | + +Verify your trace against the actual implementation. + +### Step 2: Handle Gaps with Timeout + +A sequence has messages #1, #2, #4 buffered, but #3 never arrives. After `ReleaseTimeout` fires: + +1. What does `ReleaseOnTimeout` return? (hint: #1 and #2 are released, #4 is released with a gap marker) +2. Is the gap reported for downstream awareness? +3. How does this design prevent indefinite buffering — critical for **system scalability** under high message volumes? + +Design an alerting strategy for gap detection: when should the operations team be notified? + +### Step 3: Partition-Aware Resequencing + +All messages for a `CorrelationId` must be routed to the same resequencer instance. Explain: + +- What broker feature enables this? 
(hint: Kafka partition keys, NATS subject-based routing) +- What happens if messages for the same `CorrelationId` land on different resequencer instances? +- How does partition-key routing enable **horizontal scaling** — each instance handles a subset of `CorrelationId`s independently? + +## Exam + +1. Why must all messages for a `CorrelationId` be routed to the **same** resequencer instance? + - A) Any instance can resequence any `CorrelationId` + - B) The resequencer maintains an ordered buffer per `CorrelationId` — if messages are split across instances, no single instance has the complete picture to determine correct ordering + - C) The broker automatically routes messages to the correct instance + - D) Resequencing doesn't require instance affinity + +2. How does the `ReleaseTimeout` prevent unbounded resource consumption? + - A) It deletes messages older than the timeout + - B) Without a timeout, a missing sequence number would cause all subsequent messages to buffer indefinitely — the timeout releases buffered messages with gap markers, preventing memory growth proportional to undelivered messages + - C) Timeouts are only needed in development + - D) The timeout reduces message processing latency + +3. How does partition-key routing enable **scalable** resequencing? + - A) All messages go to a single instance for global ordering + - B) Each resequencer instance handles a subset of `CorrelationId`s — adding instances distributes the load linearly, with no cross-instance coordination needed for ordering within each group + - C) Partition keys are only used for Kafka + - D) Routing is handled by the resequencer itself --- diff --git a/EnterpriseIntegrationPlatform/tutorials/28-competing-consumers.md b/EnterpriseIntegrationPlatform/tutorials/28-competing-consumers.md index ddc6781..a8ac327 100644 --- a/EnterpriseIntegrationPlatform/tutorials/28-competing-consumers.md +++ b/EnterpriseIntegrationPlatform/tutorials/28-competing-consumers.md @@ -48,23 +48,72 @@ public sealed class CompetingConsumerOrchestrator : BackgroundService { while (!stoppingToken.IsCancellationRequested) { - var lagInfo = await _lagMonitor.GetLagAsync( - _options.TargetTopic, _options.ConsumerGroup, stoppingToken); + try + { + await EvaluateAndScaleAsync(stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during competing consumer orchestration cycle"); + } + + await Task.Delay( + TimeSpan.FromMilliseconds(_options.CooldownMs), _timeProvider, stoppingToken); + } + } + + internal async Task EvaluateAndScaleAsync(CancellationToken cancellationToken) + { + var lagInfo = await _lagMonitor.GetLagAsync( + _options.TargetTopic, _options.ConsumerGroup, cancellationToken); - if (lagInfo.CurrentLag >= _options.ScaleUpThreshold) - await _scaler.ScaleAsync( - Math.Min(_scaler.CurrentCount + 1, _options.MaxConsumers), stoppingToken); - else if (lagInfo.CurrentLag <= _options.ScaleDownThreshold - && _scaler.CurrentCount > _options.MinConsumers) - await _scaler.ScaleAsync(_scaler.CurrentCount - 1, stoppingToken); + var currentCount = _scaler.CurrentCount; + var now = _timeProvider.GetUtcNow(); + var cooldown = TimeSpan.FromMilliseconds(_options.CooldownMs); - await Task.Delay(_options.CooldownMs, stoppingToken); + if (lagInfo.CurrentLag >= _options.ScaleUpThreshold) + { + if (currentCount >= _options.MaxConsumers) + { + _backpressure.Signal(); // signal backpressure when at capacity + return; + } + + 
_backpressure.Release(); + if ((now - _lastScaleTime) < cooldown) return; // cooldown guard + + var desired = Math.Min(currentCount + 1, _options.MaxConsumers); + await _scaler.ScaleAsync(desired, cancellationToken); + _lastScaleTime = now; + } + else if (lagInfo.CurrentLag <= _options.ScaleDownThreshold) + { + _backpressure.Release(); + if (currentCount <= _options.MinConsumers) return; + if (_backpressure.IsBackpressured) return; // pause scale-down under backpressure + if ((now - _lastScaleTime) < cooldown) return; + + var desired = Math.Max(currentCount - 1, _options.MinConsumers); + await _scaler.ScaleAsync(desired, cancellationToken); + _lastScaleTime = now; + } + else + { + _backpressure.Release(); } } } ``` -The orchestrator runs as a hosted `BackgroundService`. On each evaluation cycle it reads the consumer lag via `GetLagAsync`, compares against thresholds, and calls `ScaleAsync` with the desired consumer count. +The orchestrator runs as a hosted `BackgroundService`. On each evaluation cycle it reads the consumer lag via `GetLagAsync`, compares against thresholds, and calls `ScaleAsync` with the desired consumer count. Key features: + +- **Backpressure signaling** — when at max capacity, signals backpressure to upstream producers +- **Cooldown guard** — prevents scaling flapping with a configurable cooldown period +- **Backpressure-aware scale-down** — won't scale down while backpressure is active ### IConsumerLagMonitor @@ -144,13 +193,65 @@ Each consumer processes messages independently and Acks them individually. If a --- -## Exercises +## Lab + +**Objective:** Trace the auto-scaling orchestrator with backpressure signaling, analyze cooldown to prevent scaling flap, and design a production backpressure integration. + +### Step 1: Trace the Scaling Decision Path + +A topic has 8 partitions, `MaxConsumers = 12`, and current consumer lag is 5,000. Open `src/Processing.CompetingConsumers/CompetingConsumerOrchestrator.cs` and trace `EvaluateAndScaleAsync`: + +1. Lag exceeds `ScaleUpThreshold` → what happens if current consumers = 8? +2. Lag exceeds threshold but `currentCount >= MaxConsumers` → what signal is emitted? +3. After scaling up, what prevents another scale-up in the next cycle? (hint: cooldown) + +Now: with `MaxConsumers = 12` and 8 Kafka partitions, what happens when the orchestrator scales to 9 consumers? (hint: one consumer will be idle — Kafka can't assign more consumers than partitions) + +### Step 2: Analyze Cooldown for Scaling Stability + +Consumer lag oscillates between 900 and 1100 with `ScaleUpThreshold = 1000`. Without cooldown: + +``` +Cycle 1: lag=1100 → scale up (3→4) +Cycle 2: lag=900 → scale down (4→3) +Cycle 3: lag=1100 → scale up (3→4) +... flapping forever +``` + +How does `CooldownMs` break this cycle? What is the relationship between cooldown duration and scaling stability? What value would you set for a production system? + +### Step 3: Design a Backpressure Integration + +When the consumer pool is at maximum capacity and lag keeps growing, the orchestrator signals backpressure. Design a system-wide response: + +| Component | Backpressure Action | +|-----------|-------------------| +| Gateway API | Return HTTP 429 to upstream senders | +| Ingestion producers | Pause or slow message publishing | +| Dashboard (OpenClaw) | Show backpressure warning to operators | +| Monitoring (OpenTelemetry) | Emit backpressure metrics and alerts | + +How does backpressure prevent **cascade failures** in a scalable system? What happens without it? + +## Exam -1. 
A topic has 8 partitions and `MaxConsumers = 12`. What happens when the orchestrator tries to scale beyond 8 consumers? Why is `MaxConsumers` still useful? +1. A topic has 8 partitions and the orchestrator scales to 12 consumers. What happens? + - A) All 12 consumers share the 8 partitions equally + - B) 8 consumers each get 1 partition; 4 consumers are idle — Kafka cannot assign more consumers than partitions in a consumer group; `MaxConsumers` should be set to match partition count + - C) The broker creates 4 additional partitions automatically + - D) The extra consumers process from a different topic -2. Consumer lag oscillates between 900 and 1100 with `ScaleUpThreshold = 1000`. Without `CooldownMs`, what behavior would you observe? How does cooldown fix it? +2. Why is cooldown critical for **scalable** auto-scaling? + - A) Cooldown reduces memory usage + - B) Without cooldown, oscillating lag near the threshold causes rapid scale-up/scale-down flapping — cooldown ensures each scaling decision has time to take effect before the next evaluation, preventing resource waste and instability + - C) Cooldown is only needed during maintenance windows + - D) The broker enforces cooldown automatically -3. Design an `IBackpressureSignal` integration that returns HTTP 429 from the Gateway API when backpressure is active. +3. How does backpressure signaling maintain **system-level atomicity** under overload? + - A) Backpressure drops excess messages to protect the system + - B) Backpressure slows or pauses upstream producers — this prevents message accumulation that would exceed processing capacity, ensuring every accepted message can be processed atomically rather than overwhelming the pipeline + - C) Backpressure increases consumer count beyond the maximum + - D) Backpressure is only relevant for batch processing --- diff --git a/EnterpriseIntegrationPlatform/tutorials/29-throttle-rate-limiting.md b/EnterpriseIntegrationPlatform/tutorials/29-throttle-rate-limiting.md index 5e45b88..2eb2052 100644 --- a/EnterpriseIntegrationPlatform/tutorials/29-throttle-rate-limiting.md +++ b/EnterpriseIntegrationPlatform/tutorials/29-throttle-rate-limiting.md @@ -150,13 +150,65 @@ When `AcquireAsync` delays a message, the message remains **uncommitted** — no --- -## Exercises +## Lab -1. Design a `ThrottlePolicy` that allows a partner system to send 50 messages/second with bursts up to 200. Which `ThrottlePartitionKey` fields would you set? +**Objective:** Design throttle policies for multi-tenant rate limiting, trace the token bucket algorithm, and analyze why per-tenant throttling is essential for **fair scalability**. -2. The `TokenBucketThrottle` has 0 available tokens and a `MaxWait` of 5 seconds. A message arrives. Describe the sequence of events. +### Step 1: Design a Multi-Tenant Throttle Policy -3. Explain why the platform uses per-`TenantId` throttling by default for multi-tenant deployments rather than a single global throttle. +Design a `ThrottlePolicy` for a partner system: + +| Parameter | Value | Purpose | +|-----------|-------|---------| +| Rate | 50 messages/second | Sustained throughput | +| Burst | 200 messages | Peak absorption | +| `PartitionKey.TenantId` | `"partner-x"` | Per-tenant isolation | +| `MaxWait` | 5 seconds | Max time to wait for a token | + +Open `src/Processing.Throttle/` and verify: How does the `TokenBucketThrottle` implement this? What happens when all 200 burst tokens are consumed? 
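+
+Before tracing token exhaustion in Step 2, it may help to see the algorithm in isolation. The sketch below is a hypothetical, simplified stand-in (the class `TokenBucketSketch` and its members are illustrative names, not the platform's `TokenBucketThrottle` API). It assumes a bucket that starts full, refills continuously at the sustained rate, and rejects callers that cannot obtain a token within `MaxWait`:
+
+```csharp
+// Hypothetical sketch of the token-bucket idea; compare with the real
+// implementation in src/Processing.Throttle/ before relying on details.
+public sealed class TokenBucketSketch
+{
+    private readonly double _ratePerSecond;   // sustained rate, e.g. 50 tokens/s
+    private readonly double _capacity;        // burst allowance, e.g. 200 tokens
+    private readonly TimeSpan _maxWait;       // give up after this long, e.g. 5 s
+    private readonly object _gate = new();
+    private double _tokens;
+    private DateTimeOffset _lastRefill;
+
+    public TokenBucketSketch(double ratePerSecond, double capacity, TimeSpan maxWait)
+    {
+        _ratePerSecond = ratePerSecond;
+        _capacity = capacity;
+        _maxWait = maxWait;
+        _tokens = capacity;                    // start full so an initial burst is absorbed
+        _lastRefill = DateTimeOffset.UtcNow;
+    }
+
+    // Returns true if a token was acquired within MaxWait; false means the
+    // caller should reject the message and signal backpressure upstream.
+    public async Task<bool> AcquireAsync(CancellationToken cancellationToken = default)
+    {
+        var deadline = DateTimeOffset.UtcNow + _maxWait;
+
+        while (true)
+        {
+            lock (_gate)
+            {
+                Refill();
+                if (_tokens >= 1)
+                {
+                    _tokens -= 1;
+                    return true;
+                }
+            }
+
+            if (DateTimeOffset.UtcNow >= deadline)
+                return false;                  // MaxWait exceeded: do not queue forever
+
+            // Wait roughly one replenishment interval (20 ms at 50 tokens/s) and retry.
+            await Task.Delay(TimeSpan.FromSeconds(1 / _ratePerSecond), cancellationToken);
+        }
+    }
+
+    private void Refill()
+    {
+        var now = DateTimeOffset.UtcNow;
+        var elapsed = (now - _lastRefill).TotalSeconds;
+        _lastRefill = now;
+
+        // Replenish proportionally to elapsed time, never exceeding the burst capacity.
+        _tokens = Math.Min(_capacity, _tokens + elapsed * _ratePerSecond);
+    }
+}
+```
+
+With `new TokenBucketSketch(50, 200, TimeSpan.FromSeconds(5))`, the `Math.Min` cap plays the role of the burst limit from the table above, and the deadline check keeps each wait bounded, which is the job `MaxWait` performs in the platform's throttle.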
+ +### Step 2: Trace Token Exhaustion + +The `TokenBucketThrottle` has 0 available tokens and `MaxWait = 5s`. A message arrives: + +1. The throttle checks: 0 tokens available +2. Waits for token replenishment (50 tokens/second = 1 token every 20ms) +3. After ~20ms, 1 token becomes available → message proceeds +4. If no token is available after 5 seconds → what happens? + +What is the maximum queuing depth during a sustained burst? How does `MaxWait` prevent unbounded queue growth? + +### Step 3: Analyze Per-Tenant vs. Global Throttling + +Why does the platform use per-`TenantId` throttling by default? + +| Scenario | Global Throttle | Per-Tenant Throttle | +|----------|----------------|-------------------| +| Tenant A sends 10,000 msg/s | Blocks Tenant B too | Only Tenant A is throttled | +| Tenant B sends 10 msg/s | May be blocked by A | Always gets through | +| Fair resource allocation | No guarantee | Each tenant gets its quota | + +How does per-tenant throttling prevent the **noisy neighbor** problem? Why is this critical for **multi-tenant scalability**? + +## Exam + +1. A token bucket with rate=100/s and burst=500 receives 600 messages in 1 second. What happens? + - A) All 600 messages are processed immediately + - B) The first 500 are processed from the burst allowance; the remaining 100 wait for token replenishment at 100/s — after 1 second, all 600 have been processed; messages beyond capacity wait up to `MaxWait` before being rejected + - C) All 600 messages are rejected + - D) The burst limit is increased automatically + +2. Why is per-tenant throttling essential for **multi-tenant scalability**? + - A) Per-tenant throttling uses less memory + - B) Without per-tenant isolation, one tenant's traffic spike would exhaust the global rate limit and block all other tenants — the noisy neighbor problem; per-tenant throttling ensures fair resource allocation + - C) The broker requires per-tenant configuration + - D) Global throttling is always preferable for simplicity + +3. What happens when a message exceeds the `MaxWait` timeout in the throttle? + - A) The message is processed anyway + - B) The message is rejected with an appropriate error — this prevents unbounded queue growth and provides backpressure to the upstream sender, maintaining system stability under sustained overload + - C) The throttle increases its rate automatically + - D) The message is routed to a different topic --- diff --git a/EnterpriseIntegrationPlatform/tutorials/30-rule-engine.md b/EnterpriseIntegrationPlatform/tutorials/30-rule-engine.md index f748d8e..aa55bfb 100644 --- a/EnterpriseIntegrationPlatform/tutorials/30-rule-engine.md +++ b/EnterpriseIntegrationPlatform/tutorials/30-rule-engine.md @@ -141,13 +141,70 @@ Rule evaluation happens **within the pipeline transaction**. If the selected act --- -## Exercises +## Lab -1. Write a `BusinessRule` that routes all messages from source `"PartnerX"` with `MessageType` containing `"order"` to topic `"orders-priority"`. +**Objective:** Write business rules with conditions and logic operators, trace priority-based evaluation, and analyze rule caching for **scalable** high-throughput routing decisions. -2. A rule has `LogicOperator = RuleLogicOperator.Or` with two conditions. Explain how evaluation differs from `And`. +### Step 1: Write a Priority-Based Business Rule -3. Why does the platform evaluate rules in priority order and stop at the first match rather than evaluating all rules? 
+Write a `BusinessRule` that routes all messages from source `"PartnerX"` with `MessageType` containing `"order"` to topic `"orders-priority"`: + +```csharp +var rule = new BusinessRule +{ + Name = "PartnerX-Orders", + Priority = 1, + LogicOperator = RuleLogicOperator.And, + Conditions = [ + new RuleCondition { FieldName = "Source", Operator = RuleConditionOperator.Equals, Value = "PartnerX" }, + new RuleCondition { FieldName = "MessageType", Operator = RuleConditionOperator.Contains, Value = "order" } + ], + OutputTopic = "orders-priority" +}; +``` + +Open `src/RuleEngine/BusinessRuleEngine.cs` and trace: How does `And` vs. `Or` logic change the evaluation? + +### Step 2: Trace Priority-Based Evaluation + +Rules are evaluated in priority order (lowest number = highest priority): + +| Priority | Rule | Conditions | +|----------|------|-----------| +| 1 | Premium orders | Source = "PartnerX" AND Type contains "order" | +| 5 | All orders | Type contains "order" | +| 10 | Default | Always matches | + +A message from `PartnerX` with type `"order.created"` matches rules at priorities 1 and 5. Which rule wins? Why does the engine stop at the first match? (hint: deterministic routing for **atomicity**) + +### Step 3: Design Rule Caching for Scalability + +At 50,000 messages/second with 100 rules, each message evaluates up to 100 conditions. Design a caching strategy: + +- Rules change infrequently (hourly) but messages arrive constantly +- How does the platform cache compiled rules? (Open `src/RuleEngine/` to check) +- What is the cache invalidation strategy when rules are updated? +- What is the performance difference between cached vs. uncached rule evaluation? + +## Exam + +1. A rule engine has 3 rules with priorities 1, 5, 10. A message matches rules at priorities 5 and 10. Which rule is applied? + - A) Both rules are applied (fan-out) + - B) Priority 5 — the engine evaluates in priority order and stops at the first match, ensuring deterministic and **atomic** routing to exactly one destination + - C) Priority 10 — the last match wins + - D) The engine randomly selects one + +2. Why does the rule engine use `And`/`Or` logic operators for conditions? + - A) They're required by the .NET compiler + - B) `And` requires all conditions to match (strict targeting); `Or` requires any condition to match (broad targeting) — this enables both precise and flexible routing rules for different business scenarios + - C) Logic operators improve serialization performance + - D) They're equivalent — both produce the same result + +3. How does rule caching improve **throughput scalability**? + - A) Caching stores message results, not rules + - B) Compiled rules are cached in memory — avoiding repeated parsing and compilation of rule definitions for every message; since rules change infrequently but messages arrive at high volume, caching amortizes the compilation cost over millions of evaluations + - C) Caching is only useful during testing + - D) Rules are too small to benefit from caching --- diff --git a/EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md b/EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md index 5fc5891..4ca2670 100644 --- a/EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md +++ b/EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md @@ -148,13 +148,63 @@ Optimistic concurrency ensures **consistency without locks**. The `expectedVersi --- -## Exercises +## Lab -1. An aggregate has 10,000 events. 
Without snapshots, what is the cost of reconstructing current state? With a snapshot at version 9,900? +**Objective:** Analyze event sourcing's append-only model for **audit-complete atomicity**, trace optimistic concurrency conflict resolution, and design snapshot strategies for **scalable** aggregate reconstruction. -2. Two commands arrive simultaneously for the same stream at version 5. Both expect version 5. Trace the optimistic concurrency flow. +### Step 1: Calculate Aggregate Reconstruction Cost -3. Use `TemporalQuery.ReplayToPointInTimeAsync` to reconstruct an order aggregate's state as of yesterday at noon. What parameters do you need to supply? +An aggregate has 10,000 events. Compare reconstruction approaches: + +| Approach | Events Replayed | Cost | Time (est.) | +|----------|----------------|------|-------------| +| Full replay (no snapshots) | 10,000 | High CPU + memory | ~100ms | +| Snapshot at version 9,900 | 100 | Low | ~1ms | +| Snapshot at version 9,999 | 1 | Minimal | ~0.1ms | + +Open `src/EventSourcing/` and trace: How does the event store load a snapshot, then replay only subsequent events? What is the **scalability** trade-off between snapshot frequency and storage cost? + +### Step 2: Trace Optimistic Concurrency Conflict + +Two commands arrive simultaneously for the same stream at version 5. Both expect version 5: + +``` +Command A: Append event at version 5 → succeeds (stream now at version 6) +Command B: Append event at version 5 → CONFLICT (expected 5, actual 6) +``` + +Trace the conflict resolution: +1. What exception is thrown? +2. Does Command B retry? With what strategy? +3. How does optimistic concurrency ensure **atomic** state transitions without distributed locks? + +### Step 3: Design a Temporal Query for Audit + +Use `TemporalQuery.ReplayToPointInTimeAsync` to reconstruct an order aggregate's state as of yesterday at noon: + +- What parameters do you supply? (stream ID, point-in-time) +- How does this differ from loading current state? +- Why is this capability essential for **regulatory compliance** and audit trails? + +## Exam + +1. Why does event sourcing use an append-only log rather than mutable state updates? + - A) Append-only is faster for write operations + - B) Every state change is permanently recorded as an immutable event — this provides a complete audit trail, enables temporal queries (reconstructing past state), and guarantees **atomic** state transitions through optimistic concurrency + - C) Databases don't support mutable updates + - D) Append-only reduces storage costs + +2. How does optimistic concurrency prevent **atomicity** violations in concurrent event sourcing? + - A) It uses distributed locks to prevent concurrent access + - B) Each append specifies the expected version — if another command modified the stream first, the version mismatch is detected and the second command fails cleanly, ensuring only one writer succeeds per state transition + - C) Events are automatically merged when conflicts occur + - D) The event store queues concurrent commands + +3. How do snapshots improve **aggregate reconstruction scalability**? 
+ - A) Snapshots reduce the number of events stored + - B) A snapshot captures aggregate state at a point in time — reconstruction replays only events after the snapshot instead of the entire history, reducing reconstruction time from O(N) to O(recent events) + - C) Snapshots are required by the event store + - D) Snapshots improve write performance --- diff --git a/EnterpriseIntegrationPlatform/tutorials/32-multi-tenancy.md b/EnterpriseIntegrationPlatform/tutorials/32-multi-tenancy.md index 39d16a2..fc2a5e2 100644 --- a/EnterpriseIntegrationPlatform/tutorials/32-multi-tenancy.md +++ b/EnterpriseIntegrationPlatform/tutorials/32-multi-tenancy.md @@ -61,10 +61,10 @@ public sealed class TenantContext public string? TenantName { get; init; } public bool IsResolved { get; init; } - public static TenantContext Anonymous => new() + public static readonly TenantContext Anonymous = new() { TenantId = "anonymous", - IsResolved = false + IsResolved = false, }; } ``` @@ -136,13 +136,62 @@ The isolation guard runs **before any processing** — a cross-tenant message is --- -## Exercises +## Lab -1. A message arrives with `X-Tenant-Id: tenant-a` but the JWT claim says `tenant-b`. How should the resolver handle this conflict? +**Objective:** Trace tenant resolution and isolation enforcement, design the onboarding resource provisioning pipeline, and analyze why tenant isolation is non-negotiable for **multi-tenant scalability**. -2. Describe the self-service flow when a new tenant onboards: what resources are provisioned and in what order? +### Step 1: Resolve a Tenant Identity Conflict -3. Why is `TenantIsolationException` non-retryable? Under what circumstances could a cross-tenant message be legitimate? +A message arrives with `X-Tenant-Id: tenant-a` but the JWT claim says `tenant-b`. Open `src/MultiTenancy/` and trace: + +1. How does the tenant resolver prioritize these conflicting signals? +2. Should the resolver trust the header or the JWT? (hint: JWT is cryptographically signed) +3. What exception is thrown for the conflict? Is it retryable? + +Design a resolution policy: When is a conflict legitimate (e.g., admin impersonation) vs. a security violation? + +### Step 2: Design the Onboarding Pipeline + +When a new tenant onboards, trace the self-service provisioning flow: + +| Step | Resource | Class | Atomic? | +|------|----------|-------|---------| +| 1 | Create tenant record | `InMemoryTenantOnboardingService` | Yes | +| 2 | Provision broker namespace | `InMemoryBrokerNamespaceProvisioner` | Yes | +| 3 | Set quota limits | `InMemoryTenantQuotaManager` | Yes | +| 4 | Initialize configuration | ConfigurationStore | Yes | + +If Step 3 fails, what compensation is needed for Steps 1-2? How does this relate to the Saga pattern? + +### Step 3: Analyze Tenant Isolation for Scalability + +| Without Isolation | With Isolation | +|------------------|----------------| +| Tenant A's traffic spike affects Tenant B | Each tenant has dedicated queues and quotas | +| One tenant's DLQ overflow blocks all tenants | Isolated DLQ per tenant | +| Security breach in one tenant exposes all | `TenantIsolationGuard` prevents cross-tenant access | + +Why is `TenantIsolationException` non-retryable? Under what circumstances could a cross-tenant message be legitimate? + +## Exam + +1. Why must tenant resolution trust JWT claims over HTTP headers? 
+ - A) HTTP headers are faster to parse + - B) JWTs are cryptographically signed and cannot be forged by the caller — headers can be spoofed; trusting unsigned headers would allow any caller to impersonate any tenant, violating isolation + - C) The broker requires JWT tokens + - D) Headers don't support tenant identifiers + +2. Why is `TenantIsolationException` non-retryable? + - A) Retries would succeed with different credentials + - B) A cross-tenant access attempt is a security violation — retrying won't change the tenant identity; it must be investigated as a potential breach, not automatically retried + - C) The exception is transient and self-healing + - D) Non-retryable exceptions are faster to process + +3. How does per-tenant resource provisioning enable **horizontal scalability**? + - A) All tenants share a single resource pool + - B) Each tenant gets isolated broker namespaces and quotas — adding tenants doesn't affect existing tenants' performance, and each tenant's resources can be independently scaled based on their usage patterns + - C) Resource provisioning is only needed for premium tenants + - D) The broker automatically provisions resources --- diff --git a/EnterpriseIntegrationPlatform/tutorials/33-security.md b/EnterpriseIntegrationPlatform/tutorials/33-security.md index 18c4bda..b693a05 100644 --- a/EnterpriseIntegrationPlatform/tutorials/33-security.md +++ b/EnterpriseIntegrationPlatform/tutorials/33-security.md @@ -145,13 +145,63 @@ Sanitization runs **before the message is Acked**. Callers can use `IsClean` to --- -## Exercises +## Lab -1. A payload contains `` embedded in a JSON string value. Describe how the sanitizer detects and removes it while preserving valid JSON structure. +**Objective:** Trace the input sanitization pipeline, analyze how defense-in-depth protects **message atomicity** from injection attacks, and evaluate secret management for **scalable** multi-environment deployments. -2. Why does the platform use a separate `IPayloadSizeGuard` instead of checking size inside `IInputSanitizer`? +### Step 1: Trace XSS Sanitization -3. Compare `AzureKeyVaultSecretProvider` and `VaultSecretProvider`. When would you choose one over the other? +A payload contains `` embedded in a JSON string value. Open `src/Security/InputSanitizer.cs` and trace: + +1. How does the sanitizer detect the `