diff --git a/EnterpriseIntegrationPlatform/EnterpriseIntegrationPlatform.sln b/EnterpriseIntegrationPlatform/EnterpriseIntegrationPlatform.sln
index 4c1043c..4563c3e 100644
--- a/EnterpriseIntegrationPlatform/EnterpriseIntegrationPlatform.sln
+++ b/EnterpriseIntegrationPlatform/EnterpriseIntegrationPlatform.sln
@@ -117,6 +117,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Processing.RequestReply", "
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Connectors", "src\Connectors\Connectors.csproj", "{7998C735-EB8F-4DBE-BB32-978E9A465433}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testing", "src\Testing\Testing.csproj", "{F13607C8-980A-4EFF-93B5-5D6FE344F08C}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -751,6 +753,18 @@ Global
{7998C735-EB8F-4DBE-BB32-978E9A465433}.Release|x64.Build.0 = Release|Any CPU
{7998C735-EB8F-4DBE-BB32-978E9A465433}.Release|x86.ActiveCfg = Release|Any CPU
{7998C735-EB8F-4DBE-BB32-978E9A465433}.Release|x86.Build.0 = Release|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Debug|x64.Build.0 = Debug|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Debug|x86.Build.0 = Debug|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Release|x64.ActiveCfg = Release|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Release|x64.Build.0 = Release|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Release|x86.ActiveCfg = Release|Any CPU
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -811,5 +825,6 @@ Global
{F7FBDC14-6ED2-46AC-B348-2427C14F0158} = {A1B2C3D4-0001-0001-0001-000000000002}
{F8DD5966-EE52-4ADA-BE4F-D23636F424F8} = {A1B2C3D4-0001-0001-0001-000000000002}
{7998C735-EB8F-4DBE-BB32-978E9A465433} = {A1B2C3D4-0001-0001-0001-000000000002}
+ {F13607C8-980A-4EFF-93B5-5D6FE344F08C} = {A1B2C3D4-0001-0001-0001-000000000001}
EndGlobalSection
EndGlobal
diff --git a/EnterpriseIntegrationPlatform/src/Testing/AspireIntegrationTestHost.cs b/EnterpriseIntegrationPlatform/src/Testing/AspireIntegrationTestHost.cs
new file mode 100644
index 0000000..9d08616
--- /dev/null
+++ b/EnterpriseIntegrationPlatform/src/Testing/AspireIntegrationTestHost.cs
@@ -0,0 +1,103 @@
+// ============================================================================
+// AspireIntegrationTestHost – DI-based test host for E2E integration testing
+// ============================================================================
+
+using EnterpriseIntegrationPlatform.Ingestion;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace EnterpriseIntegrationPlatform.Testing;
+
+///
+/// Aspire-style integration test host that wires real EIP components
+/// with MockEndpoints via dependency injection.
+///
+public sealed class AspireIntegrationTestHost : IAsyncDisposable
+{
+ private readonly IHost _host;
+ private readonly Dictionary _endpoints;
+
+ internal AspireIntegrationTestHost(IHost host, Dictionary endpoints)
+ {
+ _host = host;
+ _endpoints = endpoints;
+ }
+
+ public IServiceProvider Services => _host.Services;
+
+ public T GetService() where T : notnull =>
+ _host.Services.GetRequiredService();
+
+ public MockEndpoint GetEndpoint(string name) => _endpoints[name];
+
+ public IReadOnlyDictionary Endpoints => _endpoints;
+
+ public static Builder CreateBuilder() => new();
+
+ public ValueTask DisposeAsync()
+ {
+ _host.Dispose();
+ return ValueTask.CompletedTask;
+ }
+
+ public sealed class Builder
+ {
+ private readonly HostApplicationBuilder _inner;
+ private readonly Dictionary _endpoints = new();
+
+ public Builder()
+ {
+ _inner = Host.CreateApplicationBuilder([]);
+ _inner.Services.AddSingleton(NullLoggerFactory.Instance);
+ _inner.Services.AddSingleton(typeof(ILogger<>), typeof(NullLogger<>));
+ }
+
+ /// Adds a named MockEndpoint for testing.
+ public MockEndpoint AddMockEndpoint(string name)
+ {
+ var ep = new MockEndpoint(name);
+ _endpoints[name] = ep;
+ return ep;
+ }
+
+ /// Registers a MockEndpoint as the default IMessageBrokerProducer.
+ public Builder UseProducer(MockEndpoint endpoint)
+ {
+ _inner.Services.AddSingleton(endpoint);
+ return this;
+ }
+
+ /// Registers a MockEndpoint as the default IMessageBrokerConsumer.
+ public Builder UseConsumer(MockEndpoint endpoint)
+ {
+ _inner.Services.AddSingleton(endpoint);
+ return this;
+ }
+
+ public Builder ConfigureServices(Action configure)
+ {
+ configure(_inner.Services);
+ return this;
+ }
+
+ public Builder AddSingleton(TService instance) where TService : class
+ {
+ _inner.Services.AddSingleton(instance);
+ return this;
+ }
+
+ public Builder Configure(Action configure) where TOptions : class
+ {
+ _inner.Services.Configure(configure);
+ return this;
+ }
+
+ public AspireIntegrationTestHost Build()
+ {
+ var host = _inner.Build();
+ return new AspireIntegrationTestHost(host, _endpoints);
+ }
+ }
+}
diff --git a/EnterpriseIntegrationPlatform/src/Testing/MockActivityServices.cs b/EnterpriseIntegrationPlatform/src/Testing/MockActivityServices.cs
new file mode 100644
index 0000000..3773f04
--- /dev/null
+++ b/EnterpriseIntegrationPlatform/src/Testing/MockActivityServices.cs
@@ -0,0 +1,172 @@
+// ============================================================================
+// MockActivityServices – In-memory activity services for testing
+// ============================================================================
+
+using System.Collections.Concurrent;
+using EnterpriseIntegrationPlatform.Activities;
+
+namespace EnterpriseIntegrationPlatform.Testing;
+
+///
+/// Real in-memory implementation of
+/// that returns configurable results per step name.
+///
+public sealed class MockCompensationActivityService : ICompensationActivityService
+{
+ private readonly Dictionary _stepResults = new();
+ private readonly ConcurrentQueue _calls = new();
+ private bool _defaultResult = true;
+
+ /// All compensation calls recorded.
+ public IReadOnlyList Calls => _calls.ToArray();
+
+ /// Sets the result for a specific step name.
+ public MockCompensationActivityService WithStepResult(string stepName, bool success)
+ {
+ _stepResults[stepName] = success;
+ return this;
+ }
+
+ /// Sets the default result for unmatched steps.
+ public MockCompensationActivityService WithDefaultResult(bool success)
+ {
+ _defaultResult = success;
+ return this;
+ }
+
+ public Task CompensateAsync(Guid correlationId, string stepName)
+ {
+ _calls.Enqueue(new CompensationCallRecord(correlationId, stepName));
+ var result = _stepResults.TryGetValue(stepName, out var r) ? r : _defaultResult;
+ return Task.FromResult(result);
+ }
+
+ public sealed record CompensationCallRecord(Guid CorrelationId, string StepName);
+}
+
+///
+/// Real in-memory implementation of
+/// that returns configurable validation results per message type.
+///
+public sealed class MockMessageValidationService : IMessageValidationService
+{
+ private readonly Dictionary _results = new();
+ private readonly ConcurrentQueue _calls = new();
+ private MessageValidationResult _defaultResult = MessageValidationResult.Success;
+
+ /// All validation calls recorded.
+ public IReadOnlyList Calls => _calls.ToArray();
+
+ /// Sets the result for a specific message type.
+ public MockMessageValidationService WithResult(string messageType, MessageValidationResult result)
+ {
+ _results[messageType] = result;
+ return this;
+ }
+
+ /// Sets the default result for unmatched types.
+ public MockMessageValidationService WithDefaultResult(MessageValidationResult result)
+ {
+ _defaultResult = result;
+ return this;
+ }
+
+ public Task ValidateAsync(string messageType, string payloadJson)
+ {
+ _calls.Enqueue(new ValidationCallRecord(messageType, payloadJson));
+ var result = _results.TryGetValue(messageType, out var r) ? r : _defaultResult;
+ return Task.FromResult(result);
+ }
+
+ public sealed record ValidationCallRecord(string MessageType, string PayloadJson);
+}
+
+///
+/// Real in-memory implementation of
+/// that captures all persistence calls.
+///
+public sealed class MockPersistenceActivityService : IPersistenceActivityService
+{
+ private readonly ConcurrentQueue _calls = new();
+
+ /// All persistence calls recorded.
+ public IReadOnlyList Calls => _calls.ToArray();
+
+ /// Number of SaveMessage calls.
+ public int SaveCount => _calls.Count(c => c.Operation == "SaveMessage");
+
+ /// Number of UpdateDeliveryStatus calls.
+ public int UpdateStatusCount => _calls.Count(c => c.Operation == "UpdateDeliveryStatus");
+
+ /// Number of SaveFault calls.
+ public int SaveFaultCount => _calls.Count(c => c.Operation == "SaveFault");
+
+ public Task SaveMessageAsync(IntegrationPipelineInput input, CancellationToken cancellationToken = default)
+ {
+ _calls.Enqueue(new PersistenceCallRecord("SaveMessage", input.MessageId, input.MessageType, null));
+ return Task.CompletedTask;
+ }
+
+ public Task UpdateDeliveryStatusAsync(
+ Guid messageId, Guid correlationId, DateTimeOffset recordedAt,
+ string status, CancellationToken cancellationToken = default)
+ {
+ _calls.Enqueue(new PersistenceCallRecord("UpdateDeliveryStatus", messageId, status, null));
+ return Task.CompletedTask;
+ }
+
+ public Task SaveFaultAsync(
+ Guid messageId, Guid correlationId, string messageType,
+ string faultedBy, string reason, int retryCount,
+ CancellationToken cancellationToken = default)
+ {
+ _calls.Enqueue(new PersistenceCallRecord("SaveFault", messageId, messageType, reason));
+ return Task.CompletedTask;
+ }
+
+ /// Asserts that SaveMessage was called the expected number of times.
+ public void AssertSaveCount(int expected) =>
+ NUnit.Framework.Assert.That(SaveCount, NUnit.Framework.Is.EqualTo(expected));
+
+ public void Reset()
+ {
+ while (_calls.TryDequeue(out _)) { }
+ }
+
+ public sealed record PersistenceCallRecord(string Operation, Guid MessageId, string? Detail, string? Reason);
+}
+
+///
+/// Real in-memory implementation of
+/// that captures all log entries.
+///
+public sealed class MockMessageLoggingService : IMessageLoggingService
+{
+ private readonly ConcurrentQueue _logs = new();
+
+ /// All log entries recorded.
+ public IReadOnlyList Logs => _logs.ToArray();
+
+ /// Number of log entries.
+ public int LogCount => _logs.Count;
+
+ public Task LogAsync(Guid messageId, string messageType, string stage)
+ {
+ _logs.Enqueue(new LogRecord(messageId, messageType, stage));
+ return Task.CompletedTask;
+ }
+
+ /// Asserts a specific stage was logged for the given message.
+ public void AssertLogged(Guid messageId, string stage) =>
+ NUnit.Framework.Assert.That(
+ _logs.Any(l => l.MessageId == messageId && l.Stage == stage),
+ NUnit.Framework.Is.True,
+ $"Expected log entry for message {messageId} at stage '{stage}'");
+
+ public void Reset()
+ {
+ while (_logs.TryDequeue(out _)) { }
+ }
+
+ public sealed record LogRecord(Guid MessageId, string MessageType, string Stage);
+}
diff --git a/EnterpriseIntegrationPlatform/src/Testing/MockAggregationStrategy.cs b/EnterpriseIntegrationPlatform/src/Testing/MockAggregationStrategy.cs
new file mode 100644
index 0000000..083c452
--- /dev/null
+++ b/EnterpriseIntegrationPlatform/src/Testing/MockAggregationStrategy.cs
@@ -0,0 +1,30 @@
+// ============================================================================
+// MockAggregationStrategy – Configurable aggregation for testing
+// ============================================================================
+
+using EnterpriseIntegrationPlatform.Processing.Aggregator;
+
+namespace EnterpriseIntegrationPlatform.Testing;
+
+///
+/// Real in-memory implementation of
+/// that applies a configurable aggregation function.
+///
+public sealed class MockAggregationStrategy : IAggregationStrategy
+{
+ private readonly Func, TAggregate> _aggregateFunc;
+ private int _callCount;
+
+ /// Creates a mock strategy with the given aggregation function.
+ public MockAggregationStrategy(Func, TAggregate> aggregateFunc) =>
+ _aggregateFunc = aggregateFunc;
+
+ /// Number of aggregation calls.
+ public int CallCount => _callCount;
+
+ public TAggregate Aggregate(IReadOnlyList items)
+ {
+ Interlocked.Increment(ref _callCount);
+ return _aggregateFunc(items);
+ }
+}
diff --git a/EnterpriseIntegrationPlatform/src/Testing/MockEndpoint.cs b/EnterpriseIntegrationPlatform/src/Testing/MockEndpoint.cs
new file mode 100644
index 0000000..2677b38
--- /dev/null
+++ b/EnterpriseIntegrationPlatform/src/Testing/MockEndpoint.cs
@@ -0,0 +1,158 @@
+// ============================================================================
+// MockEndpoint – Real in-memory message broker for end-to-end integration testing
+// ============================================================================
+// Captures messages published by EIP components and feeds test messages to
+// consumers. Inspired by Apache Camel's MockEndpoint pattern. A real service
+// implementation — not a test double.
+// ============================================================================
+
+using System.Collections.Concurrent;
+using EnterpriseIntegrationPlatform.Contracts;
+using EnterpriseIntegrationPlatform.Ingestion;
+using NUnit.Framework;
+
+namespace EnterpriseIntegrationPlatform.Testing;
+
+///
+/// Real in-memory message broker endpoint for end-to-end integration testing.
+/// Implements all broker interfaces so it can act as both producer (captures
+/// outbound messages) and consumer (feeds inbound messages to handlers).
+///
+public sealed class MockEndpoint : IMessageBrokerProducer, IMessageBrokerConsumer,
+ IEventDrivenConsumer, IPollingConsumer, ISelectiveConsumer, IAsyncDisposable
+{
+ private readonly string _name;
+ private readonly ConcurrentQueue _received = new();
+ private readonly ConcurrentQueue