Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion EnterpriseIntegrationPlatform/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ EnterpriseIntegrationPlatform/
│ ├── Processing.Retry/ # Retry framework with exponential backoff
│ ├── Processing.Replay/ # Replay failed/DLQ messages
│ ├── Processing.Throttle/ # Token-bucket throttle with per-tenant partitioning
│ ├── Processing.Dispatcher/ # EIP: Message Dispatcher & Service Activator
│ ├── Processing.RequestReply/ # EIP: Request-Reply correlator
│ ├── Processing.Resequencer/ # EIP: Resequencer — reorder out-of-sequence messages
│ ├── RuleEngine/ # Business rule evaluation (conditions, AND/OR, actions)
│ ├── EventSourcing/ # Event store, snapshots, projection engine
│ ├── Connector.Http/ # HTTP connector (EIP: Channel Adapter)
Expand All @@ -130,7 +133,10 @@ EnterpriseIntegrationPlatform/
│ ├── Observability/ # Lifecycle recording, Loki storage, OpenClaw API
│ ├── AI.Ollama/ # Ollama AI integration
│ ├── AI.RagFlow/ # RagFlow RAG client
│ ├── AI.RagKnowledge/ # RAG knowledge base parser & query matcher
│ ├── SystemManagement/ # EIP: Control Bus, Message Store, Smart Proxy, Test Message
│ ├── OpenClaw.Web/ # "Where is my message?" web UI & RAG knowledge API
│ ├── Admin.Web/ # Vue 3 admin dashboard (proxies to Admin.Api)
│ ├── Gateway.Api/ # API gateway (EIP: Messaging Gateway)
│ ├── Admin.Api/ # Administration REST API (EIP: Control Bus)
│ └── Demo.Pipeline/ # End-to-end demo pipeline
Expand All @@ -139,7 +145,7 @@ EnterpriseIntegrationPlatform/
│ ├── ContractTests/ # Contract verification tests (29 tests)
│ ├── WorkflowTests/ # Temporal workflow tests (24 tests)
│ ├── IntegrationTests/ # Testcontainers-based integration tests (17 tests)
│ ├── PlaywrightTests/ # End-to-end browser tests for OpenClaw UI (13 tests)
│ ├── PlaywrightTests/ # End-to-end browser tests for Admin dashboard & OpenClaw UI (24 tests)
│ └── LoadTests/ # Performance and load tests (10 tests)
├── docs/ # Architecture, ADRs, runbooks, and design docs
├── deploy/ # Helm charts, Kustomize overlays, K8s manifests
Expand Down
79 changes: 49 additions & 30 deletions EnterpriseIntegrationPlatform/docs/developer-setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,41 +129,60 @@ Install these extensions:
```
EnterpriseIntegrationPlatform/
├── src/
│ ├── AppHost/ # .NET Aspire orchestrator
│ ├── ServiceDefaults/ # Shared service configuration (OpenTelemetry, health checks)
│ ├── Contracts/ # Shared message contracts and interfaces
│ ├── Ingestion/ # Broker abstraction (IMessageBrokerProducer/Consumer)
│ ├── Ingestion.Kafka/ # Kafka streaming provider
│ ├── Ingestion.Nats/ # NATS JetStream provider (default)
│ ├── Ingestion.Pulsar/ # Apache Pulsar Key_Shared provider
│ ├── Workflow.Temporal/ # Temporal workflow worker
│ ├── Activities/ # Workflow activity implementations
│ ├── Processing.Routing/ # Content-based routing logic
│ ├── Processing.Translator/ # Message transformation logic
│ ├── Processing.Splitter/ # Message splitter
│ ├── Processing.Aggregator/ # Message aggregator
│ ├── Processing.DeadLetter/ # Dead letter queue management
│ ├── Processing.Retry/ # Retry framework
│ ├── Processing.Replay/ # Replay framework
│ ├── Connector.Http/ # HTTP connector
│ ├── Connector.Sftp/ # SFTP connector
│ ├── Connector.Email/ # Email connector
│ ├── Connector.File/ # File connector
│ ├── Storage.Cassandra/ # Cassandra data access layer
│ ├── Security/ # Input sanitization, payload guards, encryption
│ ├── MultiTenancy/ # Tenant resolution and isolation
│ ├── AI.Ollama/ # Ollama AI integration
│ ├── AI.RagFlow/ # RagFlow RAG client
│ ├── Observability/ # OpenTelemetry configuration
│ ├── OpenClaw.Web/ # "Where is my message?" web UI & RAG knowledge API
│ ├── Admin.Api/ # Administration REST API
│ └── Demo.Pipeline/ # End-to-end demo pipeline
│ ├── AppHost/ # .NET Aspire orchestrator
│ ├── ServiceDefaults/ # Shared service configuration (OpenTelemetry, health checks)
│ ├── Contracts/ # Shared message contracts and interfaces
│ ├── Ingestion/ # Broker abstraction (IMessageBrokerProducer/Consumer)
│ ├── Ingestion.Kafka/ # Kafka streaming provider
│ ├── Ingestion.Nats/ # NATS JetStream provider (default)
│ ├── Ingestion.Pulsar/ # Apache Pulsar Key_Shared provider
│ ├── Workflow.Temporal/ # Temporal workflow worker
│ ├── Activities/ # Workflow activity implementations
│ ├── Processing.Routing/ # Content-based routing logic
│ ├── Processing.Translator/ # Message transformation logic
│ ├── Processing.Transform/ # Payload pipeline — JSON↔XML, regex, JSONPath
│ ├── Processing.Splitter/ # Message splitter
│ ├── Processing.Aggregator/ # Message aggregator
│ ├── Processing.ScatterGather/ # Scatter-Gather pattern
│ ├── Processing.CompetingConsumers/ # Competing Consumers with autoscaling
│ ├── Processing.DeadLetter/ # Dead letter queue management
│ ├── Processing.Retry/ # Retry framework
│ ├── Processing.Replay/ # Replay framework
│ ├── Processing.Throttle/ # Token-bucket throttle
│ ├── Processing.Dispatcher/ # Message Dispatcher & Service Activator
│ ├── Processing.RequestReply/ # Request-Reply correlator
│ ├── Processing.Resequencer/ # Resequencer — reorder out-of-sequence messages
│ ├── RuleEngine/ # Business rule evaluation
│ ├── EventSourcing/ # Event store, snapshots, projection engine
│ ├── Connector.Http/ # HTTP connector
│ ├── Connector.Sftp/ # SFTP connector
│ ├── Connector.Email/ # Email connector
│ ├── Connector.File/ # File connector
│ ├── Connectors/ # Unified connector registry & factory
│ ├── Storage.Cassandra/ # Cassandra data access layer
│ ├── Configuration/ # Dynamic config store, feature flags
│ ├── Security/ # Input sanitization, payload guards, encryption
│ ├── Security.Secrets/ # Secret providers (Azure KV, Vault), rotation
│ ├── MultiTenancy/ # Tenant resolution and isolation
│ ├── MultiTenancy.Onboarding/ # Self-service tenant provisioning & quotas
│ ├── DisasterRecovery/ # Failover, replication, RPO/RTO, DR drills
│ ├── Performance.Profiling/ # CPU/memory profiling, GC tuning, benchmarks
│ ├── Observability/ # Lifecycle recording, Loki storage, OpenClaw API
│ ├── AI.Ollama/ # Ollama AI integration
│ ├── AI.RagFlow/ # RagFlow RAG client
│ ├── AI.RagKnowledge/ # RAG knowledge base parser & query matcher
│ ├── SystemManagement/ # Control Bus, Message Store, Smart Proxy, Test Message
│ ├── OpenClaw.Web/ # "Where is my message?" web UI & RAG knowledge API
│ ├── Admin.Web/ # Vue 3 admin dashboard (proxies to Admin.Api)
│ ├── Gateway.Api/ # API gateway (Messaging Gateway)
│ ├── Admin.Api/ # Administration REST API
│ └── Demo.Pipeline/ # End-to-end demo pipeline
├── tests/
│ ├── UnitTests/ # Fast, isolated unit tests (402 tests)
│ ├── ContractTests/ # Contract verification tests (29 tests)
│ ├── WorkflowTests/ # Temporal workflow tests (24 tests)
│ ├── IntegrationTests/ # Testcontainers-based integration tests (17 tests)
│ ├── PlaywrightTests/ # End-to-end browser tests for OpenClaw UI (13 tests)
│ ├── PlaywrightTests/ # End-to-end browser tests for Admin dashboard & OpenClaw UI (24 tests)
│ └── LoadTests/ # Performance and load tests (5 tests)
├── docs/ # Architecture and design documentation
└── rules/ # Development standards and milestones
Expand Down
2 changes: 1 addition & 1 deletion EnterpriseIntegrationPlatform/tutorials/01-introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ Every EIP pattern has a corresponding platform component:
| Content-Based Router | `IContentBasedRouter` | `src/Processing.Routing/` |
| Message Translator | `IMessageTranslator<TIn, TOut>` | `src/Processing.Translator/` |
| Splitter | `IMessageSplitter<T>` | `src/Processing.Splitter/` |
| Aggregator | `IMessageAggregator<TItem, TAgg>` | `src/Processing.Aggregator/` |
| Aggregator | `IMessageAggregator<TItem, TAggregate>` | `src/Processing.Aggregator/` |
| Dead Letter Channel | `IDeadLetterPublisher<T>` | `src/Processing.DeadLetter/` |
| Process Manager | Temporal Workflows | `src/Workflow.Temporal/` |
| Channel Adapter | `IConnector` | `src/Connector.*` |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ The test suite includes:
| ContractTests | Contract verification between services |
| WorkflowTests | Temporal workflow behavior tests |
| IntegrationTests | Testcontainers-based tests with real infrastructure |
| PlaywrightTests | End-to-end browser tests for OpenClaw UI |
| PlaywrightTests | End-to-end browser tests for Admin dashboard & OpenClaw UI |
| LoadTests | Performance and throughput benchmarks |

> **Note:** IntegrationTests and PlaywrightTests require Docker to be running.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,11 @@ For each scenario, identify the correct EIP channel pattern and the platform cla
| Handling messages in XML or JSON from different partners | ? | ? |
| Quarantining messages with missing required fields | ? | ? |

Open `src/Processing.Channels/` and verify your answers against the actual implementations.
Open `src/Ingestion/Channels/` and verify your answers against the actual implementations.

### Step 2: Design a Messaging Bridge for Broker Migration

Your company uses Kafka for all integrations but wants to add NATS for new microservices. Using the `MessagingBridge` class in `src/Processing.Channels/`, design a bridge configuration that:
Your company uses Kafka for all integrations but wants to add NATS for new microservices. Using the `MessagingBridge` class in `src/Ingestion/Channels/`, design a bridge configuration that:

- Reads from Kafka topic `legacy.orders.created`
- Publishes to NATS subject `eip.orders.created`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ Activities are the building blocks that workflows orchestrate. Each activity is
// src/Workflow.Temporal/Activities/IntegrationActivities.cs (simplified)
// Handles validation and processing-stage logging

[Activity]
public class IntegrationActivities
public sealed class IntegrationActivities
{
[Activity]
public async Task<MessageValidationResult> ValidateMessageAsync(
Expand All @@ -230,8 +229,7 @@ public class IntegrationActivities
// src/Workflow.Temporal/Activities/PipelineActivities.cs (simplified)
// Handles persistence, delivery status, acknowledgments, and faults

[Activity]
public class PipelineActivities
public sealed class PipelineActivities
{
[Activity]
public async Task PersistMessageAsync(IntegrationPipelineInput input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ Open `src/Processing.Routing/ContentBasedRouter.cs`. Create a routing configurat
|----------|-------|----------|-------|-------------|
| 1 | `Payload.customer.tier` | Equals | `"platinum"` | `priority-processing` |
| 5 | `MessageType` | Equals | `"OrderCreated"` | `orders.standard` |
| 10 | `MessageType` | Matches | `"Return.*"` | `returns.processing` |
| 10 | `MessageType` | Regex | `"Return.*"` | `returns.processing` |
| 100 | (default) | — | — | `general.inbox` |

A message arrives with `MessageType = "OrderCreated"` and `Payload.customer.tier = "platinum"`. Which topic does it route to? Explain how priority ordering ensures deterministic routing.
Expand All @@ -127,7 +127,7 @@ Using the `RoutingDecision` record, trace the router's decision path for a messa
Consider a Content-Based Router processing 50,000 messages/second with 200 routing rules:

- What is the computational cost per message? (hint: O(n) for n rules)
- How does pre-compiling regex patterns (`RoutingOperator.Matches`) improve throughput?
- How does pre-compiling regex patterns (`RoutingOperator.Regex`) improve throughput?
- If you need to route to different brokers (Kafka for audit, NATS for real-time), how would the router's output topic abstraction enable this without code changes?

## Exam
Expand Down
8 changes: 4 additions & 4 deletions EnterpriseIntegrationPlatform/tutorials/13-routing-slip.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,20 @@ Write C# code to construct a `RoutingSlip` with three steps:

```csharp
var slip = new RoutingSlip([
new RoutingSlipStep("Validate", new Dictionary<string, string>()),
new RoutingSlipStep("Transform", new Dictionary<string, string>
new RoutingSlipStep("Validate"),
new RoutingSlipStep("Transform", Parameters: new Dictionary<string, string>
{
["targetFormat"] = "XML",
["schemaVersion"] = "2.0"
}),
new RoutingSlipStep("Deliver", new Dictionary<string, string>
new RoutingSlipStep("Deliver", Parameters: new Dictionary<string, string>
{
["endpoint"] = "https://partner.example.com/api/orders"
})
]);
```

Open `src/Processing.Routing/RoutingSlip.cs` and verify the record structure. How does each step carry its own parameters? Why is this important for **atomicity** — each step is self-contained with all the data it needs.
Open `src/Contracts/RoutingSlip.cs` and verify the record structure. How does each step carry its own parameters? Why is this important for **atomicity** — each step is self-contained with all the data it needs.

### Step 2: Trace a Partial-Completion Recovery

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ The `AtomicPipelineWorkflow` implements full **saga compensation**. Completed st

### Step 1: Trace a Compensation Sequence

A workflow has steps: Persist → Validate → Transform → Deliver. Transform succeeds but Deliver fails after all retries. Open `src/Workflow.Temporal/AtomicPipelineWorkflow.cs` and trace:
A workflow has steps: Persist → Validate → Transform → Deliver. Transform succeeds but Deliver fails after all retries. Open `src/Workflow.Temporal/Workflows/AtomicPipelineWorkflow.cs` and trace:

1. Which steps need compensation? (only steps that committed work)
2. In what order do compensation steps execute? (hint: reverse)
Expand Down
2 changes: 1 addition & 1 deletion EnterpriseIntegrationPlatform/tutorials/27-resequencer.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public interface IResequencer

### MessageResequencer (concrete)

The `MessageResequencer` class implements `IResequencer`. Internally it maintains a `ConcurrentDictionary<string, SortedList<int, IntegrationEnvelope<string>>>` keyed by `CorrelationId`. Each entry tracks:
The `MessageResequencer` class implements `IResequencer`. Internally it maintains a `ConcurrentDictionary<Guid, SequenceBuffer>` keyed by `CorrelationId`. `SequenceBuffer` is a private inner class that uses a `ConcurrentDictionary<int, object>` for thread-safe storage and `OrderBy` for sequenced release. Each entry tracks:

1. **Expected next sequence number** — starts at 1
2. **Buffered messages** — sorted by `SequenceNumber`
Expand Down
4 changes: 2 additions & 2 deletions EnterpriseIntegrationPlatform/tutorials/31-event-sourcing.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public sealed record EventEnvelope(

```csharp
// src/EventSourcing/OptimisticConcurrencyException.cs
public sealed class OptimisticConcurrencyException : Exception
public sealed class OptimisticConcurrencyException : InvalidOperationException
{
public string StreamId { get; }
public long ExpectedVersion { get; }
Expand All @@ -101,7 +101,7 @@ When `AppendAsync` is called with an `expectedVersion` that does not match the s
public static class TemporalQuery
{
public static async Task<(TState State, long Version)> ReplayToPointInTimeAsync<TState>(
IEventStore store,
IEventStore eventStore,
IEventProjection<TState> projection,
string streamId,
DateTimeOffset pointInTime,
Expand Down
14 changes: 8 additions & 6 deletions EnterpriseIntegrationPlatform/tutorials/34-connector-http.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,15 @@ The source message is **Acked only after a successful `ConnectorResult`**. If th
An external API requires a token from `https://auth.example.com/token`. Write the connector configuration:

```csharp
await connector.SendWithTokenAsync(
await connector.SendWithTokenAsync<string, object>(
envelope,
endpoint: "https://api.partner.com/orders",
tokenUrl: "https://auth.example.com/token",
clientId: "eip-platform",
clientSecret: await secretProvider.GetSecretAsync("partner-api-secret"),
cancellationToken: ct);
relativeUrl: "/orders",
method: HttpMethod.Post,
tokenEndpoint: "https://auth.example.com/token",
tokenRequestBody: "grant_type=client_credentials&client_id=eip-platform&client_secret=" +
await secretProvider.GetSecretAsync("partner-api-secret"),
tokenHeaderName: "Authorization",
ct: ct);
```

Open `src/Connector.Http/HttpConnector.cs` and trace: How does the connector obtain, cache, and refresh tokens?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ Open `src/Connector.Sftp/SftpConnector.cs` and check: How does the platform pool
```csharp
var remotePath = await sftpConnector.UploadAsync(
envelope,
remoteDir: "/incoming/orders",
fileName: "order-42.json",
serializer: payload => System.Text.Encoding.UTF8.GetBytes(
System.Text.Json.JsonSerializer.Serialize(payload)),
cancellationToken: ct);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ Lifecycle recording is a **best-effort side effect** — it must not block or fa
A message was received 30 minutes ago but never reached "Delivered" status. Use `ITraceAnalyzer.WhereIsMessageAsync` to investigate:

```csharp
var location = await traceAnalyzer.WhereIsMessageAsync(messageId);
// Returns: { Stage: "Transform", Status: "InProgress", SinceUtc: "30 min ago" }
var location = await traceAnalyzer.WhereIsMessageAsync(
correlationId: messageCorrelationId,
knownState: """{"lastSeen":"Transform","status":"InProgress"}""");
// Returns: a natural-language description of the message's current location
```

Open `src/Observability/TraceAnalyzer.cs` and trace: How does the analyzer query the message state store? What lifecycle states are tracked (Received, Routing, Transforming, Delivering, Delivered, Failed)?
Expand Down
Loading
Loading