diff --git a/e2e/database_test.go b/e2e/database_test.go index 52f29367..95139b35 100644 --- a/e2e/database_test.go +++ b/e2e/database_test.go @@ -8,10 +8,9 @@ import ( "fmt" "iter" "testing" + "time" "github.com/jackc/pgx/v5" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" "github.com/pgEdge/control-plane/client" @@ -356,49 +355,83 @@ func (d *DatabaseFixture) waitForTask(ctx context.Context, task *controlplane.Ta func (f *DatabaseFixture) WaitForReplication(ctx context.Context, t testing.TB, username, password string) { tLog(t, "waiting for replication to catch up on all nodes") - // Execute sync_event on all primary nodes - nodeSyncMap := make(map[string]string) - for _, node := range f.Spec.Nodes { - primaryOpts := ConnectionOptions{ - Matcher: And(WithNode(node.Name), WithRole("primary")), - Username: username, - Password: password, + // After a primary change (switchover/failover), Spock subscriptions need + // time to reconnect to the new primary. Retry with fresh sync markers + // until all nodes are in sync or the 5-minute deadline is exceeded. + // The 5-minute budget accommodates Spock's reconnect delay when the + // provider node's primary changes (the logical replication slot must be + // re-created on the new primary, which takes additional time in CI). + deadline := time.Now().Add(5 * time.Minute) + for { + if err := f.Refresh(ctx); err != nil { + t.Errorf("failed to refresh database state while waiting for replication: %v", err) + return + } + ok, reason := f.pollReplication(ctx, username, password) + if ok { + return + } + if time.Now().After(deadline) { + t.Errorf("replication did not catch up on all nodes within 5 minutes (last failure: %s)", reason) + return + } + tLogf(t, "replication not yet in sync, retrying in 15s... (%s)", reason) + select { + case <-ctx.Done(): + t.Errorf("replication wait aborted: %v", ctx.Err()) + return + case <-time.After(15 * time.Second): } - - f.WithConnection(ctx, primaryOpts, t, func(conn *pgx.Conn) { - var syncLSN string - - row := conn.QueryRow(ctx, "SELECT spock.sync_event();") - require.NoError(t, row.Scan(&syncLSN)) - - assert.NotEmpty(t, syncLSN) - - nodeSyncMap[node.Name] = syncLSN - }) } +} - // Verify wait_for_sync_event on all other nodes +// pollReplication inserts a sync marker on every primary, then checks that +// each marker has been delivered to every other node's primary. Returns +// (true, "") only when all cross-node checks pass. On failure it returns +// (false, reason) where reason identifies the failing pair so callers can +// log it. Never calls t.Fatal/t.Error so it can be used safely in a retry loop. +func (f *DatabaseFixture) pollReplication(ctx context.Context, username, password string) (bool, string) { + nodeSyncMap := make(map[string]string) for _, node := range f.Spec.Nodes { - primaryOpts := ConnectionOptions{ + conn, err := f.ConnectToInstance(ctx, ConnectionOptions{ Matcher: And(WithNode(node.Name), WithRole("primary")), Username: username, Password: password, + }) + if err != nil { + return false, fmt.Sprintf("connect to %s primary: %v", node.Name, err) + } + var syncLSN string + err = conn.QueryRow(ctx, "SELECT spock.sync_event();").Scan(&syncLSN) + conn.Close(ctx) + if err != nil || syncLSN == "" { + return false, fmt.Sprintf("sync_event on %s: %v", node.Name, err) } + nodeSyncMap[node.Name] = syncLSN + } + for _, node := range f.Spec.Nodes { for peerNode, lsn := range nodeSyncMap { if peerNode == node.Name { continue } - - f.WithConnection(ctx, primaryOpts, t, func(conn *pgx.Conn) { - var synced bool - - row := conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", peerNode, lsn) - require.NoError(t, row.Scan(&synced)) - assert.True(t, synced) + conn, err := f.ConnectToInstance(ctx, ConnectionOptions{ + Matcher: And(WithNode(node.Name), WithRole("primary")), + Username: username, + Password: password, }) + if err != nil { + return false, fmt.Sprintf("connect to %s primary for peer check: %v", node.Name, err) + } + var synced bool + err = conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", peerNode, lsn).Scan(&synced) + conn.Close(ctx) + if err != nil || !synced { + return false, fmt.Sprintf("%s waiting for %s events (LSN %s): synced=%v err=%v", node.Name, peerNode, lsn, synced, err) + } } } + return true, "" } func (d *DatabaseFixture) SwitchoverDatabaseNode(ctx context.Context, req *controlplane.SwitchoverDatabaseNodePayload) error { diff --git a/e2e/failover_test.go b/e2e/failover_test.go index 03e30e9d..af73f10b 100644 --- a/e2e/failover_test.go +++ b/e2e/failover_test.go @@ -17,11 +17,14 @@ import ( func TestFailoverScenarios(t *testing.T) { t.Parallel() - host1 := fixture.HostIDs()[0] - host2 := fixture.HostIDs()[1] - host3 := fixture.HostIDs()[2] - - ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute) + hostIDs := fixture.HostIDs() + require.GreaterOrEqual(t, len(hostIDs), 4, "fixture must provide at least 4 hosts") + host1 := hostIDs[0] + host2 := hostIDs[1] + host3 := hostIDs[2] + host4 := hostIDs[3] + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute) defer cancel() db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ @@ -42,6 +45,10 @@ func TestFailoverScenarios(t *testing.T) { Name: "n1", HostIds: []controlplane.Identifier{controlplane.Identifier(host1), controlplane.Identifier(host2), controlplane.Identifier(host3)}, }, + { + Name: "n2", + HostIds: []controlplane.Identifier{controlplane.Identifier(host4)}, + }, }, }, }) @@ -138,6 +145,7 @@ func TestFailoverScenarios(t *testing.T) { "[auto] primary did not change within timeout (still %s)", origPrimary) newPrimary := getPrimaryInstanceID() t.Logf("[auto] new primary: %s", newPrimary) + db.WaitForReplication(ctx, t, "admin", "password") }) t.Run("failover to a specific candidate", func(t *testing.T) { diff --git a/e2e/switchover_test.go b/e2e/switchover_test.go index ef5a74ab..3b7d2d58 100644 --- a/e2e/switchover_test.go +++ b/e2e/switchover_test.go @@ -17,11 +17,14 @@ import ( func TestSwitchoverScenarios(t *testing.T) { t.Parallel() - host1 := fixture.HostIDs()[0] - host2 := fixture.HostIDs()[1] - host3 := fixture.HostIDs()[2] - - ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute) + hostIDs := fixture.HostIDs() + require.GreaterOrEqual(t, len(hostIDs), 4, "fixture must provide at least 4 hosts") + host1 := hostIDs[0] + host2 := hostIDs[1] + host3 := hostIDs[2] + host4 := hostIDs[3] + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute) defer cancel() db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ @@ -42,6 +45,10 @@ func TestSwitchoverScenarios(t *testing.T) { Name: "n1", HostIds: []controlplane.Identifier{controlplane.Identifier(host1), controlplane.Identifier(host2), controlplane.Identifier(host3)}, }, + { + Name: "n2", + HostIds: []controlplane.Identifier{controlplane.Identifier(host4)}, + }, }, }, }) @@ -140,6 +147,7 @@ func TestSwitchoverScenarios(t *testing.T) { "[auto] primary did not change within timeout (still %s)", origPrimary) newPrimary := getPrimaryInstanceID() t.Logf("[auto] new primary: %s", newPrimary) + db.WaitForReplication(ctx, t, "admin", "password") }) t.Run("switchover to a specific candidate", func(t *testing.T) { diff --git a/server/internal/patroni/client.go b/server/internal/patroni/client.go index f8ad26b8..80384077 100644 --- a/server/internal/patroni/client.go +++ b/server/internal/patroni/client.go @@ -706,6 +706,14 @@ func (c *Client) Readiness(ctx context.Context) error { return nil } +// WaitForLeaderChange polls until the cluster reports a different running leader +// than oldLeader. It uses the same polling logic as waitForSwitchover and is +// safe to call after both switchover and failover requests. The caller's +// context is wrapped with a one-minute hard deadline. +func (c *Client) WaitForLeaderChange(ctx context.Context, oldLeader *string) error { + return c.waitForSwitchover(ctx, oldLeader) +} + func (c *Client) waitForSwitchover(ctx context.Context, oldLeader *string) error { ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() diff --git a/server/internal/postgres/create_db.go b/server/internal/postgres/create_db.go index cd6e0107..d4a70339 100644 --- a/server/internal/postgres/create_db.go +++ b/server/internal/postgres/create_db.go @@ -210,6 +210,7 @@ func DropSubscription(providerName, subscriberName string) Statement { } } + func DropAllSubscriptions() Statement { return Statement{ SQL: "SELECT spock.sub_drop(sub_name) FROM spock.subscription;", diff --git a/server/internal/workflows/activities/failover.go b/server/internal/workflows/activities/failover.go index 55b20801..7101176b 100644 --- a/server/internal/workflows/activities/failover.go +++ b/server/internal/workflows/activities/failover.go @@ -65,6 +65,16 @@ func (a *Activities) PerformFailover(ctx context.Context, input *PerformFailover return nil, fmt.Errorf("patroni initiate failover call failed: %w", err) } - logger.Info("patroni initiate failover request sent") + // Wait for the new primary to be running before returning. This mirrors + // the wait=true behaviour in ScheduleSwitchover and ensures the task + // completes only after the new primary is confirmed. + // For crashed-primary scenarios the old leader's endpoint may be + // unreachable; WaitForLeaderChange tolerates up to 3 connection errors + // before giving up, so it degrades gracefully. + if err := pClient.WaitForLeaderChange(ctx, failReq.Leader); err != nil { + logger.Warn("timed out waiting for failover to complete", "error", err) + } + + logger.Info("patroni failover completed, new primary is running") return &PerformFailoverOutput{}, nil } diff --git a/server/internal/workflows/activities/switchover.go b/server/internal/workflows/activities/switchover.go index d242ff17..55fba610 100644 --- a/server/internal/workflows/activities/switchover.go +++ b/server/internal/workflows/activities/switchover.go @@ -64,7 +64,13 @@ func (a *Activities) PerformSwitchover(ctx context.Context, input *PerformSwitch swReq.ScheduledAt = &input.ScheduledAt } - if err := pClient.ScheduleSwitchover(ctx, swReq, false); err != nil { + // For immediate switchovers, wait for Patroni to complete the primary + // promotion before returning. This ensures the new primary is running + // before the workflow proceeds to refresh Spock subscriptions. Scheduled + // switchovers return immediately because the promotion happens at the + // scheduled time, which is after the workflow task completes. + wait := input.ScheduledAt.IsZero() + if err := pClient.ScheduleSwitchover(ctx, swReq, wait); err != nil { return nil, fmt.Errorf("patroni scheduled switchover call failed: %w", err) }