Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion EnterpriseIntegrationPlatform/rules/completion-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ See `milestones.md` for current phase status and next chunk.
- `PostgresRoutingIntegrationTests` (7 tests): ContentBasedRouter (MessageType + Metadata + Regex), MessageFilter, RecipientListRouter, DynamicRouter (register + route), Detour (activate/deactivate)
- `PostgresAdvancedEipTests` (7 tests): Splitter (split + causation chain), DeadLetterPublisher (single + multi-reason), Resequencer (out-of-order → in-order + publish), Retry + DLQ pipeline, Aggregator (count completion + concat), full pipeline (Splitter → Router → DLQ)
- All tests use unique topics (Guid-based) to prevent cross-test interference
- All tests Docker-gated: Assert.Ignore when Aspire Postgres container unavailable
- All tests Docker-gated: Assert.Fail when Aspire Postgres container unavailable
- **Files created**:
- `tests/TutorialLabs/InfrastructureTests/PostgresRoutingIntegrationTests.cs` (7 tests)
- `tests/TutorialLabs/InfrastructureTests/PostgresAdvancedEipTests.cs` (7 tests)
Expand Down
39 changes: 22 additions & 17 deletions EnterpriseIntegrationPlatform/tests/TestAppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,28 @@
.WithEnvironment("POSTGRES_PASSWORD", "eip")
.WithEndpoint(targetPort: 5432, name: "postgres-tcp", scheme: "tcp");

