Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 62 additions & 29 deletions e2e/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (
"fmt"
"iter"
"testing"
"time"

"github.com/jackc/pgx/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

controlplane "github.com/pgEdge/control-plane/api/apiv1/gen/control_plane"
"github.com/pgEdge/control-plane/client"
Expand Down Expand Up @@ -356,49 +355,83 @@ func (d *DatabaseFixture) waitForTask(ctx context.Context, task *controlplane.Ta
func (f *DatabaseFixture) WaitForReplication(ctx context.Context, t testing.TB, username, password string) {
tLog(t, "waiting for replication to catch up on all nodes")

// Execute sync_event on all primary nodes
nodeSyncMap := make(map[string]string)
for _, node := range f.Spec.Nodes {
primaryOpts := ConnectionOptions{
Matcher: And(WithNode(node.Name), WithRole("primary")),
Username: username,
Password: password,
// After a primary change (switchover/failover), Spock subscriptions need
// time to reconnect to the new primary. Retry with fresh sync markers
// until all nodes are in sync or the 5-minute deadline is exceeded.
// The 5-minute budget accommodates Spock's reconnect delay when the
// provider node's primary changes (the logical replication slot must be
// re-created on the new primary, which takes additional time in CI).
deadline := time.Now().Add(5 * time.Minute)
for {
if err := f.Refresh(ctx); err != nil {
t.Errorf("failed to refresh database state while waiting for replication: %v", err)
return
}
ok, reason := f.pollReplication(ctx, username, password)
if ok {
return
}
if time.Now().After(deadline) {
t.Errorf("replication did not catch up on all nodes within 5 minutes (last failure: %s)", reason)
return
}
tLogf(t, "replication not yet in sync, retrying in 15s... (%s)", reason)
select {
case <-ctx.Done():
t.Errorf("replication wait aborted: %v", ctx.Err())
return
case <-time.After(15 * time.Second):
}

f.WithConnection(ctx, primaryOpts, t, func(conn *pgx.Conn) {
var syncLSN string

row := conn.QueryRow(ctx, "SELECT spock.sync_event();")
require.NoError(t, row.Scan(&syncLSN))

assert.NotEmpty(t, syncLSN)

nodeSyncMap[node.Name] = syncLSN
})
}
}

// Verify wait_for_sync_event on all other nodes
// pollReplication inserts a sync marker on every primary, then checks that
// each marker has been delivered to every other node's primary. Returns
// (true, "") only when all cross-node checks pass. On failure it returns
// (false, reason) where reason identifies the failing pair so callers can
// log it. Never calls t.Fatal/t.Error so it can be used safely in a retry loop.
func (f *DatabaseFixture) pollReplication(ctx context.Context, username, password string) (bool, string) {
nodeSyncMap := make(map[string]string)
for _, node := range f.Spec.Nodes {
primaryOpts := ConnectionOptions{
conn, err := f.ConnectToInstance(ctx, ConnectionOptions{
Matcher: And(WithNode(node.Name), WithRole("primary")),
Username: username,
Password: password,
})
if err != nil {
return false, fmt.Sprintf("connect to %s primary: %v", node.Name, err)
}
var syncLSN string
err = conn.QueryRow(ctx, "SELECT spock.sync_event();").Scan(&syncLSN)
conn.Close(ctx)
if err != nil || syncLSN == "" {
return false, fmt.Sprintf("sync_event on %s: %v", node.Name, err)
}
nodeSyncMap[node.Name] = syncLSN
}

for _, node := range f.Spec.Nodes {
for peerNode, lsn := range nodeSyncMap {
if peerNode == node.Name {
continue
}

f.WithConnection(ctx, primaryOpts, t, func(conn *pgx.Conn) {
var synced bool

row := conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", peerNode, lsn)
require.NoError(t, row.Scan(&synced))
assert.True(t, synced)
conn, err := f.ConnectToInstance(ctx, ConnectionOptions{
Matcher: And(WithNode(node.Name), WithRole("primary")),
Username: username,
Password: password,
})
if err != nil {
return false, fmt.Sprintf("connect to %s primary for peer check: %v", node.Name, err)
}
var synced bool
err = conn.QueryRow(ctx, "CALL spock.wait_for_sync_event(true, $1, $2::pg_lsn, 30);", peerNode, lsn).Scan(&synced)
conn.Close(ctx)
if err != nil || !synced {
return false, fmt.Sprintf("%s waiting for %s events (LSN %s): synced=%v err=%v", node.Name, peerNode, lsn, synced, err)
}
}
}
return true, ""
}

func (d *DatabaseFixture) SwitchoverDatabaseNode(ctx context.Context, req *controlplane.SwitchoverDatabaseNodePayload) error {
Expand Down
18 changes: 13 additions & 5 deletions e2e/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ import (
func TestFailoverScenarios(t *testing.T) {
t.Parallel()

host1 := fixture.HostIDs()[0]
host2 := fixture.HostIDs()[1]
host3 := fixture.HostIDs()[2]

ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute)
hostIDs := fixture.HostIDs()
require.GreaterOrEqual(t, len(hostIDs), 4, "fixture must provide at least 4 hosts")
host1 := hostIDs[0]
host2 := hostIDs[1]
host3 := hostIDs[2]
host4 := hostIDs[3]

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
defer cancel()

db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{
Expand All @@ -42,6 +45,10 @@ func TestFailoverScenarios(t *testing.T) {
Name: "n1",
HostIds: []controlplane.Identifier{controlplane.Identifier(host1), controlplane.Identifier(host2), controlplane.Identifier(host3)},
},
{
Name: "n2",
HostIds: []controlplane.Identifier{controlplane.Identifier(host4)},
},
},
},
})
Expand Down Expand Up @@ -138,6 +145,7 @@ func TestFailoverScenarios(t *testing.T) {
"[auto] primary did not change within timeout (still %s)", origPrimary)
newPrimary := getPrimaryInstanceID()
t.Logf("[auto] new primary: %s", newPrimary)
db.WaitForReplication(ctx, t, "admin", "password")
})

t.Run("failover to a specific candidate", func(t *testing.T) {
Expand Down
18 changes: 13 additions & 5 deletions e2e/switchover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ import (
func TestSwitchoverScenarios(t *testing.T) {
t.Parallel()

host1 := fixture.HostIDs()[0]
host2 := fixture.HostIDs()[1]
host3 := fixture.HostIDs()[2]

ctx, cancel := context.WithTimeout(context.Background(), 8*time.Minute)
hostIDs := fixture.HostIDs()
require.GreaterOrEqual(t, len(hostIDs), 4, "fixture must provide at least 4 hosts")
host1 := hostIDs[0]
host2 := hostIDs[1]
host3 := hostIDs[2]
host4 := hostIDs[3]

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
defer cancel()

db := fixture.NewDatabaseFixture(ctx, t, &controlplane.CreateDatabaseRequest{
Expand All @@ -42,6 +45,10 @@ func TestSwitchoverScenarios(t *testing.T) {
Name: "n1",
HostIds: []controlplane.Identifier{controlplane.Identifier(host1), controlplane.Identifier(host2), controlplane.Identifier(host3)},
},
{
Name: "n2",
HostIds: []controlplane.Identifier{controlplane.Identifier(host4)},
},
},
},
})
Expand Down Expand Up @@ -140,6 +147,7 @@ func TestSwitchoverScenarios(t *testing.T) {
"[auto] primary did not change within timeout (still %s)", origPrimary)
newPrimary := getPrimaryInstanceID()
t.Logf("[auto] new primary: %s", newPrimary)
db.WaitForReplication(ctx, t, "admin", "password")
})

t.Run("switchover to a specific candidate", func(t *testing.T) {
Expand Down
8 changes: 7 additions & 1 deletion server/internal/workflows/activities/switchover.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ func (a *Activities) PerformSwitchover(ctx context.Context, input *PerformSwitch
swReq.ScheduledAt = &input.ScheduledAt
}

if err := pClient.ScheduleSwitchover(ctx, swReq, false); err != nil {
// For immediate switchovers, wait for Patroni to complete the primary
// promotion before returning. This ensures the new primary is running
// before the workflow proceeds to refresh Spock subscriptions. Scheduled
// switchovers return immediately because the promotion happens at the
// scheduled time, which is after the workflow task completes.
wait := input.ScheduledAt.IsZero()
if err := pClient.ScheduleSwitchover(ctx, swReq, wait); err != nil {
return nil, fmt.Errorf("patroni scheduled switchover call failed: %w", err)
}

Expand Down
13 changes: 13 additions & 0 deletions server/internal/workflows/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@ func (w *Workflows) Failover(ctx workflow.Context, in *FailoverInput) (*Failover
return nil, handleError(fmt.Errorf("perform failover activity failed: %w", err))
}

// After the primary changes, logical replication slots used by Spock are
// missing from the new primary (Patroni does not replicate them). Run a
// resource refresh to recreate the missing slots and update subscription
// interfaces so that Spock reconnects without manual intervention.
logger.Info("refreshing database state to restore Spock subscriptions after failover")
refreshIn := &RefreshCurrentStateInput{
DatabaseID: in.DatabaseID,
TaskID: in.TaskID,
}
if _, err := w.ExecuteRefreshCurrentState(ctx, refreshIn).Get(ctx); err != nil {
logger.Warn("failed to refresh database state after failover; Spock replication may recover slowly", "error", err)
}

completeUpdate := &activities.UpdateTaskInput{
Scope: task.ScopeDatabase,
EntityID: in.DatabaseID,
Expand Down
17 changes: 17 additions & 0 deletions server/internal/workflows/switchover.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,23 @@ func (w *Workflows) Switchover(ctx workflow.Context, in *SwitchoverInput) (*Swit
return nil, handleError(fmt.Errorf("perform switchover activity failed: %w", err))
}

// After the primary changes, logical replication slots used by Spock are
// missing from the new primary (Patroni does not replicate them). Run a
// resource refresh to recreate the missing slots and update subscription
// interfaces so that Spock reconnects without manual intervention.
// Only do this for immediate switchovers; scheduled switchovers fire at a
// future time outside the scope of this workflow execution.
if in.ScheduledAt.IsZero() {
logger.Info("refreshing database state to restore Spock subscriptions after switchover")
refreshIn := &RefreshCurrentStateInput{
DatabaseID: in.DatabaseID,
TaskID: in.TaskID,
}
if _, err := w.ExecuteRefreshCurrentState(ctx, refreshIn).Get(ctx); err != nil {
logger.Warn("failed to refresh database state after switchover; Spock replication may recover slowly", "error", err)
}
}

completeUpdate := &activities.UpdateTaskInput{
Scope: task.ScopeDatabase,
EntityID: in.DatabaseID,
Expand Down