From 8abb644dff36fdb26e570a14efd1aeb64fb40f5a Mon Sep 17 00:00:00 2001 From: Siva Date: Fri, 15 May 2026 23:06:37 +0530 Subject: [PATCH 1/9] test(e2e): add WaitForReplication after first switchover/failover --- e2e/failover_test.go | 1 + e2e/switchover_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/e2e/failover_test.go b/e2e/failover_test.go index 03e30e9d..9d183d4c 100644 --- a/e2e/failover_test.go +++ b/e2e/failover_test.go @@ -138,6 +138,7 @@ func TestFailoverScenarios(t *testing.T) { "[auto] primary did not change within timeout (still %s)", origPrimary) newPrimary := getPrimaryInstanceID() t.Logf("[auto] new primary: %s", newPrimary) + db.WaitForReplication(ctx, t, "admin", "password") }) t.Run("failover to a specific candidate", func(t *testing.T) { diff --git a/e2e/switchover_test.go b/e2e/switchover_test.go index ef5a74ab..8b15431c 100644 --- a/e2e/switchover_test.go +++ b/e2e/switchover_test.go @@ -140,6 +140,7 @@ func TestSwitchoverScenarios(t *testing.T) { "[auto] primary did not change within timeout (still %s)", origPrimary) newPrimary := getPrimaryInstanceID() t.Logf("[auto] new primary: %s", newPrimary) + db.WaitForReplication(ctx, t, "admin", "password") }) t.Run("switchover to a specific candidate", func(t *testing.T) { From 78a8e0cb88667a2eb011b5efc731fa58d19efd7a Mon Sep 17 00:00:00 2001 From: Siva Date: Tue, 26 May 2026 20:27:27 +0530 Subject: [PATCH 2/9] added n2 node to switchover/failover e2e tests --- e2e/failover_test.go | 7 ++++++- e2e/switchover_test.go | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/e2e/failover_test.go b/e2e/failover_test.go index 9d183d4c..810fefc4 100644 --- a/e2e/failover_test.go +++ b/e2e/failover_test.go @@ -20,8 +20,9 @@ func TestFailoverScenarios(t *testing.T) { host1 := fixture.HostIDs()[0] host2 := fixture.HostIDs()[1] host3 := fixture.HostIDs()[2] + host4 := fixture.HostIDs()[3] - ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ @@ -42,6 +43,10 @@ func TestFailoverScenarios(t *testing.T) { Name: "n1", HostIds: []controlplane.Identifier{controlplane.Identifier(host1), controlplane.Identifier(host2), controlplane.Identifier(host3)}, }, + { + Name: "n2", + HostIds: []controlplane.Identifier{controlplane.Identifier(host4)}, + }, }, }, }) diff --git a/e2e/switchover_test.go b/e2e/switchover_test.go index 8b15431c..d2b61b99 100644 --- a/e2e/switchover_test.go +++ b/e2e/switchover_test.go @@ -20,8 +20,9 @@ func TestSwitchoverScenarios(t *testing.T) { host1 := fixture.HostIDs()[0] host2 := fixture.HostIDs()[1] host3 := fixture.HostIDs()[2] + host4 := fixture.HostIDs()[3] - ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ @@ -42,6 +43,10 @@ func TestSwitchoverScenarios(t *testing.T) { Name: "n1", HostIds: []controlplane.Identifier{controlplane.Identifier(host1), controlplane.Identifier(host2), controlplane.Identifier(host3)}, }, + { + Name: "n2", + HostIds: []controlplane.Identifier{controlplane.Identifier(host4)}, + }, }, }, }) From f5c61dc4f3a21ea7b921126aa3502fd9a411cdaa Mon Sep 17 00:00:00 2001 From: Siva Date: Tue, 26 May 2026 21:25:17 +0530 Subject: [PATCH 3/9] addressing AI review comments --- e2e/database_test.go | 2 +- e2e/failover_test.go | 10 ++++++---- e2e/switchover_test.go | 10 ++++++---- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/e2e/database_test.go b/e2e/database_test.go index 52f29367..f0a0c85f 100644 --- a/e2e/database_test.go +++ b/e2e/database_test.go @@ -393,7 +393,7 @@ func (f *DatabaseFixture) WaitForReplication(ctx context.Context, t testing.TB, f.WithConnection(ctx, primaryOpts, t, func(conn *pgx.Conn) { var synced bool - row := conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", peerNode, lsn) + row := conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 90);", peerNode, lsn) require.NoError(t, row.Scan(&synced)) assert.True(t, synced) }) diff --git a/e2e/failover_test.go b/e2e/failover_test.go index 810fefc4..411c3c86 100644 --- a/e2e/failover_test.go +++ b/e2e/failover_test.go @@ -17,10 +17,12 @@ import ( func TestFailoverScenarios(t *testing.T) { t.Parallel() - host1 := fixture.HostIDs()[0] - host2 := fixture.HostIDs()[1] - host3 := fixture.HostIDs()[2] - host4 := fixture.HostIDs()[3] + hostIDs := fixture.HostIDs() + require.GreaterOrEqual(t, len(hostIDs), 4, "fixture must provide at least 4 hosts") + host1 := hostIDs[0] + host2 := hostIDs[1] + host3 := hostIDs[2] + host4 := hostIDs[3] ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() diff --git a/e2e/switchover_test.go b/e2e/switchover_test.go index d2b61b99..02825805 100644 --- a/e2e/switchover_test.go +++ b/e2e/switchover_test.go @@ -17,10 +17,12 @@ import ( func TestSwitchoverScenarios(t *testing.T) { t.Parallel() - host1 := fixture.HostIDs()[0] - host2 := fixture.HostIDs()[1] - host3 := fixture.HostIDs()[2] - host4 := fixture.HostIDs()[3] + hostIDs := fixture.HostIDs() + require.GreaterOrEqual(t, len(hostIDs), 4, "fixture must provide at least 4 hosts") + host1 := hostIDs[0] + host2 := hostIDs[1] + host3 := hostIDs[2] + host4 := hostIDs[3] ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() From 817c9735390cf6bf2f073fcf2d0774037c03fba0 Mon Sep 17 00:00:00 2001 From: Siva Date: Tue, 26 May 2026 22:46:42 +0530 Subject: [PATCH 4/9] fixing e2e tests --- e2e/database_test.go | 77 +++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 29 deletions(-) diff --git a/e2e/database_test.go b/e2e/database_test.go index f0a0c85f..89cbf811 100644 --- a/e2e/database_test.go +++ b/e2e/database_test.go @@ -8,10 +8,9 @@ import ( "fmt" "iter" "testing" + "time" "github.com/jackc/pgx/v5" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane" "github.com/pgEdge/control-plane/client" @@ -356,49 +355,69 @@ func (d *DatabaseFixture) waitForTask(ctx context.Context, task *controlplane.Ta func (f *DatabaseFixture) WaitForReplication(ctx context.Context, t testing.TB, username, password string) { tLog(t, "waiting for replication to catch up on all nodes") - // Execute sync_event on all primary nodes - nodeSyncMap := make(map[string]string) - for _, node := range f.Spec.Nodes { - primaryOpts := ConnectionOptions{ - Matcher: And(WithNode(node.Name), WithRole("primary")), - Username: username, - Password: password, + // After a primary change (switchover/failover), Spock subscriptions need + // time to reconnect to the new primary. Retry with fresh sync markers + // until all nodes are in sync or the 3-minute deadline is exceeded. + deadline := time.Now().Add(3 * time.Minute) + for { + if f.pollReplication(ctx, username, password) { + return } - - f.WithConnection(ctx, primaryOpts, t, func(conn *pgx.Conn) { - var syncLSN string - - row := conn.QueryRow(ctx, "SELECT spock.sync_event();") - require.NoError(t, row.Scan(&syncLSN)) - - assert.NotEmpty(t, syncLSN) - - nodeSyncMap[node.Name] = syncLSN - }) + if time.Now().After(deadline) { + t.Error("replication did not catch up on all nodes within 3 minutes") + return + } + tLog(t, "replication not yet in sync, retrying in 15s...") + time.Sleep(15 * time.Second) } +} - // Verify wait_for_sync_event on all other nodes +// pollReplication inserts a sync marker on every primary, then checks that +// each marker has been delivered to every other node's primary. Returns true +// only when all cross-node checks pass. Never calls t.Fatal/t.Error so it +// can be used safely inside a retry loop. +func (f *DatabaseFixture) pollReplication(ctx context.Context, username, password string) bool { + nodeSyncMap := make(map[string]string) for _, node := range f.Spec.Nodes { - primaryOpts := ConnectionOptions{ + conn, err := f.ConnectToInstance(ctx, ConnectionOptions{ Matcher: And(WithNode(node.Name), WithRole("primary")), Username: username, Password: password, + }) + if err != nil { + return false + } + var syncLSN string + err = conn.QueryRow(ctx, "SELECT spock.sync_event();").Scan(&syncLSN) + conn.Close(ctx) + if err != nil || syncLSN == "" { + return false } + nodeSyncMap[node.Name] = syncLSN + } + for _, node := range f.Spec.Nodes { for peerNode, lsn := range nodeSyncMap { if peerNode == node.Name { continue } - - f.WithConnection(ctx, primaryOpts, t, func(conn *pgx.Conn) { - var synced bool - - row := conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 90);", peerNode, lsn) - require.NoError(t, row.Scan(&synced)) - assert.True(t, synced) + conn, err := f.ConnectToInstance(ctx, ConnectionOptions{ + Matcher: And(WithNode(node.Name), WithRole("primary")), + Username: username, + Password: password, }) + if err != nil { + return false + } + var synced bool + err = conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", peerNode, lsn).Scan(&synced) + conn.Close(ctx) + if err != nil || !synced { + return false + } } } + return true } func (d *DatabaseFixture) SwitchoverDatabaseNode(ctx context.Context, req *controlplane.SwitchoverDatabaseNodePayload) error { From c0aa0f5024c9e017b53ef5d21b5c521b26f82ca8 Mon Sep 17 00:00:00 2001 From: Siva Date: Tue, 26 May 2026 23:01:03 +0530 Subject: [PATCH 5/9] addressing AI comments --- e2e/database_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/e2e/database_test.go b/e2e/database_test.go index 89cbf811..7550faa6 100644 --- a/e2e/database_test.go +++ b/e2e/database_test.go @@ -360,6 +360,10 @@ func (f *DatabaseFixture) WaitForReplication(ctx context.Context, t testing.TB, // until all nodes are in sync or the 3-minute deadline is exceeded. deadline := time.Now().Add(3 * time.Minute) for { + if err := f.Refresh(ctx); err != nil { + t.Errorf("failed to refresh database state while waiting for replication: %v", err) + return + } if f.pollReplication(ctx, username, password) { return } @@ -368,7 +372,12 @@ func (f *DatabaseFixture) WaitForReplication(ctx context.Context, t testing.TB, return } tLog(t, "replication not yet in sync, retrying in 15s...") - time.Sleep(15 * time.Second) + select { + case <-ctx.Done(): + t.Errorf("replication wait aborted: %v", ctx.Err()) + return + case <-time.After(15 * time.Second): + } } } From 57aa7db9353d5291686b0ac98edd1fd30e47c370 Mon Sep 17 00:00:00 2001 From: Siva Date: Wed, 27 May 2026 09:51:02 +0530 Subject: [PATCH 6/9] e2e tests fix --- e2e/database_test.go | 33 +++++++++++++++++++-------------- e2e/failover_test.go | 2 +- e2e/switchover_test.go | 2 +- 3 files changed, 21 insertions(+), 16 deletions(-) diff --git a/e2e/database_test.go b/e2e/database_test.go index 7550faa6..95139b35 100644 --- a/e2e/database_test.go +++ b/e2e/database_test.go @@ -357,21 +357,25 @@ func (f *DatabaseFixture) WaitForReplication(ctx context.Context, t testing.TB, // After a primary change (switchover/failover), Spock subscriptions need // time to reconnect to the new primary. Retry with fresh sync markers - // until all nodes are in sync or the 3-minute deadline is exceeded. - deadline := time.Now().Add(3 * time.Minute) + // until all nodes are in sync or the 5-minute deadline is exceeded. + // The 5-minute budget accommodates Spock's reconnect delay when the + // provider node's primary changes (the logical replication slot must be + // re-created on the new primary, which takes additional time in CI). + deadline := time.Now().Add(5 * time.Minute) for { if err := f.Refresh(ctx); err != nil { t.Errorf("failed to refresh database state while waiting for replication: %v", err) return } - if f.pollReplication(ctx, username, password) { + ok, reason := f.pollReplication(ctx, username, password) + if ok { return } if time.Now().After(deadline) { - t.Error("replication did not catch up on all nodes within 3 minutes") + t.Errorf("replication did not catch up on all nodes within 5 minutes (last failure: %s)", reason) return } - tLog(t, "replication not yet in sync, retrying in 15s...") + tLogf(t, "replication not yet in sync, retrying in 15s... (%s)", reason) select { case <-ctx.Done(): t.Errorf("replication wait aborted: %v", ctx.Err()) @@ -382,10 +386,11 @@ func (f *DatabaseFixture) WaitForReplication(ctx context.Context, t testing.TB, } // pollReplication inserts a sync marker on every primary, then checks that -// each marker has been delivered to every other node's primary. Returns true -// only when all cross-node checks pass. Never calls t.Fatal/t.Error so it -// can be used safely inside a retry loop. -func (f *DatabaseFixture) pollReplication(ctx context.Context, username, password string) bool { +// each marker has been delivered to every other node's primary. Returns +// (true, "") only when all cross-node checks pass. On failure it returns +// (false, reason) where reason identifies the failing pair so callers can +// log it. Never calls t.Fatal/t.Error so it can be used safely in a retry loop. +func (f *DatabaseFixture) pollReplication(ctx context.Context, username, password string) (bool, string) { nodeSyncMap := make(map[string]string) for _, node := range f.Spec.Nodes { conn, err := f.ConnectToInstance(ctx, ConnectionOptions{ @@ -394,13 +399,13 @@ func (f *DatabaseFixture) pollReplication(ctx context.Context, username, passwor Password: password, }) if err != nil { - return false + return false, fmt.Sprintf("connect to %s primary: %v", node.Name, err) } var syncLSN string err = conn.QueryRow(ctx, "SELECT spock.sync_event();").Scan(&syncLSN) conn.Close(ctx) if err != nil || syncLSN == "" { - return false + return false, fmt.Sprintf("sync_event on %s: %v", node.Name, err) } nodeSyncMap[node.Name] = syncLSN } @@ -416,17 +421,17 @@ func (f *DatabaseFixture) pollReplication(ctx context.Context, username, passwor Password: password, }) if err != nil { - return false + return false, fmt.Sprintf("connect to %s primary for peer check: %v", node.Name, err) } var synced bool err = conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", peerNode, lsn).Scan(&synced) conn.Close(ctx) if err != nil || !synced { - return false + return false, fmt.Sprintf("%s waiting for %s events (LSN %s): synced=%v err=%v", node.Name, peerNode, lsn, synced, err) } } } - return true + return true, "" } func (d *DatabaseFixture) SwitchoverDatabaseNode(ctx context.Context, req *controlplane.SwitchoverDatabaseNodePayload) error { diff --git a/e2e/failover_test.go b/e2e/failover_test.go index 411c3c86..af73f10b 100644 --- a/e2e/failover_test.go +++ b/e2e/failover_test.go @@ -24,7 +24,7 @@ func TestFailoverScenarios(t *testing.T) { host3 := hostIDs[2] host4 := hostIDs[3] - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute) defer cancel() db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ diff --git a/e2e/switchover_test.go b/e2e/switchover_test.go index 02825805..3b7d2d58 100644 --- a/e2e/switchover_test.go +++ b/e2e/switchover_test.go @@ -24,7 +24,7 @@ func TestSwitchoverScenarios(t *testing.T) { host3 := hostIDs[2] host4 := hostIDs[3] - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute) defer cancel() db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{ From ba21dc8025207f351b17c886734d6f589005e892 Mon Sep 17 00:00:00 2001 From: Siva Date: Wed, 27 May 2026 12:51:45 +0530 Subject: [PATCH 7/9] restore Spock replication after switchover/failover --- .../internal/workflows/activities/switchover.go | 8 +++++++- server/internal/workflows/failover.go | 13 +++++++++++++ server/internal/workflows/switchover.go | 17 +++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/server/internal/workflows/activities/switchover.go b/server/internal/workflows/activities/switchover.go index d242ff17..55fba610 100644 --- a/server/internal/workflows/activities/switchover.go +++ b/server/internal/workflows/activities/switchover.go @@ -64,7 +64,13 @@ func (a *Activities) PerformSwitchover(ctx context.Context, input *PerformSwitch swReq.ScheduledAt = &input.ScheduledAt } - if err := pClient.ScheduleSwitchover(ctx, swReq, false); err != nil { + // For immediate switchovers, wait for Patroni to complete the primary + // promotion before returning. This ensures the new primary is running + // before the workflow proceeds to refresh Spock subscriptions. Scheduled + // switchovers return immediately because the promotion happens at the + // scheduled time, which is after the workflow task completes. + wait := input.ScheduledAt.IsZero() + if err := pClient.ScheduleSwitchover(ctx, swReq, wait); err != nil { return nil, fmt.Errorf("patroni scheduled switchover call failed: %w", err) } diff --git a/server/internal/workflows/failover.go b/server/internal/workflows/failover.go index 1e1764b2..dc15a29e 100644 --- a/server/internal/workflows/failover.go +++ b/server/internal/workflows/failover.go @@ -162,6 +162,19 @@ func (w *Workflows) Failover(ctx workflow.Context, in *FailoverInput) (*Failover return nil, handleError(fmt.Errorf("perform failover activity failed: %w", err)) } + // After the primary changes, logical replication slots used by Spock are + // missing from the new primary (Patroni does not replicate them). Run a + // resource refresh to recreate the missing slots and update subscription + // interfaces so that Spock reconnects without manual intervention. + logger.Info("refreshing database state to restore Spock subscriptions after failover") + refreshIn := &RefreshCurrentStateInput{ + DatabaseID: in.DatabaseID, + TaskID: in.TaskID, + } + if _, err := w.ExecuteRefreshCurrentState(ctx, refreshIn).Get(ctx); err != nil { + logger.Warn("failed to refresh database state after failover; Spock replication may recover slowly", "error", err) + } + completeUpdate := &activities.UpdateTaskInput{ Scope: task.ScopeDatabase, EntityID: in.DatabaseID, diff --git a/server/internal/workflows/switchover.go b/server/internal/workflows/switchover.go index d1e1eab8..638b5761 100644 --- a/server/internal/workflows/switchover.go +++ b/server/internal/workflows/switchover.go @@ -163,6 +163,23 @@ func (w *Workflows) Switchover(ctx workflow.Context, in *SwitchoverInput) (*Swit return nil, handleError(fmt.Errorf("perform switchover activity failed: %w", err)) } + // After the primary changes, logical replication slots used by Spock are + // missing from the new primary (Patroni does not replicate them). Run a + // resource refresh to recreate the missing slots and update subscription + // interfaces so that Spock reconnects without manual intervention. + // Only do this for immediate switchovers; scheduled switchovers fire at a + // future time outside the scope of this workflow execution. + if in.ScheduledAt.IsZero() { + logger.Info("refreshing database state to restore Spock subscriptions after switchover") + refreshIn := &RefreshCurrentStateInput{ + DatabaseID: in.DatabaseID, + TaskID: in.TaskID, + } + if _, err := w.ExecuteRefreshCurrentState(ctx, refreshIn).Get(ctx); err != nil { + logger.Warn("failed to refresh database state after switchover; Spock replication may recover slowly", "error", err) + } + } + completeUpdate := &activities.UpdateTaskInput{ Scope: task.ScopeDatabase, EntityID: in.DatabaseID, From 6b47b9ea91ff1706677954f5a437e930cc6d5e09 Mon Sep 17 00:00:00 2001 From: Siva Date: Wed, 27 May 2026 16:31:07 +0530 Subject: [PATCH 8/9] fixing e2e tests --- server/internal/patroni/client.go | 8 ++++++++ server/internal/workflows/activities/failover.go | 13 ++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/server/internal/patroni/client.go b/server/internal/patroni/client.go index f8ad26b8..80384077 100644 --- a/server/internal/patroni/client.go +++ b/server/internal/patroni/client.go @@ -706,6 +706,14 @@ func (c *Client) Readiness(ctx context.Context) error { return nil } +// WaitForLeaderChange polls until the cluster reports a different running leader +// than oldLeader. It uses the same polling logic as waitForSwitchover and is +// safe to call after both switchover and failover requests. The caller's +// context is wrapped with a one-minute hard deadline. +func (c *Client) WaitForLeaderChange(ctx context.Context, oldLeader *string) error { + return c.waitForSwitchover(ctx, oldLeader) +} + func (c *Client) waitForSwitchover(ctx context.Context, oldLeader *string) error { ctx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() diff --git a/server/internal/workflows/activities/failover.go b/server/internal/workflows/activities/failover.go index 55b20801..20eebbd7 100644 --- a/server/internal/workflows/activities/failover.go +++ b/server/internal/workflows/activities/failover.go @@ -65,6 +65,17 @@ func (a *Activities) PerformFailover(ctx context.Context, input *PerformFailover return nil, fmt.Errorf("patroni initiate failover call failed: %w", err) } - logger.Info("patroni initiate failover request sent") + // Wait for the new primary to be running before returning. This mirrors + // the wait=true behaviour in ScheduleSwitchover and ensures that the + // subsequent RefreshCurrentState sub-workflow in the failover workflow + // sees the new primary rather than the old one. + // For crashed-primary scenarios the old leader's endpoint may be + // unreachable; WaitForLeaderChange tolerates up to 3 connection errors + // before giving up, so it degrades gracefully. + if err := pClient.WaitForLeaderChange(ctx, failReq.Leader); err != nil { + logger.Warn("timed out waiting for failover to complete; subscription refresh may run before new primary is ready", "error", err) + } + + logger.Info("patroni failover completed, new primary is running") return &PerformFailoverOutput{}, nil } From b7c6e3644b12ee73ee1aa029866cfb02c534a45d Mon Sep 17 00:00:00 2001 From: Siva Date: Wed, 27 May 2026 18:39:32 +0530 Subject: [PATCH 9/9] fixing e2e tests --- server/internal/postgres/create_db.go | 1 + .../internal/workflows/activities/failover.go | 7 +++---- server/internal/workflows/failover.go | 13 ------------- server/internal/workflows/switchover.go | 17 ----------------- 4 files changed, 4 insertions(+), 34 deletions(-) diff --git a/server/internal/postgres/create_db.go b/server/internal/postgres/create_db.go index cd6e0107..d4a70339 100644 --- a/server/internal/postgres/create_db.go +++ b/server/internal/postgres/create_db.go @@ -210,6 +210,7 @@ func DropSubscription(providerName, subscriberName string) Statement { } } + func DropAllSubscriptions() Statement { return Statement{ SQL: "SELECT spock.sub_drop(sub_name) FROM spock.subscription;", diff --git a/server/internal/workflows/activities/failover.go b/server/internal/workflows/activities/failover.go index 20eebbd7..7101176b 100644 --- a/server/internal/workflows/activities/failover.go +++ b/server/internal/workflows/activities/failover.go @@ -66,14 +66,13 @@ func (a *Activities) PerformFailover(ctx context.Context, input *PerformFailover } // Wait for the new primary to be running before returning. This mirrors - // the wait=true behaviour in ScheduleSwitchover and ensures that the - // subsequent RefreshCurrentState sub-workflow in the failover workflow - // sees the new primary rather than the old one. + // the wait=true behaviour in ScheduleSwitchover and ensures the task + // completes only after the new primary is confirmed. // For crashed-primary scenarios the old leader's endpoint may be // unreachable; WaitForLeaderChange tolerates up to 3 connection errors // before giving up, so it degrades gracefully. if err := pClient.WaitForLeaderChange(ctx, failReq.Leader); err != nil { - logger.Warn("timed out waiting for failover to complete; subscription refresh may run before new primary is ready", "error", err) + logger.Warn("timed out waiting for failover to complete", "error", err) } logger.Info("patroni failover completed, new primary is running") diff --git a/server/internal/workflows/failover.go b/server/internal/workflows/failover.go index dc15a29e..1e1764b2 100644 --- a/server/internal/workflows/failover.go +++ b/server/internal/workflows/failover.go @@ -162,19 +162,6 @@ func (w *Workflows) Failover(ctx workflow.Context, in *FailoverInput) (*Failover return nil, handleError(fmt.Errorf("perform failover activity failed: %w", err)) } - // After the primary changes, logical replication slots used by Spock are - // missing from the new primary (Patroni does not replicate them). Run a - // resource refresh to recreate the missing slots and update subscription - // interfaces so that Spock reconnects without manual intervention. - logger.Info("refreshing database state to restore Spock subscriptions after failover") - refreshIn := &RefreshCurrentStateInput{ - DatabaseID: in.DatabaseID, - TaskID: in.TaskID, - } - if _, err := w.ExecuteRefreshCurrentState(ctx, refreshIn).Get(ctx); err != nil { - logger.Warn("failed to refresh database state after failover; Spock replication may recover slowly", "error", err) - } - completeUpdate := &activities.UpdateTaskInput{ Scope: task.ScopeDatabase, EntityID: in.DatabaseID, diff --git a/server/internal/workflows/switchover.go b/server/internal/workflows/switchover.go index 638b5761..d1e1eab8 100644 --- a/server/internal/workflows/switchover.go +++ b/server/internal/workflows/switchover.go @@ -163,23 +163,6 @@ func (w *Workflows) Switchover(ctx workflow.Context, in *SwitchoverInput) (*Swit return nil, handleError(fmt.Errorf("perform switchover activity failed: %w", err)) } - // After the primary changes, logical replication slots used by Spock are - // missing from the new primary (Patroni does not replicate them). Run a - // resource refresh to recreate the missing slots and update subscription - // interfaces so that Spock reconnects without manual intervention. - // Only do this for immediate switchovers; scheduled switchovers fire at a - // future time outside the scope of this workflow execution. - if in.ScheduledAt.IsZero() { - logger.Info("refreshing database state to restore Spock subscriptions after switchover") - refreshIn := &RefreshCurrentStateInput{ - DatabaseID: in.DatabaseID, - TaskID: in.TaskID, - } - if _, err := w.ExecuteRefreshCurrentState(ctx, refreshIn).Get(ctx); err != nil { - logger.Warn("failed to refresh database state after switchover; Spock replication may recover slowly", "error", err) - } - } - completeUpdate := &activities.UpdateTaskInput{ Scope: task.ScopeDatabase, EntityID: in.DatabaseID,