// ── Apache Kafka (via Bitnami image) — high-throughput event streaming ───────
// Uses KRaft mode (no ZooKeeper) for minimal resource footprint in tests.
// IMPORTANT: The EXTERNAL listener port MUST be pinned (port: 29094) and the
// KAFKA_CFG_ADVERTISED_LISTENERS EXTERNAL address MUST match, otherwise Kafka
// advertises the container-internal port in metadata and the Confluent.Kafka
// producer reconnects to the wrong address after the initial bootstrap.
var kafka = builder.AddContainer("kafka", "bitnami/kafka", "3.9.0")
.WithEnvironment("KAFKA_CFG_NODE_ID", "0")
.WithEnvironment("KAFKA_CFG_PROCESS_ROLES", "controller,broker")
.WithEnvironment("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS", "0@localhost:9093")
.WithEnvironment("KAFKA_CFG_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
.WithEnvironment("KAFKA_CFG_LISTENERS", "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094")
.WithEnvironment("KAFKA_CFG_ADVERTISED_LISTENERS", "PLAINTEXT://localhost:9092,EXTERNAL://localhost:29094")
.WithEnvironment("KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT")
.WithEnvironment("KAFKA_CFG_INTER_BROKER_LISTENER_NAME", "PLAINTEXT")
.WithEnvironment("KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE", "true")
.WithEndpoint(port: 29094, targetPort: 9094, name: "kafka-tcp", scheme: "tcp");
// ── Apache Kafka — high-throughput event streaming for T05 and broker tests ──
// Uses official Apache Kafka image in KRaft mode (no ZooKeeper).
// IMPORTANT: isProxied: false is required because Kafka's binary protocol needs
// the advertised listener port to match the actual host port. With Aspire's TCP
// proxy, the proxy port is dynamic and cannot be injected into the container's
// KAFKA_ADVERTISED_LISTENERS before startup. Direct Docker port mapping solves
// this: host:29092 → container:9092, and Kafka advertises localhost:29092.
// NOTE: Listeners MUST use ://:PORT format (not ://0.0.0.0:PORT) because
// the Apache Kafka Docker image rejects 0.0.0.0 in advertised.listeners.
var kafka = builder.AddContainer("kafka", "apache/kafka", "3.9.0")
.WithEnvironment("KAFKA_NODE_ID", "1")
.WithEnvironment("KAFKA_PROCESS_ROLES", "broker,controller")
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9093")
.WithEnvironment("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://:9092,CONTROLLER://:9093")
.WithEnvironment("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://localhost:29092")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
.WithEnvironment("CLUSTER_ID", "eip-test-cluster-001")
.WithEndpoint(port: 29092, targetPort: 9092, name: "kafka-tcp", scheme: "tcp", isProxied: false);

// ── Apache Pulsar — Key_Shared subscription for recipient-keyed distribution ─
// Standalone mode includes broker + bookie + ZooKeeper in a single container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// test classes in the assembly. It starts once per test run.
// ============================================================================

using System.Diagnostics;
using NUnit.Framework;
using TutorialLabs.Infrastructure;

Expand Down Expand Up @@ -80,45 +81,57 @@ public async Task GlobalTearDown()

/// <summary>
/// Creates a NatsBrokerEndpoint connected to the real NATS JetStream.
/// Throws Ignore if Docker is not available.
/// Throws Fail if Docker is not available.
/// </summary>
public static NatsBrokerEndpoint CreateNatsEndpoint(string name)
{
if (!IsAvailable || NatsUrl is null)
Assert.Ignore("Docker not available — skipping real broker test");
{
Assert.Fail("Docker not available — skipping real broker test");
throw new UnreachableException();
}
return new NatsBrokerEndpoint(name, NatsUrl);
}

/// <summary>
/// Creates a PostgresBrokerEndpoint connected to the real PostgreSQL.
/// Throws Ignore if Docker is not available.
/// Throws Fail if Docker is not available.
/// </summary>
public static PostgresBrokerEndpoint CreatePostgresEndpoint(string name)
{
if (!IsAvailable || PostgresConnectionString is null)
Assert.Ignore("Docker not available — skipping real Postgres broker test");
{
Assert.Fail("Docker not available — skipping real Postgres broker test");
throw new UnreachableException();
}
return new PostgresBrokerEndpoint(name, PostgresConnectionString);
}

/// <summary>
/// Creates a KafkaBrokerEndpoint connected to the real Apache Kafka.
/// Throws Ignore if Docker is not available.
/// Throws Fail if Docker is not available.
/// </summary>
public static KafkaBrokerEndpoint CreateKafkaEndpoint(string name)
{
if (!IsAvailable || KafkaBootstrapServers is null)
Assert.Ignore("Docker not available — skipping real Kafka broker test");
{
Assert.Fail("Docker not available — skipping real Kafka broker test");
throw new UnreachableException();
}
return new KafkaBrokerEndpoint(name, KafkaBootstrapServers);
}

/// <summary>
/// Creates a PulsarBrokerEndpoint connected to the real Apache Pulsar.
/// Throws Ignore if Docker is not available.
/// Throws Fail if Docker is not available.
/// </summary>
public static PulsarBrokerEndpoint CreatePulsarEndpoint(string name)
{
if (!IsAvailable || PulsarServiceUrl is null)
Assert.Ignore("Docker not available — skipping real Pulsar broker test");
{
Assert.Fail("Docker not available — skipping real Pulsar broker test");
throw new UnreachableException();
}
return new PulsarBrokerEndpoint(name, PulsarServiceUrl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// 3. Apache Pulsar (Key_Shared recipient-keyed distribution)
// 4. PostgreSQL (SQL-based, ACID, pg_notify)
//
// Requires Docker; tests are skipped (Assert.Ignore) when unavailable.
// Requires Docker; tests fail (Assert.Fail) when unavailable.
// ============================================================================

using EnterpriseIntegrationPlatform.Contracts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace TutorialLabs.InfrastructureTests;

/// <summary>
/// Smoke tests that verify the Aspire-hosted test infrastructure starts
/// and connects. Requires Docker; tests are skipped when unavailable.
/// and connects. Requires Docker; tests fail when unavailable.
/// </summary>
[TestFixture]
public sealed class InfrastructureConnectivityTests
Expand All @@ -22,7 +22,10 @@ public async Task Nats_PublishAndReceive_RoundTrip()
{
var natsUrl = await SharedTestAppHost.GetNatsUrlAsync();
if (natsUrl is null)
Assert.Ignore("Docker not available — skipping NATS test");
{
Assert.Fail("Docker not available — skipping NATS test");
return;
}

await using var endpoint = new NatsBrokerEndpoint("nats-test", natsUrl);

Expand Down Expand Up @@ -56,7 +59,10 @@ public async Task Nats_ProducerCaptures_PublishedMessages()
{
var natsUrl = await SharedTestAppHost.GetNatsUrlAsync();
if (natsUrl is null)
Assert.Ignore("Docker not available — skipping NATS test");
{
Assert.Fail("Docker not available — skipping NATS test");
return;
}

await using var endpoint = new NatsBrokerEndpoint("capture-test", natsUrl);

Expand Down Expand Up @@ -110,7 +116,10 @@ public async Task AspireTestAppHost_StartsSuccessfully()
{
var app = await SharedTestAppHost.GetAppAsync();
if (app is null)
Assert.Ignore("Docker not available — skipping Aspire test");
{
Assert.Fail("Docker not available — skipping Aspire test");
return;
}

Assert.That(SharedTestAppHost.IsAvailable, Is.True,
"Aspire TestAppHost should be running");
Expand All @@ -123,7 +132,10 @@ public async Task Postgres_PublishAndPoll_RoundTrip()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null)
Assert.Ignore("Docker not available — skipping Postgres test");
{
Assert.Fail("Docker not available — skipping Postgres test");
return;
}

await using var endpoint = new PostgresBrokerEndpoint("pg-roundtrip-test", connStr);
await endpoint.EnsureSchemaAsync();
Expand All @@ -147,7 +159,10 @@ public async Task Postgres_ProducerCaptures_PublishedMessages()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null)
Assert.Ignore("Docker not available — skipping Postgres test");
{
Assert.Fail("Docker not available — skipping Postgres test");
return;
}

await using var endpoint = new PostgresBrokerEndpoint("pg-capture-test", connStr);
await endpoint.EnsureSchemaAsync();
Expand All @@ -169,7 +184,10 @@ public async Task Postgres_SubscribeAndReceive_EventDriven()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null)
Assert.Ignore("Docker not available — skipping Postgres test");
{
Assert.Fail("Docker not available — skipping Postgres test");
return;
}

await using var endpoint = new PostgresBrokerEndpoint("pg-subscribe-test", connStr);
await endpoint.EnsureSchemaAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// ============================================================================
// Proves advanced EIP patterns work on real PostgreSQL broker transport:
// Splitter, Aggregator, Resequencer, Dead-Letter Publisher, Retry Policy.
// Requires Docker (Aspire Postgres container); tests skipped when unavailable.
// Requires Docker (Aspire Postgres container); tests fail when unavailable.
// ============================================================================

using EnterpriseIntegrationPlatform.Contracts;
Expand Down Expand Up @@ -32,7 +32,7 @@ public sealed class PostgresAdvancedEipTests
public async Task Splitter_SplitsComposite_PublishesEachPart_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("split-pg", connStr);
var splitTopic = $"split-{Guid.NewGuid():N}";
Expand Down Expand Up @@ -65,7 +65,7 @@ public async Task Splitter_SplitsComposite_PublishesEachPart_ViaPostgres()
public async Task DeadLetterPublisher_PublishesToDlqTopic_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("dlq-pg", connStr);
var dlqTopic = $"dlq-{Guid.NewGuid():N}";
Expand Down Expand Up @@ -105,7 +105,7 @@ await dlq.PublishAsync(
public async Task DeadLetterPublisher_MultipleReasons_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("dlq-multi-pg", connStr);
var dlqTopic = $"dlq-multi-{Guid.NewGuid():N}";
Expand All @@ -131,7 +131,7 @@ public async Task DeadLetterPublisher_MultipleReasons_ViaPostgres()
public async Task Resequencer_OrdersOutOfSequenceMessages_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

// Resequencer is in-memory but we prove it works in a Postgres pipeline
var resequencer = new MessageResequencer(
Expand Down Expand Up @@ -210,7 +210,7 @@ public async Task Resequencer_OrdersOutOfSequenceMessages_ViaPostgres()
public async Task RetryPolicy_ExhaustsRetries_ThenDlq_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("retry-pg", connStr);
var dlqTopic = $"retry-dlq-{Guid.NewGuid():N}";
Expand Down Expand Up @@ -270,7 +270,7 @@ await dlq.PublishAsync(
public async Task Aggregator_CollectsAndPublishesAggregate_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("agg-pg", connStr);
var aggTopic = $"aggregated-{Guid.NewGuid():N}";
Expand Down Expand Up @@ -315,7 +315,7 @@ public async Task Aggregator_CollectsAndPublishesAggregate_ViaPostgres()
public async Task FullPipeline_SplitterRouterDlq_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("pipeline-pg", connStr);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// These tests wire real EIP routing components (ContentBasedRouter, MessageFilter,
// RecipientListRouter, DynamicRouter, Detour) to the real PostgresBrokerEndpoint.
// Each test creates unique topics to prevent cross-test interference.
// Requires Docker (Aspire Postgres container); tests are skipped when unavailable.
// Requires Docker (Aspire Postgres container); tests fail when unavailable.
// ============================================================================

using EnterpriseIntegrationPlatform.Contracts;
Expand All @@ -30,7 +30,7 @@ public sealed class PostgresRoutingIntegrationTests
public async Task ContentBasedRouter_RoutesOnMessageType_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("cbr-pg", connStr);

Expand Down Expand Up @@ -87,7 +87,7 @@ public async Task ContentBasedRouter_RoutesOnMessageType_ViaPostgres()
public async Task ContentBasedRouter_RoutesOnMetadata_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("cbr-meta-pg", connStr);

Expand Down Expand Up @@ -133,7 +133,7 @@ public async Task ContentBasedRouter_RoutesOnMetadata_ViaPostgres()
public async Task MessageFilter_FiltersAndRoutes_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("filter-pg", connStr);

Expand Down Expand Up @@ -176,7 +176,7 @@ public async Task MessageFilter_FiltersAndRoutes_ViaPostgres()
public async Task RecipientListRouter_FansOutToAllMatching_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("rlist-pg", connStr);

Expand Down Expand Up @@ -226,7 +226,7 @@ public async Task RecipientListRouter_FansOutToAllMatching_ViaPostgres()
public async Task DynamicRouter_RegisterAndRoute_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("dynroute-pg", connStr);

Expand Down Expand Up @@ -264,7 +264,7 @@ public async Task DynamicRouter_RegisterAndRoute_ViaPostgres()
public async Task Detour_ActivateAndDeactivate_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("detour-pg", connStr);

Expand Down Expand Up @@ -309,7 +309,7 @@ public async Task Detour_ActivateAndDeactivate_ViaPostgres()
public async Task ContentBasedRouter_RegexRouting_ViaPostgres()
{
var connStr = await SharedTestAppHost.GetPostgresConnectionStringAsync();
if (connStr is null) Assert.Ignore("Docker not available");
if (connStr is null) { Assert.Fail("Docker not available"); return; }

await using var broker = new PostgresBrokerEndpoint("cbr-regex-pg", connStr);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ public async Task SetUp()
{
var natsUrl = await SharedTestAppHost.GetNatsUrlAsync();
if (natsUrl is null)
Assert.Ignore("Docker not available — skipping real broker test");
{
Assert.Fail("Docker not available — skipping real broker test");
return;
}

_natsUrl = natsUrl;
_broker = new NatsBrokerEndpoint("broker", _natsUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ public async Task SetUp()
{
var natsUrl = await SharedTestAppHost.GetNatsUrlAsync();
if (natsUrl is null)
Assert.Ignore("Docker not available — skipping real broker test");
{
Assert.Fail("Docker not available — skipping real broker test");
return;
}

_natsUrl = natsUrl;
_broker = new NatsBrokerEndpoint("broker", _natsUrl);
Expand Down
Loading
Loading