1 change: 1 addition & 0 deletions README.md
@@ -12,6 +12,7 @@ The Distributed Secrets Vault is a leaderless cluster system where secrets are s
- [Distributed System Challenges](docs/challenges.md)
- [Workflow Diagrams](docs/crud)
- [Software Architecture Diagrams](docs/architecture.md)
- [Peer-Only Recovery](docs/peer-recovery.md)
- [Tools & Technologies](docs/technologies.md)

## Installation and Usage Guide
4 changes: 3 additions & 1 deletion docs/architecture.md
@@ -210,4 +210,6 @@ graph LR

- If failure occurs in the **ordering phase**, no shard writes are committed and the request fails.
- If failure occurs in the **writing phase**, partially written shards are rolled back.
- Recovered nodes rejoin automatically via ScaleCube and synchronize state from Kafka and peers.
- Recovered nodes rejoin automatically via ScaleCube, query peers for current shard state, and pull any missing shards into local storage.
- On the first cluster startup, there are no peers yet, so recovery exits cleanly as a no-op.
- After catch-up, the node can be queried for shards and can participate in Shamir reconstruction immediately.
2 changes: 1 addition & 1 deletion docs/challenges.md
@@ -36,4 +36,4 @@
Phase separation makes recovery behavior explicit and prevents ambiguous outcomes for in-flight writes.

12. **Recovery and Availability**
Nodes recover from durable storage, and rejoin automatically when healthy. Quorum rules determine whether reads/writes continue or fail fast during degraded periods.
Nodes recover from their peers by comparing the cluster's current shard inventory with local state and fetching any missing shards into local storage. On the first cluster startup, recovery is a no-op because there are no peers yet. Quorum rules determine whether reads/writes continue or fail fast during degraded periods.
64 changes: 64 additions & 0 deletions docs/peer-recovery.md
@@ -0,0 +1,64 @@
# Peer Recovery

When a node joins a running cluster, it catches up from its peers by:

1. Discovering peer nodes via ScaleCube.
2. Asking peers for the secret keys and versions they currently store.
3. Comparing that cluster inventory with its own local Redis data.
4. Requesting any missing shards from peers.
5. Storing recovered shards locally so it can participate in Shamir reconstruction.
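
Steps 2 through 4 amount to a set difference between what peers advertise and what the node already holds. The following is a minimal sketch under that assumption, with each peer's inventory already fetched into memory. `RecoveryPlanSketch`, `ShardId`, and `missingShards` are hypothetical names used for illustration only and are not taken from the codebase:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: diff the cluster inventory against local state.
class RecoveryPlanSketch {

    // Illustrative stand-in for a (user, key, version) inventory entry.
    record ShardId(String user, String key, long version) {}

    /** Returns every shard advertised by at least one peer that is absent locally. */
    static Set<ShardId> missingShards(List<Set<ShardId>> peerInventories, Set<ShardId> local) {
        Set<ShardId> missing = new LinkedHashSet<>();
        for (Set<ShardId> inventory : peerInventories) {
            for (ShardId id : inventory) {
                if (!local.contains(id)) {
                    missing.add(id); // a Set, so duplicates across peers collapse
                }
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Set<ShardId> local = Set.of(new ShardId("alice", "db-password", 1));
        Set<ShardId> peerA = Set.of(
                new ShardId("alice", "db-password", 1),
                new ShardId("alice", "db-password", 2));
        Set<ShardId> peerB = Set.of(new ShardId("bob", "api-token", 1));

        // Prints the two entries the local node lacks (iteration order may vary).
        System.out.println(missingShards(List.of(peerA, peerB), local));
    }
}
```

Because the result is a set, a shard advertised by several peers appears only once in the plan. With no peers, the list of inventories is empty and so is the plan, which matches the no-op behaviour on first startup.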

---

## First cluster startup

When the very first node starts a brand-new cluster, there are no peers yet. In that case:

- peer discovery returns empty
- recovery exits cleanly
- nothing is fetched or stored
- the node continues normal startup

So first-time boot is safe and effectively a no-op.

---

## New node joining an existing cluster

When a new node joins a cluster that already has data:

- the node discovers existing peers
- peers export their current state
- the joining node downloads missing shards
- the joining node stores those shards in Redis
- the node becomes queryable for shard reads and Shamir reconstruction

That means newly added nodes are automatically brought up to date.
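
The catch-up loop described above could look roughly like the sketch below. It rests on several assumptions: `ShardFetchSketch` and `recover` are hypothetical names, shards are modeled as plain strings, a `Map` stands in for Redis, and `fetch` stands in for the HTTP call to a peer, returning `null` when that peer cannot supply the shard.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch of the download-and-store loop; not the project's recovery service.
class ShardFetchSketch {

    /**
     * For each missing shard, asks peers in order and stores the first non-null answer.
     * Returns the number of shards that were recovered.
     */
    static int recover(List<String> peers,
                       List<String> missingShardIds,
                       BiFunction<String, String, String> fetch,
                       Map<String, String> localStore) {
        int recovered = 0;
        for (String shardId : missingShardIds) {
            for (String peer : peers) {
                String shard = fetch.apply(peer, shardId); // null means "try the next peer"
                if (shard != null) {
                    localStore.put(shardId, shard);
                    recovered++;
                    break;
                }
            }
        }
        return recovered;
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        // Fake transport: only peer "node-2" holds the shard.
        BiFunction<String, String, String> fetch =
                (peer, id) -> peer.equals("node-2") ? "shard-for-" + id : null;

        int count = recover(List.of("node-1", "node-2"), List.of("alice:db-password:2"), fetch, store);
        System.out.println(count + " shard(s) recovered: " + store);
    }
}
```

Falling through to the next peer on `null` lets a single unreachable or incomplete peer be skipped without failing the whole catch-up. With an empty peer list the loop body never runs, so nothing is fetched or stored.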

---

## Internal endpoints

Each node exposes these internal peer-only endpoints:

- `GET /internal/recovery/state`
- `GET /internal/recovery/shard/{user}/{key}/{version}`
- `GET /internal/recovery/health`

These endpoints are intended for node-to-node recovery traffic only and are not part of the client-facing API.
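
To illustrate how a peer might address these endpoints, here is a small sketch that builds the request URLs with only the JDK. The project's own client uses Spring's `RestClient` with URI templates, so `RecoveryUrlSketch` and its methods are hypothetical and shown purely to make the path layout concrete. The base URL `http://node-2:8080` is an assumed example value.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds recovery URLs for a given peer base URL.
class RecoveryUrlSketch {

    /** Percent-encodes a single path segment (URLEncoder emits '+' for spaces, so fix that up). */
    static String encodeSegment(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** URL of the inventory endpoint on a peer. */
    static URI stateUri(String peerBaseUrl) {
        return URI.create(peerBaseUrl + "/internal/recovery/state");
    }

    /** URL of one shard, identified by user, key, and version. */
    static URI shardUri(String peerBaseUrl, String user, String key, long version) {
        return URI.create(peerBaseUrl + "/internal/recovery/shard/"
                + encodeSegment(user) + "/" + encodeSegment(key) + "/" + version);
    }

    public static void main(String[] args) {
        System.out.println(stateUri("http://node-2:8080"));
        System.out.println(shardUri("http://node-2:8080", "alice", "db password", 3));
    }
}
```

Encoding each segment separately keeps a user or key name that contains `/` or a space from changing the shape of the path.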

---

## Recovery config usage

Recovery is always on. There is no on/off switch for peer recovery.

The recovery config, bound under the `cluster.peer-recovery` prefix, only tunes how startup behaves:

- `delay-seconds`: how long to wait after startup before checking peers (default: 2)
- `peer-connectivity-timeout-seconds`: how long to wait for peers to appear (default: 15)
- `min-required-peers`: how many peers must be visible before catch-up starts (default: 1)

For a brand-new cluster, these settings do not cause problems because the node simply finds no peers and exits recovery cleanly. When a new node joins an existing cluster, the same settings let it wait briefly, discover peers, and pull the missing shards it needs.
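
One way the timeout and the minimum peer count could interact is a bounded polling loop, sketched below. This is an assumption about the control flow rather than the project's implementation: `PeerWaitSketch`, `waitForPeers`, and the `pollInterval` parameter are hypothetical, and the initial `delay-seconds` pause is left out for brevity.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

// Hypothetical sketch of waiting for peers before catch-up starts.
class PeerWaitSketch {

    /**
     * Polls until at least minRequiredPeers are visible or the timeout elapses.
     * Returns true when catch-up may start, false when recovery should be skipped.
     */
    static boolean waitForPeers(IntSupplier visiblePeers, int minRequiredPeers,
                                Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (visiblePeers.getAsInt() >= minRequiredPeers) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // e.g. the first node of a brand-new cluster
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Short durations keep the demo quick; the documented default timeout is 15 seconds.
        boolean ready = waitForPeers(() -> 0, 1, Duration.ofMillis(200), Duration.ofMillis(50));
        System.out.println(ready ? "start catch-up" : "no peers, skipping recovery");
    }
}
```

Under this shape, a lone first node gives up after the timeout and continues normal startup, while a joining node proceeds as soon as enough peers are visible.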

---
@@ -0,0 +1,36 @@
package edu.yu.capstone.DistributedSecretsVault.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
import lombok.Data;

/**
* Configuration for peer-only cluster recovery.
* <p>
* Recovery is always on. These settings only tune startup timing and the minimum peer count.
*/
@Validated
@ConfigurationProperties(prefix = "cluster.peer-recovery")
@Data
public class RecoveryConfig {

/**
* Delay in seconds before starting recovery after node startup.
* Gives Redis and ScaleCube time to initialize.
* Default: 2 seconds
*/
private int delaySeconds = 2;

/**
* Maximum time in seconds to wait for peer connectivity during recovery.
* Default: 15 seconds
*/
private int peerConnectivityTimeoutSeconds = 15;

/**
* Minimum number of peers required for recovery to proceed.
* Default: 1 (at least one other node in the cluster)
*/
private int minRequiredPeers = 1;

}
@@ -0,0 +1,97 @@
package edu.yu.capstone.DistributedSecretsVault.controller;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey;
import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart;
import edu.yu.capstone.DistributedSecretsVault.dto.recovery.NodeStateResponse;
import edu.yu.capstone.DistributedSecretsVault.dto.recovery.RecoveryHealthResponse;
import edu.yu.capstone.DistributedSecretsVault.dto.recovery.StateSummary;
import edu.yu.capstone.DistributedSecretsVault.repository.SecretPartRepository;

/**
* Internal peer-recovery RPC endpoints.
* These endpoints are always available to other nodes in the cluster.
*/
@RestController
@RequestMapping("/internal/recovery")
public class PeerRecoveryController {
private static final Logger log = LoggerFactory.getLogger(PeerRecoveryController.class);

private final SecretPartRepository secretPartRepository;
private final StringRedisTemplate redisTemplate;

public PeerRecoveryController(SecretPartRepository secretPartRepository, StringRedisTemplate redisTemplate) {
this.secretPartRepository = secretPartRepository;
this.redisTemplate = redisTemplate;
}

@GetMapping("/state")
public ResponseEntity<NodeStateResponse> exportNodeState() {
try {
Set<StateSummary> state = scanLocalState();
NodeStateResponse response = new NodeStateResponse();
response.setNodeState(new ArrayList<>(state));
return ResponseEntity.ok(response);
} catch (Exception ex) {
log.error("Error exporting peer recovery state", ex);
return ResponseEntity.status(500).build();
}
}

@GetMapping("/shard/{user}/{key}/{version}")
public ResponseEntity<SecretPart> getShard(
@PathVariable String user,
@PathVariable String key,
@PathVariable long version) {
try {
Optional<SecretPart> part = secretPartRepository.findPart(new SecretKey(user, key), version);
return part.map(ResponseEntity::ok).orElseGet(() -> ResponseEntity.notFound().build());
} catch (Exception ex) {
log.error("Error retrieving recovery shard for {}:{}:{}", user, key, version, ex);
return ResponseEntity.status(500).build();
}
}

@GetMapping("/health")
public ResponseEntity<RecoveryHealthResponse> recoveryHealth() {
return ResponseEntity.ok(new RecoveryHealthResponse(true, "Ready to serve peer recovery requests"));
}

private Set<StateSummary> scanLocalState() {
Set<StateSummary> state = new HashSet<>();
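// NOTE: KEYS walks the entire keyspace and blocks Redis while it runs;
// a cursor-based SCAN is usually preferred on large datasets.
Set<String> redisKeys = redisTemplate.keys("*:*");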
if (redisKeys == null || redisKeys.isEmpty()) {
return state;
}

for (String redisKey : redisKeys) {
if (redisKey == null) {
continue;
}
String[] parts = redisKey.split(":", 2);
if (parts.length != 2) {
continue;
}

List<Long> versions = secretPartRepository.listVersions(new SecretKey(parts[0], parts[1]));
for (Long version : versions) {
state.add(new StateSummary(parts[0], parts[1], version, "local"));
}
}
return state;
}
}
@@ -0,0 +1,18 @@
package edu.yu.capstone.DistributedSecretsVault.dto.recovery;

import java.util.ArrayList;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Recovery response listing all secret state known by a node.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class NodeStateResponse {
private List<StateSummary> nodeState = new ArrayList<>();
}
@@ -0,0 +1,16 @@
package edu.yu.capstone.DistributedSecretsVault.dto.recovery;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Recovery endpoint health response.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RecoveryHealthResponse {
private boolean ready;
private String message;
}
@@ -0,0 +1,11 @@
package edu.yu.capstone.DistributedSecretsVault.dto.recovery;

/**
* Recovery inventory entry for a secret shard.
* Used to advertise which (user:key:version) values exist in the cluster.
*/
public record StateSummary(String ownerId, String keyName, long version, String sourceNodeId) {
public String toRedisKey() {
return ownerId + ":" + keyName + ":" + version;
}
}
@@ -19,10 +19,11 @@
import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeletePrepareRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostPrepareRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutPrepareRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.recovery.NodeStateResponse;
import io.scalecube.services.Microservices;

/**
* Outbound HTTP client for inter-node communication within the cluster.
* Outbound HTTP client for internode communication within the cluster.
* <p>
* Peer discovery is handled via ScaleCube's
* {@link Microservices#serviceEndpoints()},
@@ -159,7 +160,7 @@ public SecretPartsResponse fetchAllSecretParts(String peerUrl, SecretKey key) {
Map<Long, SecretPart> parts = restClient.get()
.uri(peerUrl + "/internal/{id}/all?user={user}", key.getName(), key.getOwnerId())
.retrieve()
.body(new ParameterizedTypeReference<Map<Long, SecretPart>>() {
.body(new ParameterizedTypeReference<>() {
});
log.debug("Secret version parts received from {}", peerUrl);
return SecretPartsResponse.found(peerUrl, parts == null ? Map.of() : parts);
@@ -199,7 +200,7 @@ public List<String> resolvePeerUrls() {
Set<String> peerUrls = new LinkedHashSet<>();

ms.serviceEndpoints().forEach(endpoint -> {
// endpoint.address() returns "host:port" (ScaleCube RSocket port)
// endpoint.address() returns "host:port" (ScaleCube Rsocket port)
// We extract just the host and combine with the HTTP port
String address = endpoint.address().toString();
String host = extractHost(address);
@@ -286,4 +287,63 @@ public static SecretPartsResponse failed(String peerUrl, String errorMessage) {
return new SecretPartsResponse(peerUrl, null, null, errorMessage);
}
}

/**
* Retrieve complete state from a peer node for recovery.
* Returns all (user:key:version) entries the peer knows about.
*
* @param peerUrl base URL of the peer
* @return NodeStateResponse with list of all secrets on that peer
*/
public NodeStateResponse getNodeState(String peerUrl) {
try {
NodeStateResponse response = restClient.get()
.uri(peerUrl + "/internal/recovery/state")
.retrieve()
.body(NodeStateResponse.class);

log.debug("Retrieved state from peer {}: {} entries", peerUrl,
response != null && response.getNodeState() != null ? response.getNodeState().size() : 0);
return response != null ? response : new NodeStateResponse();
} catch (RestClientResponseException ex) {
log.warn("Failed to get state from {} with HTTP {}", peerUrl, ex.getStatusCode().value());
return new NodeStateResponse();
} catch (Exception ex) {
log.warn("Failed to get state from {}: {}", peerUrl, ex.getMessage());
return new NodeStateResponse();
}
}

/**
* Request a specific shard from a peer for recovery.
*
* @param peerUrl base URL of the peer
* @param user user/owner ID
* @param key secret key name
* @param version secret version
* @return SecretPart if found, null otherwise
*/
public SecretPart requestShard(String peerUrl, String user, String key, long version) {
try {
String uri = peerUrl + "/internal/recovery/shard/{user}/{key}/{version}";
SecretPart part = restClient.get()
.uri(uri, user, key, version)
.retrieve()
.body(SecretPart.class);

log.debug("Retrieved shard {}:{}:{} from peer {}", user, key, version, peerUrl);
return part;
} catch (RestClientResponseException ex) {
// Only 404 means the peer lacks the shard; other 4xx codes fall through to the warning below.
if (ex.getStatusCode().value() == 404) {
log.debug("Shard not found on {} ({}:{}:{})", peerUrl, user, key, version);
} else {
log.warn("Failed to retrieve shard from {} with HTTP {}", peerUrl, ex.getStatusCode().value());
}
return null;
} catch (Exception ex) {
log.warn("Failed to retrieve shard from {}: {}", peerUrl, ex.getMessage());
return null;
}
}

}