diff --git a/README.md b/README.md index 6c94b5e..9ab1cd1 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ The Distributed Secrets Vault is a leaderless cluster system where secrets are s - [Distributed System Challenges](docs/challenges.md) - [Workflow Diagrams](docs/crud) - [Software Architecture Diagrams](docs/architecture.md) +- [Peer-Only Recovery](docs/peer-recovery.md) - [Tools & Technologies](docs/technologies.md) ## Installation and Usage Guide diff --git a/docs/architecture.md b/docs/architecture.md index 68a5a3b..0b1dffe 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -210,4 +210,6 @@ graph LR - If failure occurs in the **ordering phase**, no shard writes are committed and the request fails. - If failure occurs in the **writing phase**, partially written shards are rolled back. -- Recovered nodes rejoin automatically via ScaleCube and synchronize state from Kafka and peers. +- Recovered nodes rejoin automatically via ScaleCube, query peers for current shard state, and pull any missing shards locally. +- On the first cluster startup, there are no peers yet, so recovery exits cleanly as a no-op. +- After catch-up, the node can be queried for shards and can participate in Shamir reconstruction immediately. diff --git a/docs/challenges.md b/docs/challenges.md index 64df778..344ab28 100644 --- a/docs/challenges.md +++ b/docs/challenges.md @@ -36,4 +36,4 @@ Phase separation makes recovery behavior explicit and prevents ambiguous outcomes for in-flight writes. 12. **Recovery and Availability** - Nodes recover from durable storage, and rejoin automatically when healthy. Quorum rules determine whether reads/writes continue or fail fast during degraded periods. + Nodes recover from their peers by synchronizing the current shard inventory and fetching missing shards locally. On the first cluster startup, recovery no-ops because there are no peers yet. Quorum rules determine whether reads/writes continue or fail fast during degraded periods. 
diff --git a/docs/peer-recovery.md b/docs/peer-recovery.md new file mode 100644 index 0000000..cdcc363 --- /dev/null +++ b/docs/peer-recovery.md @@ -0,0 +1,64 @@ +# Peer Recovery + +When a node joins a running cluster, it catches up from its peers by: + +1. Discovering peer nodes via ScaleCube. +2. Asking peers for the secret keys and versions they currently store. +3. Comparing that cluster inventory with its own local Redis data. +4. Requesting any missing shards from peers. +5. Storing recovered shards locally so it can participate in Shamir reconstruction. + +--- + +## First cluster startup + +When the very first node starts a brand-new cluster, there are no peers yet. In that case: + +- peer discovery returns empty +- recovery exits cleanly +- nothing is fetched or stored +- the node continues normal startup + +So first-time boot is safe and effectively a no-op. + +--- + +## New node joining an existing cluster + +When a new node joins a cluster that already has data: + +- the node discovers existing peers +- peers export their current state +- the joining node downloads missing shards +- the joining node stores those shards in Redis +- the node becomes queryable for shard reads and Shamir reconstruction + +That means newly added nodes are automatically brought up to date. + +--- + +## Internal endpoints + +Each node exposes these internal peer-only endpoints: + +- `GET /internal/recovery/state` +- `GET /internal/recovery/shard/{user}/{key}/{version}` +- `GET /internal/recovery/health` + +These are used only by other cluster nodes. + +--- + +## Recovery config usage + +Recovery is always on. There is no on/off switch for peer recovery. 
+ +The recovery config only tunes how startup behaves: + +- `delay-seconds` — how long to wait after startup before checking peers +- `peer-connectivity-timeout-seconds` — how long to wait for peers to appear +- `min-required-peers` — how many peers must be visible before catch-up starts + +For a brand-new cluster, these settings do not cause problems because the node simply finds no peers and exits recovery cleanly. When a new node joins an existing cluster, the same settings let it wait briefly, discover peers, and pull the missing shards it needs. + +--- diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/RecoveryConfig.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/RecoveryConfig.java new file mode 100644 index 0000000..6bc7925 --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/RecoveryConfig.java @@ -0,0 +1,36 @@ +package edu.yu.capstone.DistributedSecretsVault.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; +import lombok.Data; + +/** + * Configuration for peer-only cluster recovery. + *

+ * Recovery is always on. These settings only tune startup timing and the peer-discovery threshold. + * Bound from properties under the "cluster.peer-recovery" prefix. + * NOTE(review): no constraint annotations are declared on the fields below, so as written + * the class-level validation annotation has nothing to enforce - confirm whether bounds are wanted. + */ +@Validated +@ConfigurationProperties(prefix = "cluster.peer-recovery") +@Data +public class RecoveryConfig { + + /** + * Delay in seconds before starting recovery after node startup. + * Gives Redis and ScaleCube time to initialize. + * A value of zero or less skips the delay entirely. + * Default: 2 seconds + */ + private int delaySeconds = 2; + + /** + * Maximum time in seconds to wait for peer connectivity during recovery. + * If too few peers are visible when this window closes, recovery is skipped as a first-boot no-op. + * Default: 15 seconds + */ + private int peerConnectivityTimeoutSeconds = 15; + + /** + * Minimum number of peers required for recovery to proceed. + * Values below 1 are treated as 1 by PeerBasedRecoveryService. + * Default: 1 (at least one other node in the cluster) + */ + private int minRequiredPeers = 1; + +} diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/PeerRecoveryController.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/PeerRecoveryController.java new file mode 100644 index 0000000..4439c6f --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/PeerRecoveryController.java @@ -0,0 +1,97 @@ +package edu.yu.capstone.DistributedSecretsVault.controller; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.recovery.NodeStateResponse; +import 
edu.yu.capstone.DistributedSecretsVault.dto.recovery.RecoveryHealthResponse; +import edu.yu.capstone.DistributedSecretsVault.dto.recovery.StateSummary; +import edu.yu.capstone.DistributedSecretsVault.repository.SecretPartRepository; + +/** + * Internal peer-recovery RPC endpoints. + * These endpoints are always available to other nodes in the cluster. + * NOTE(review): no authentication or authorization check is visible in this class, and the + * shard endpoint returns stored secret parts - confirm that /internal/recovery is restricted + * to cluster members elsewhere (security configuration or network policy). + */ +@RestController +@RequestMapping("/internal/recovery") +public class PeerRecoveryController { + private static final Logger log = LoggerFactory.getLogger(PeerRecoveryController.class); + + private final SecretPartRepository secretPartRepository; + private final StringRedisTemplate redisTemplate; + + public PeerRecoveryController(SecretPartRepository secretPartRepository, StringRedisTemplate redisTemplate) { + this.secretPartRepository = secretPartRepository; + this.redisTemplate = redisTemplate; + } + + /** + * Exports the local inventory as one entry per (owner, key, version) found by scanLocalState. + * + * @return 200 with the inventory, or 500 with an empty body when the scan throws + */ + @GetMapping("/state") + public ResponseEntity exportNodeState() { + try { + Set state = scanLocalState(); + NodeStateResponse response = new NodeStateResponse(); + response.setNodeState(new ArrayList<>(state)); + return ResponseEntity.ok(response); + } catch (Exception ex) { + log.error("Error exporting peer recovery state", ex); + return ResponseEntity.status(500).build(); + } + } + + /** + * Returns the locally stored part for one secret version. + * + * @param user owner ID of the secret + * @param key secret key name + * @param version secret version + * @return 200 with the part, 404 when the repository has none, or 500 when the lookup throws + */ + @GetMapping("/shard/{user}/{key}/{version}") + public ResponseEntity getShard( + @PathVariable String user, + @PathVariable String key, + @PathVariable long version) { + try { + Optional part = secretPartRepository.findPart(new SecretKey(user, key), version); + return part.map(ResponseEntity::ok).orElseGet(() -> ResponseEntity.notFound().build()); + } catch (Exception ex) { + log.error("Error retrieving recovery shard for {}:{}:{}", user, key, version, ex); + return ResponseEntity.status(500).build(); + } + } + + /** + * Static readiness probe: always answers 200 with ready=true and performs no dependency checks. + */ + @GetMapping("/health") + public ResponseEntity recoveryHealth() { + return ResponseEntity.ok(new RecoveryHealthResponse(true, "Ready to serve peer recovery requests")); + } + + /** + * Builds the local inventory by listing Redis keys that match "*:*" and asking the + * repository for the versions of each owner/key pair. + */ + private Set scanLocalState() { + Set state = 
new HashSet<>(); + // NOTE(review): keys() issues a full keyspace match; the Redis documentation advises against KEYS on large production datasets - consider SCAN. + Set redisKeys = redisTemplate.keys("*:*"); + if (redisKeys == null || redisKeys.isEmpty()) { + return state; + } + + for (String redisKey : redisKeys) { + if (redisKey == null) { + continue; + } + // Split on the first ':' only: parts[0] is used as the owner, the remainder as the key name. + String[] parts = redisKey.split(":", 2); + if (parts.length != 2) { + continue; + } + + List versions = secretPartRepository.listVersions(new SecretKey(parts[0], parts[1])); + for (Long version : versions) { + // "local" is a placeholder source; the recovering peer replaces it with the URL it queried. + state.add(new StateSummary(parts[0], parts[1], version, "local")); + } + } + return state; + } +} diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/recovery/NodeStateResponse.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/recovery/NodeStateResponse.java new file mode 100644 index 0000000..19c643a --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/recovery/NodeStateResponse.java @@ -0,0 +1,18 @@ +package edu.yu.capstone.DistributedSecretsVault.dto.recovery; + +import java.util.ArrayList; +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Recovery response listing all secret state known by a node. + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class NodeStateResponse { + private List nodeState = new ArrayList<>(); +} diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/recovery/RecoveryHealthResponse.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/recovery/RecoveryHealthResponse.java new file mode 100644 index 0000000..1b624fe --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/recovery/RecoveryHealthResponse.java @@ -0,0 +1,16 @@ +package edu.yu.capstone.DistributedSecretsVault.dto.recovery; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Recovery endpoint health response. 
+ */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class RecoveryHealthResponse { + private boolean ready; + private String message; +} diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/recovery/StateSummary.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/recovery/StateSummary.java new file mode 100644 index 0000000..5ff6fa1 --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/recovery/StateSummary.java @@ -0,0 +1,11 @@ +package edu.yu.capstone.DistributedSecretsVault.dto.recovery; + +/** + * Recovery inventory entry for a secret shard. + * Used to advertise which (user:key:version) values exist in the cluster. + * Record equality includes sourceNodeId, so the same shard reported by two peers yields two unequal entries. + */ +public record StateSummary(String ownerId, String keyName, long version, String sourceNodeId) { + /** Returns the "owner:key:version" identity string used to compare inventories. */ + public String toRedisKey() { + return ownerId + ":" + keyName + ":" + version; + } +} diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java index 36b0c6f..d09f158 100644 --- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java @@ -19,10 +19,11 @@ import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeletePrepareRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostPrepareRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutPrepareRequest; +import edu.yu.capstone.DistributedSecretsVault.dto.recovery.NodeStateResponse; import io.scalecube.services.Microservices; /** - * Outbound HTTP client for inter-node communication within the cluster. + * Outbound HTTP client for internode communication within the cluster. *

* Peer discovery is handled via ScaleCube's * {@link Microservices#serviceEndpoints()}, @@ -159,7 +160,7 @@ public SecretPartsResponse fetchAllSecretParts(String peerUrl, SecretKey key) { Map parts = restClient.get() .uri(peerUrl + "/internal/{id}/all?user={user}", key.getName(), key.getOwnerId()) .retrieve() - .body(new ParameterizedTypeReference>() { + .body(new ParameterizedTypeReference<>() { }); log.debug("Secret version parts received from {}", peerUrl); return SecretPartsResponse.found(peerUrl, parts == null ? Map.of() : parts); @@ -199,7 +200,7 @@ public List resolvePeerUrls() { Set peerUrls = new LinkedHashSet<>(); ms.serviceEndpoints().forEach(endpoint -> { - // endpoint.address() returns "host:port" (ScaleCube RSocket port) + // endpoint.address() returns "host:port" (ScaleCube RSocket port) // We extract just the host and combine with the HTTP port String address = endpoint.address().toString(); String host = extractHost(address); @@ -286,4 +287,63 @@ public static SecretPartsResponse failed(String peerUrl, String errorMessage) { return new SecretPartsResponse(peerUrl, null, null, errorMessage); } } + + /** + * Retrieve complete state from a peer node for recovery. + * Returns all (user:key:version) entries the peer knows about. + * Failures are logged and reported as an empty response, so callers cannot tell + * "peer has no data" from "peer could not be reached". + * + * @param peerUrl base URL of the peer + * @return NodeStateResponse with list of all secrets on that peer; never null, + * and empty when the peer returns an error status, no body, or the call fails + */ + public NodeStateResponse getNodeState(String peerUrl) { + try { + NodeStateResponse response = restClient.get() + .uri(peerUrl + "/internal/recovery/state") + .retrieve() + .body(NodeStateResponse.class); + + log.debug("Retrieved state from peer {}: {} entries", peerUrl, + response != null && response.getNodeState() != null ? response.getNodeState().size() : 0); + return response != null ? 
response : new NodeStateResponse(); + } catch (RestClientResponseException ex) { + log.warn("Failed to get state from {} with HTTP {}", peerUrl, ex.getStatusCode().value()); + return new NodeStateResponse(); + } catch (Exception ex) { + log.warn("Failed to get state from {}: {}", peerUrl, ex.getMessage()); + return new NodeStateResponse(); + } + } + + /** + * Request a specific shard from a peer for recovery. + * + * @param peerUrl base URL of the peer + * @param user user/owner ID + * @param key secret key name + * @param version secret version + * @return SecretPart if the peer returned one; null when the peer answers 404, + * answers with any other error status, or the request fails + */ + public SecretPart requestShard(String peerUrl, String user, String key, long version) { + try { + String uri = peerUrl + "/internal/recovery/shard/{user}/{key}/{version}"; + SecretPart part = restClient.get() + .uri(uri, user, key, version) + .retrieve() + .body(SecretPart.class); + + log.debug("Retrieved shard {}:{}:{} from peer {}", user, key, version, peerUrl); + return part; + } catch (RestClientResponseException ex) { + // Only 404 means "not stored on that peer"; other 4xx/5xx statuses point at a real problem and are surfaced at warn level. + if (ex.getStatusCode().value() == 404) { + log.debug("Shard not found on {} ({}:{}:{})", peerUrl, user, key, version); + } else { + log.warn("Failed to retrieve shard from {} with HTTP {}", peerUrl, ex.getStatusCode().value()); + } + return null; + } catch (Exception ex) { + log.warn("Failed to retrieve shard from {}: {}", peerUrl, ex.getMessage()); + return null; + } + } + } diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/recovery/PeerBasedRecoveryService.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/recovery/PeerBasedRecoveryService.java new file mode 100644 index 0000000..3ec5806 --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/recovery/PeerBasedRecoveryService.java @@ -0,0 +1,241 @@ +package edu.yu.capstone.DistributedSecretsVault.service.recovery; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; 
+import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import lombok.Getter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +import edu.yu.capstone.DistributedSecretsVault.config.RecoveryConfig; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.recovery.NodeStateResponse; +import edu.yu.capstone.DistributedSecretsVault.dto.recovery.StateSummary; +import edu.yu.capstone.DistributedSecretsVault.repository.SecretPartRepository; +import edu.yu.capstone.DistributedSecretsVault.service.internal.NodeClient; +import jakarta.annotation.PostConstruct; +import io.scalecube.services.Microservices; + +/** + * Peer-only recovery: every node catches up from peers when it joins a running cluster. + * If there are no peers yet, startup is treated as the first cluster boot and recovery no-ops. 
+ */ +@Service +public class PeerBasedRecoveryService { + private static final Logger log = LoggerFactory.getLogger(PeerBasedRecoveryService.class); + private static final int SHARD_REQUEST_BATCH_SIZE = 50; + + private final Microservices microservices; + private final SecretPartRepository secretPartRepository; + private final NodeClient nodeClient; + private final StringRedisTemplate redisTemplate; + private final RecoveryConfig recoveryConfig; + + @Getter + private volatile RecoveryState recoveryState = RecoveryState.READY; + + public PeerBasedRecoveryService(Microservices microservices, + SecretPartRepository secretPartRepository, + NodeClient nodeClient, + StringRedisTemplate redisTemplate, + RecoveryConfig recoveryConfig) { + this.microservices = microservices; + this.secretPartRepository = secretPartRepository; + this.nodeClient = nodeClient; + this.redisTemplate = redisTemplate; + this.recoveryConfig = recoveryConfig; + } + + @PostConstruct + public void onNodeStartup() { + if (microservices == null || nodeClient == null || redisTemplate == null) { + log.debug("Recovery prerequisites are unavailable; skipping peer catch-up"); + return; + } + + try { + if (recoveryConfig.getDelaySeconds() > 0) { + Thread.sleep(recoveryConfig.getDelaySeconds() * 1000L); + } + performRecovery(); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + recoveryState = RecoveryState.FAILED; + log.error("Peer recovery interrupted", interruptedException); + } catch (Exception ex) { + recoveryState = RecoveryState.FAILED; + log.error("Peer recovery failed", ex); + } + } + + private void performRecovery() { + recoveryState = RecoveryState.IN_PROGRESS; + + List peerUrls = waitForPeerUrls(); + if (peerUrls.isEmpty()) { + log.info("No peers discovered during startup; treating this as the first cluster boot and skipping catch-up"); + recoveryState = RecoveryState.COMPLETE; + return; + } + + Map> peerState = discoverPeerState(peerUrls); + if 
(peerState.isEmpty()) { + log.info("Peers were discovered but none returned recoverable state; nothing to catch up"); + recoveryState = RecoveryState.COMPLETE; + return; + } + + Set missingShards = determineMissingShards(peerState); + if (missingShards.isEmpty()) { + log.info("Local node is already caught up with the cluster"); + recoveryState = RecoveryState.COMPLETE; + return; + } + + int recovered = requestAndStoreMissingShards(missingShards, peerUrls); + if (recovered == missingShards.size()) { + recoveryState = RecoveryState.COMPLETE; + log.info("Peer recovery complete: {} shard(s) synchronized", recovered); + } else { + recoveryState = RecoveryState.PARTIAL; + log.warn("Peer recovery completed partially: {}/{} shard(s) synchronized", recovered, + missingShards.size()); + } + } + + private List waitForPeerUrls() { + long deadline = System.currentTimeMillis() + (recoveryConfig.getPeerConnectivityTimeoutSeconds() * 1000L); + long sleepMillis = 500L; + int minRequiredPeers = Math.max(1, recoveryConfig.getMinRequiredPeers()); + + while (System.currentTimeMillis() < deadline) { + List peerUrls = nodeClient.resolvePeerUrls(); + if (peerUrls.size() >= minRequiredPeers) { + log.info("Discovered {} peer(s) for recovery", peerUrls.size()); + return peerUrls; + } + + try { + Thread.sleep(sleepMillis); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + return List.of(); + } + } + + return List.of(); + } + + private Map> discoverPeerState(List peerUrls) { + Map> peerState = new HashMap<>(); + + for (String peerUrl : peerUrls) { + try { + NodeStateResponse response = nodeClient.getNodeState(peerUrl); + Set summaries = response.getNodeState() == null + ? 
Set.of() + : response.getNodeState().stream() + .map(summary -> new StateSummary(summary.ownerId(), summary.keyName(), summary.version(), + peerUrl)) + .collect(Collectors.toSet()); + peerState.put(peerUrl, summaries); + log.debug("Peer {} exported {} secret entry/entries", peerUrl, summaries.size()); + } catch (Exception ex) { + log.warn("Failed to query recovery state from peer {}: {}", peerUrl, ex.getMessage()); + } + } + + return peerState; + } + + private Set determineMissingShards(Map> peerState) { + Set allRemoteShards = peerState.values().stream().flatMap(Set::stream).collect(Collectors.toSet()); + Set localKeys = getLocalKeys(); + + return allRemoteShards.stream() + .filter(summary -> !localKeys.contains(summary.toRedisKey())) + .collect(Collectors.toSet()); + } + + private Set getLocalKeys() { + Set redisKeys = redisTemplate.keys("*:*" ); + if (redisKeys == null || redisKeys.isEmpty()) { + return Set.of(); + } + Set normalized = new HashSet<>(); + for (String rawKey : redisKeys) { + if (rawKey != null && rawKey.indexOf(':') > 0) { + String[] parts = rawKey.split(":", 2); + if (parts.length != 2) { + continue; + } + List versions = secretPartRepository.listVersions(new edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey(parts[0], parts[1])); + for (Long version : versions) { + normalized.add(parts[0] + ":" + parts[1] + ":" + version); + } + } + } + return normalized; + } + + private int requestAndStoreMissingShards(Set missingShards, List peerUrls) { + int recovered = 0; + List shards = new ArrayList<>(missingShards); + + for (int i = 0; i < shards.size(); i += SHARD_REQUEST_BATCH_SIZE) { + int end = Math.min(i + SHARD_REQUEST_BATCH_SIZE, shards.size()); + List batch = shards.subList(i, end); + + for (StateSummary summary : batch) { + Optional shard = requestShardFromPeers(summary, peerUrls); + if (shard.isPresent()) { + secretPartRepository.savePart(shard.get()); + recovered++; + log.debug("Recovered shard {}", summary.toRedisKey()); + } else { 
+ log.warn("Could not recover shard {} from any peer", summary.toRedisKey()); + } + } + } + + return recovered; + } + + private Optional requestShardFromPeers(StateSummary summary, List peerUrls) { + for (String peerUrl : peerUrls) { + try { + SecretPart shard = nodeClient.requestShard(peerUrl, summary.ownerId(), summary.keyName(), summary.version()); + if (shard != null) { + return Optional.of(shard); + } + } catch (Exception ex) { + log.debug("Peer {} could not provide shard {}: {}", peerUrl, summary.toRedisKey(), ex.getMessage()); + } + } + return Optional.empty(); + } + + @Getter + public enum RecoveryState { + READY("Ready to recover"), + IN_PROGRESS("Recovery in progress"), + COMPLETE("Recovery complete"), + PARTIAL("Recovery partially complete"), + FAILED("Recovery failed"); + + private final String description; + + RecoveryState(String description) { + this.description = description; + } + + } +} diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/PeerRecoveryControllerTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/PeerRecoveryControllerTest.java new file mode 100644 index 0000000..80d2bec --- /dev/null +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/PeerRecoveryControllerTest.java @@ -0,0 +1,64 @@ +package edu.yu.capstone.DistributedSecretsVault.controller; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.Set; + +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.http.HttpStatus; 
+import org.springframework.http.ResponseEntity; + +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.recovery.NodeStateResponse; +import edu.yu.capstone.DistributedSecretsVault.dto.recovery.StateSummary; +import edu.yu.capstone.DistributedSecretsVault.repository.SecretPartRepository; + +/** + * Unit tests for PeerRecoveryController; the repository and Redis template are Mockito mocks. + */ +@ExtendWith(MockitoExtension.class) +@Tag("unit") +class PeerRecoveryControllerTest { + + @Mock + private SecretPartRepository secretPartRepository; + + @Mock + private StringRedisTemplate redisTemplate; + + /** One Redis key with two versions must be exported as two inventory entries tagged "local". */ + @Test + void exportNodeStateReturnsAllKnownVersions() { + when(redisTemplate.keys("*:*" )).thenReturn(Set.of("alice:db-password")); + when(secretPartRepository.listVersions(new SecretKey("alice", "db-password"))).thenReturn(List.of(1L, 2L)); + + PeerRecoveryController controller = new PeerRecoveryController(secretPartRepository, redisTemplate); + ResponseEntity response = controller.exportNodeState(); + + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertNotNull(response.getBody()); + assertEquals(2, response.getBody().getNodeState().size()); + assertTrue(response.getBody().getNodeState().contains(new StateSummary("alice", "db-password", 1L, "local"))); + assertTrue(response.getBody().getNodeState().contains(new StateSummary("alice", "db-password", 2L, "local"))); + verify(secretPartRepository).listVersions(new SecretKey("alice", "db-password")); + } + + /** A part found by the repository must be returned as the 200 response body. */ + @Test + void getShardReturnsStoredShard() { + SecretPart part = new SecretPart(new SecretKey("alice", "db-password"), 1L, 1, new byte[] { 9 }); + when(secretPartRepository.findPart(new SecretKey("alice", "db-password"), 1L)).thenReturn(java.util.Optional.of(part)); + + PeerRecoveryController controller = new PeerRecoveryController(secretPartRepository, redisTemplate); + ResponseEntity response = controller.getShard("alice", "db-password", 1L); + + 
assertEquals(HttpStatus.OK, response.getStatusCode()); + assertEquals(part, response.getBody()); + } +} diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/recovery/PeerBasedRecoveryServiceTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/recovery/PeerBasedRecoveryServiceTest.java new file mode 100644 index 0000000..653cc2a --- /dev/null +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/recovery/PeerBasedRecoveryServiceTest.java @@ -0,0 +1,91 @@ +package edu.yu.capstone.DistributedSecretsVault.service.recovery; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.anyLong; + +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.scalecube.services.Microservices; +import edu.yu.capstone.DistributedSecretsVault.config.RecoveryConfig; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.recovery.NodeStateResponse; +import edu.yu.capstone.DistributedSecretsVault.dto.recovery.StateSummary; +import edu.yu.capstone.DistributedSecretsVault.repository.SecretPartRepository; +import edu.yu.capstone.DistributedSecretsVault.service.internal.NodeClient; +import org.springframework.data.redis.core.StringRedisTemplate; + +/** + * Unit tests for PeerBasedRecoveryService; all collaborators are Mockito mocks. + */ +@ExtendWith(MockitoExtension.class) +@Tag("unit") +class PeerBasedRecoveryServiceTest { + + @Mock + private Microservices microservices; + + @Mock + private SecretPartRepository secretPartRepository; + + @Mock + private 
NodeClient nodeClient; + + @Mock + private StringRedisTemplate redisTemplate; + + private PeerBasedRecoveryService recoveryService; + + /** Builds the service with no startup delay and a one-second peer timeout so the tests stay fast. */ + @BeforeEach + void setUp() { + RecoveryConfig recoveryConfig = new RecoveryConfig(); + recoveryConfig.setDelaySeconds(0); + recoveryConfig.setPeerConnectivityTimeoutSeconds(1); + recoveryConfig.setMinRequiredPeers(1); + + recoveryService = new PeerBasedRecoveryService( + microservices, + secretPartRepository, + nodeClient, + redisTemplate, + recoveryConfig); + } + + /** With no peers visible, recovery must finish as COMPLETE without querying or storing anything. */ + @Test + void firstClusterStartupNoPeersShouldNoOp() { + when(nodeClient.resolvePeerUrls()).thenReturn(List.of()); + + recoveryService.onNodeStartup(); + + assertEquals(PeerBasedRecoveryService.RecoveryState.COMPLETE, recoveryService.getRecoveryState()); + verify(nodeClient, never()).getNodeState(any()); + verify(nodeClient, never()).requestShard(any(), any(), any(), anyLong()); + verify(secretPartRepository, never()).savePart(any()); + } + + /** A shard advertised by a peer and absent locally must be fetched from that peer and saved. */ + @Test + void joiningNodeShouldCatchUpFromPeersAndStoreMissingShard() { + when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080")); + when(nodeClient.getNodeState("http://peer1:8080")) + .thenReturn(new NodeStateResponse(List.of( + new StateSummary("alice", "db-password", 1L, "http://peer1:8080")))); + + SecretPart shard = new SecretPart(new SecretKey("alice", "db-password"), 1L, 1, new byte[] { 1, 2, 3 }); + when(nodeClient.requestShard("http://peer1:8080", "alice", "db-password", 1L)).thenReturn(shard); + + recoveryService.onNodeStartup(); + + assertSame(PeerBasedRecoveryService.RecoveryState.COMPLETE, recoveryService.getRecoveryState()); + verify(secretPartRepository).savePart(shard); + verify(nodeClient).getNodeState("http://peer1:8080"); + verify(nodeClient).requestShard("http://peer1:8080", "alice", "db-password", 1L); + } +}