From 48398beef9d283e6dd74c01bb9d162f39ab29919 Mon Sep 17 00:00:00 2001 From: Noam Ben Simon Date: Wed, 20 May 2026 00:39:37 -0400 Subject: [PATCH 1/3] repair system in-place, works as idempotent internal re-write --- docs/read-repair.md | 53 ++++++ .../config/ClusterConfig.java | 2 + .../controller/InternalController.java | 13 +- .../dto/internal/RepairCommitRequest.java | 16 ++ .../dto/internal/RepairPrepareRequest.java | 16 ++ .../communication/CommitDispatcher.java | 9 +- .../service/internal/ActionType.java | 3 +- .../service/internal/InternalGetService.java | 38 ++++- .../internal/InternalRepairService.java | 152 ++++++++++++++++++ .../service/internal/NodeClient.java | 19 +++ .../service/internal/RepairCommitHandler.java | 58 +++++++ .../internal/RepairPrepareHandler.java | 51 ++++++ .../InternalControllerDeleteTest.java | 4 + .../InternalControllerPostTest.java | 4 + .../controller/InternalControllerPutTest.java | 4 + .../communication/CommitDispatcherTest.java | 30 +++- .../internal/InternalGetServiceTest.java | 39 ++++- .../internal/InternalRepairServiceTest.java | 107 ++++++++++++ .../internal/RepairCommitHandlerTest.java | 73 +++++++++ .../internal/RepairPrepareHandlerTest.java | 56 +++++++ 20 files changed, 737 insertions(+), 10 deletions(-) create mode 100644 docs/read-repair.md create mode 100644 src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairCommitRequest.java create mode 100644 src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairPrepareRequest.java create mode 100644 src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java create mode 100644 src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandler.java create mode 100644 src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandler.java create mode 100644 
src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java create mode 100644 src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandlerTest.java create mode 100644 src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandlerTest.java diff --git a/docs/read-repair.md b/docs/read-repair.md new file mode 100644 index 0000000..7adbeee --- /dev/null +++ b/docs/read-repair.md @@ -0,0 +1,53 @@ +# Read Repair Design + +## Goal + +GET requests should keep the cluster healthy when reconstruction is still possible but shard availability is close to the Shamir threshold. + +If a latest-version GET collects at least `k` but no more than `k + repairTriggerBuffer` shards, the coordinating node reconstructs the plaintext value in memory, re-splits it into the configured `n` shards for the same version, and stages a best-effort repair before returning the value. + +Repair does not create a new secret version. + +## Trigger + +Read repair is controlled by `ClusterConfig` (default values shown): + +- `cluster.repairEnabled=true` +- `cluster.repairTriggerBuffer=1` + +Repair is considered only for latest-version reads. Explicit historical version reads and all-version reads do not rewrite stored shards. + +The trigger condition is: + +```text +repairEnabled +AND totalNodes > thresholdK +AND availableShardCount >= thresholdK +AND availableShardCount <= thresholdK + repairTriggerBuffer +``` + +## Flow + +1. `InternalGetService` collects local and peer shards for the latest version. +2. If fewer than `k` shards are available, GET fails as before. +3. If at least `k` shards are available, GET reconstructs the plaintext as before. +4. If shard availability is near threshold, `InternalRepairService`: + - re-splits the plaintext into `n` shards, + - stages the local shard in `PendingActionsBuffer` as `ActionType.REPAIR`, + - sends repair prepare requests to peers, + - publishes a Kafka commit if repair quorum is reached. +5. 
`RepairCommitHandler` saves the staged shard at the same version. +6. GET returns the reconstructed value even if repair cannot complete. + +Plaintext is never written to durable storage or sent to peer nodes. Peers receive only Shamir shards. + +## Concurrency Semantics + +Repair follows the same prepare + Kafka commit shape used by create, update, and delete. It is intentionally best-effort under the chosen snapshot-style GET semantics: + +- A successful GET returns the value it reconstructed. +- A concurrent PUT may create a newer version while repair is running. Repair still writes only the old version number, so it does not create or overwrite the newer latest version. +- A concurrent DELETE is not re-checked before the GET returns, so the reconstructed value may be returned even if the secret was deleted while the read was in flight. +- If too few nodes acknowledge the repair prepare to reach quorum, or the Kafka publish fails, the repair is discarded and the GET still succeeds. + +This keeps GET latency and behavior predictable while allowing reads to heal weakly replicated latest versions. 
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/ClusterConfig.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/ClusterConfig.java index 1646407..cf80ac0 100644 --- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/ClusterConfig.java +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/ClusterConfig.java @@ -14,4 +14,6 @@ public class ClusterConfig { private int quorumM; private long lockTimeoutMillis; private long writeTimeoutMillis; + private boolean repairEnabled = true; + private int repairTriggerBuffer = 1; } diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalController.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalController.java index 960ce78..8dc18ad 100644 --- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalController.java +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalController.java @@ -5,10 +5,12 @@ import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeletePrepareRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostPrepareRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutPrepareRequest; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest; import edu.yu.capstone.DistributedSecretsVault.service.internal.DeletePrepareHandler; import edu.yu.capstone.DistributedSecretsVault.service.internal.InternalGetService; import edu.yu.capstone.DistributedSecretsVault.service.internal.PostPrepareHandler; import edu.yu.capstone.DistributedSecretsVault.service.internal.PutPrepareHandler; +import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairPrepareHandler; import java.util.Map; import java.util.UUID; @@ -32,15 +34,18 @@ public class InternalController { private final PostPrepareHandler postPrepareHandler; private final PutPrepareHandler putPrepareHandler; private final 
DeletePrepareHandler deletePrepareHandler; + private final RepairPrepareHandler repairPrepareHandler; public InternalController(InternalGetService internalGetService, PostPrepareHandler postPrepareHandler, PutPrepareHandler putPrepareHandler, - DeletePrepareHandler deletePrepareHandler) { + DeletePrepareHandler deletePrepareHandler, + RepairPrepareHandler repairPrepareHandler) { this.internalGetService = internalGetService; this.postPrepareHandler = postPrepareHandler; this.putPrepareHandler = putPrepareHandler; this.deletePrepareHandler = deletePrepareHandler; + this.repairPrepareHandler = repairPrepareHandler; } @GetMapping("/{id}") @@ -68,6 +73,12 @@ public ResponseEntity preparePut(@RequestBody PutPrepareRequest request) { return ResponseEntity.noContent().build(); } + @PostMapping("/repair/prepare") + public ResponseEntity prepareRepair(@RequestBody RepairPrepareRequest request) { + repairPrepareHandler.handle(request); + return ResponseEntity.noContent().build(); + } + @DeleteMapping("/prepare") public ResponseEntity prepareDelete( @RequestParam("originatorNodeId") String originatorNodeId, diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairCommitRequest.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairCommitRequest.java new file mode 100644 index 0000000..23697d5 --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairCommitRequest.java @@ -0,0 +1,16 @@ +package edu.yu.capstone.DistributedSecretsVault.dto.internal; + +import java.util.UUID; + +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class RepairCommitRequest { + private UUID operationId; + private SecretKey secretKey; +} diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairPrepareRequest.java 
b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairPrepareRequest.java new file mode 100644 index 0000000..5751aad --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairPrepareRequest.java @@ -0,0 +1,16 @@ +package edu.yu.capstone.DistributedSecretsVault.dto.internal; + +import java.util.UUID; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class RepairPrepareRequest { + private String originatorNodeId; + private UUID operationId; + private SecretPartMessage secretPartMessage; +} diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcher.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcher.java index e240d5e..aff60c1 100644 --- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcher.java +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcher.java @@ -8,11 +8,13 @@ import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeleteCommitRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostCommitRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutCommitRequest; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairCommitRequest; import edu.yu.capstone.DistributedSecretsVault.exceptions.InternalOperationConflictException; import edu.yu.capstone.DistributedSecretsVault.service.internal.ActionType; import edu.yu.capstone.DistributedSecretsVault.service.internal.DeleteCommitHandler; import edu.yu.capstone.DistributedSecretsVault.service.internal.PostCommitHandler; import edu.yu.capstone.DistributedSecretsVault.service.internal.PutCommitHandler; +import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairCommitHandler; @Service public class CommitDispatcher { @@ -21,13 
+23,16 @@ public class CommitDispatcher { private final DeleteCommitHandler deleteCommitHandler; private final PostCommitHandler postCommitHandler; private final PutCommitHandler putCommitHandler; + private final RepairCommitHandler repairCommitHandler; public CommitDispatcher(DeleteCommitHandler deleteCommitHandler, PostCommitHandler postCommitHandler, - PutCommitHandler putCommitHandler) { + PutCommitHandler putCommitHandler, + RepairCommitHandler repairCommitHandler) { this.deleteCommitHandler = deleteCommitHandler; this.postCommitHandler = postCommitHandler; this.putCommitHandler = putCommitHandler; + this.repairCommitHandler = repairCommitHandler; } public void dispatch(CommitMessage message) { @@ -39,6 +44,8 @@ public void dispatch(CommitMessage message) { postCommitHandler.handle(new PostCommitRequest(message.getOperationId(), message.getSecretKey())); } else if (message.getActionType() == ActionType.PUT) { putCommitHandler.handle(new PutCommitRequest(message.getOperationId(), message.getSecretKey())); + } else if (message.getActionType() == ActionType.REPAIR) { + repairCommitHandler.handle(new RepairCommitRequest(message.getOperationId(), message.getSecretKey())); } else { log.warn("Ignoring unsupported commit action type: operationId={}, actionType={}", message.getOperationId(), message.getActionType()); diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/ActionType.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/ActionType.java index 47e4c27..edc5c08 100644 --- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/ActionType.java +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/ActionType.java @@ -7,5 +7,6 @@ public enum ActionType { POST, DELETE, - PUT + PUT, + REPAIR } diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java 
b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java index 02d3cfc..a292d11 100644 --- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java @@ -1,6 +1,8 @@ package edu.yu.capstone.DistributedSecretsVault.service.internal; import org.springframework.http.ResponseEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import edu.yu.capstone.DistributedSecretsVault.config.ClusterConfig; @@ -21,25 +23,32 @@ @Service public class InternalGetService { + private static final Logger log = LoggerFactory.getLogger(InternalGetService.class); + private final SecretPartRepository secretPartRepository; private final SecretReconstructionService secretReconstructionService; private final NodeClient nodeClient; private final ClusterConfig clusterConfig; + private final InternalRepairService internalRepairService; public InternalGetService(SecretPartRepository secretPartRepository, SecretReconstructionService secretReconstructionService, NodeClient nodeClient, - ClusterConfig clusterConfig) { + ClusterConfig clusterConfig, + InternalRepairService internalRepairService) { this.secretPartRepository = secretPartRepository; this.secretReconstructionService = secretReconstructionService; this.nodeClient = nodeClient; this.clusterConfig = clusterConfig; + this.internalRepairService = internalRepairService; } public String getAcrossCluster(SecretKey key, Long version) { validateKey(key); - List selected = collectPartsForReconstruction(key, version); - return secretReconstructionService.reconstruct(selected); + ReconstructionParts reconstructionParts = collectPartsForReconstruction(key, version); + String value = secretReconstructionService.reconstruct(reconstructionParts.selectedParts()); + maybeRepairLatestRead(key, version, reconstructionParts, 
value); + return value; } public Map getAllVersionsAcrossCluster(SecretKey key) { @@ -103,7 +112,7 @@ private void validateKey(SecretKey key) { } } - private List collectPartsForReconstruction(SecretKey key, Long requestedVersion) { + private ReconstructionParts collectPartsForReconstruction(SecretKey key, Long requestedVersion) { Map> partsByVersion = new LinkedHashMap<>(); addLocalPart(partsByVersion, key, requestedVersion); @@ -127,7 +136,7 @@ private List collectPartsForReconstruction(SecretKey key, Long reque if (selectedParts == null || selectedParts.isEmpty()) { throw new SecretNotFoundException(); } - return requireThreshold(selectedParts); + return new ReconstructionParts(requireThreshold(selectedParts), resolvedVersion, selectedParts.size()); } private Map> collectAllPartsByVersion(SecretKey key) { @@ -183,6 +192,22 @@ private List resolvePeerUrls() { return nodeClient == null ? List.of() : nodeClient.resolvePeerUrls(); } + private void maybeRepairLatestRead(SecretKey key, Long requestedVersion, + ReconstructionParts reconstructionParts, String value) { + if (requestedVersion != null || internalRepairService == null) { + return; + } + if (!internalRepairService.shouldRepairLatestRead(reconstructionParts.availableParts())) { + return; + } + try { + internalRepairService.repairLatestVersion(key, reconstructionParts.version(), value); + } catch (RuntimeException e) { + log.warn("Read repair skipped after successful reconstruction: key={}, version={}, reason={}", + key, reconstructionParts.version(), e.getMessage()); + } + } + private int resolveTotalParts() { if (clusterConfig == null || clusterConfig.getTotalNodes() <= 0) { return 1; @@ -200,4 +225,7 @@ private int resolveThreshold(int totalParts) { } return threshold; } + + private record ReconstructionParts(List selectedParts, long version, int availableParts) { + } } diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java 
b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java new file mode 100644 index 0000000..c9dafd2 --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java @@ -0,0 +1,152 @@ +package edu.yu.capstone.DistributedSecretsVault.service.internal; + +import java.util.Comparator; +import java.util.List; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import edu.yu.capstone.DistributedSecretsVault.config.ClusterConfig; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.CommitMessage; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.SecretPartMessage; +import edu.yu.capstone.DistributedSecretsVault.service.communication.CommitPublisher; +import edu.yu.capstone.DistributedSecretsVault.service.internal.NodeClient.PeerResponse; +import edu.yu.capstone.DistributedSecretsVault.service.secret.SecretSharingService; + +@Service +public class InternalRepairService { + private static final Logger log = LoggerFactory.getLogger(InternalRepairService.class); + + private final NodeClient nodeClient; + private final SecretSharingService secretSharingService; + private final PendingActionsBuffer pendingActionsBuffer; + private final CommitPublisher commitPublisher; + private final ClusterConfig clusterConfig; + private final String nodeId; + + public InternalRepairService(NodeClient nodeClient, + SecretSharingService secretSharingService, + PendingActionsBuffer pendingActionsBuffer, + CommitPublisher commitPublisher, + ClusterConfig clusterConfig) { + this.nodeClient = nodeClient; + this.secretSharingService = secretSharingService; + 
this.pendingActionsBuffer = pendingActionsBuffer; + this.commitPublisher = commitPublisher; + this.clusterConfig = clusterConfig; + + String envNodeId = System.getenv("NODE_NAME"); + if (envNodeId == null || envNodeId.isBlank()) { + envNodeId = System.getProperty("NODE_NAME"); + } + this.nodeId = (envNodeId != null && !envNodeId.isBlank()) + ? envNodeId : "local-node"; + } + + public boolean shouldRepairLatestRead(int availableParts) { + if (clusterConfig == null || !clusterConfig.isRepairEnabled()) { + return false; + } + int totalParts = resolveTotalParts(); + int threshold = resolveThreshold(totalParts); + if (totalParts <= threshold || availableParts < threshold) { + return false; + } + int buffer = Math.max(clusterConfig.getRepairTriggerBuffer(), 0); + return availableParts <= threshold + buffer; + } + + public void repairLatestVersion(SecretKey key, long version, String value) { + if (key == null || value == null) { + throw new IllegalArgumentException("Secret key and value are required for repair"); + } + + UUID operationId = UUID.randomUUID(); + List parts = createParts(key, value, version); + SecretPart localPart = parts.get(0); + List peerParts = parts.subList(1, parts.size()); + List peerUrls = nodeClient.resolvePeerUrls(); + + log.info("Starting read repair: operationId={}, key={}, version={}", operationId, key, version); + pendingActionsBuffer.bufferAction(operationId, key, ActionType.REPAIR, localPart); + + int peerAcks = broadcastPrepare(peerUrls, peerParts, operationId); + int totalAcks = peerAcks + 1; + int requiredAcks = computeRequiredAcks(); + if (totalAcks < requiredAcks) { + pendingActionsBuffer.discard(operationId); + log.warn("Read repair skipped because quorum was not reached: operationId={}, totalAcks={}, requiredAcks={}", + operationId, totalAcks, requiredAcks); + return; + } + + try { + commitPublisher.broadcastCommit(new CommitMessage(operationId, key, ActionType.REPAIR)); + log.info("Read repair commit submitted to Kafka: 
operationId={}, version={}", operationId, version); + } catch (RuntimeException e) { + pendingActionsBuffer.discard(operationId); + log.warn("Read repair skipped because commit publish failed: operationId={}, reason={}", + operationId, e.getMessage()); + } + } + + private int broadcastPrepare(List peerUrls, List peerParts, UUID operationId) { + int acks = 0; + int count = Math.min(peerUrls.size(), peerParts.size()); + for (int i = 0; i < count; i++) { + SecretPart part = peerParts.get(i); + SecretPartMessage message = new SecretPartMessage( + part.getKey(), + part.getVersion(), + part.getShard(), + System.currentTimeMillis(), + part.getPartIndex()); + RepairPrepareRequest request = new RepairPrepareRequest(nodeId, operationId, message); + PeerResponse response = nodeClient.sendRepairPrepare(peerUrls.get(i), request); + if (response.acknowledged()) { + acks++; + } else { + log.debug("Repair prepare was not acknowledged by peer={}, status={}, error={}", + response.peerUrl(), response.statusCode(), response.errorMessage()); + } + } + return acks; + } + + private List createParts(SecretKey key, String value, long version) { + int totalParts = resolveTotalParts(); + int threshold = resolveThreshold(totalParts); + return secretSharingService.split(key, value, threshold, totalParts).stream() + .peek(part -> part.setVersion(version)) + .sorted(Comparator.comparingInt(SecretPart::getPartIndex)) + .toList(); + } + + private int computeRequiredAcks() { + int required = clusterConfig == null ? 0 : clusterConfig.getQuorumM(); + return Math.max(required, 1); + } + + private int resolveTotalParts() { + if (clusterConfig == null || clusterConfig.getTotalNodes() <= 0) { + return 1; + } + return clusterConfig.getTotalNodes(); + } + + private int resolveThreshold(int totalParts) { + int threshold = clusterConfig == null ? 
0 : clusterConfig.getThresholdK(); + if (threshold <= 0) { + threshold = 1; + } + if (threshold > totalParts) { + threshold = totalParts; + } + return threshold; + } +} diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java index 36b0c6f..d2280bd 100644 --- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java @@ -19,6 +19,7 @@ import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeletePrepareRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostPrepareRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutPrepareRequest; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest; import io.scalecube.services.Microservices; /** @@ -124,6 +125,24 @@ public PeerResponse sendPutPrepare(String peerUrl, PutPrepareRequest request) { } } + public PeerResponse sendRepairPrepare(String peerUrl, RepairPrepareRequest request) { + try { + restClient.post() + .uri(peerUrl + "/internal/repair/prepare") + .body(request) + .retrieve() + .toBodilessEntity(); + log.debug("Repair prepare ACK received from {}", peerUrl); + return PeerResponse.acknowledged(peerUrl); + } catch (RestClientResponseException ex) { + log.warn("Repair prepare rejected by {} with HTTP {}", peerUrl, ex.getStatusCode().value()); + return PeerResponse.rejected(peerUrl, ex.getStatusCode().value(), ex.getResponseBodyAsString()); + } catch (Exception ex) { + log.warn("Failed to send repair prepare to {}: {}", peerUrl, ex.getMessage()); + return PeerResponse.failed(peerUrl, ex.getMessage()); + } + } + public SecretPartResponse fetchSecretPart(String peerUrl, SecretKey key, Long version) { try { SecretPart part; diff --git 
a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandler.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandler.java new file mode 100644 index 0000000..7c5dbd8 --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandler.java @@ -0,0 +1,58 @@ +package edu.yu.capstone.DistributedSecretsVault.service.internal; + +import org.springframework.stereotype.Service; + +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairCommitRequest; +import edu.yu.capstone.DistributedSecretsVault.exceptions.InternalOperationConflictException; +import edu.yu.capstone.DistributedSecretsVault.repository.SecretPartRepository; +import edu.yu.capstone.DistributedSecretsVault.service.internal.PendingActionsBuffer.PendingAction; + +@Service +public class RepairCommitHandler { + private final PendingActionsBuffer pendingActionsBuffer; + private final SecretPartRepository secretPartRepository; + + public RepairCommitHandler(PendingActionsBuffer pendingActionsBuffer, + SecretPartRepository secretPartRepository) { + this.pendingActionsBuffer = pendingActionsBuffer; + this.secretPartRepository = secretPartRepository; + } + + public void handle(RepairCommitRequest request) { + validateRequest(request); + + PendingAction committed = pendingActionsBuffer.commitAndRemove(request.getOperationId()); + if (committed == null) { + throw new InternalOperationConflictException( + "No staged operation found for repair commit: " + request.getOperationId()); + } + if (committed.actionType() != ActionType.REPAIR) { + throw new InternalOperationConflictException( + "Staged operation is not a repair: " + request.getOperationId()); + } + if (!committed.secretKey().equals(request.getSecretKey())) { + throw new InternalOperationConflictException( + "Commit secret key does not match staged repair: " + 
request.getOperationId()); + } + SecretPart part = committed.secretPart(); + if (part == null || part.getVersion() == null) { + throw new InternalOperationConflictException( + "No staged shard found for repair commit: " + request.getOperationId()); + } + + secretPartRepository.savePart(part); + } + + private void validateRequest(RepairCommitRequest request) { + if (request == null) { + throw new IllegalArgumentException("Repair commit request is required"); + } + if (request.getOperationId() == null) { + throw new IllegalArgumentException("Operation ID is required"); + } + if (request.getSecretKey() == null) { + throw new IllegalArgumentException("Secret key is required"); + } + } +} diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandler.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandler.java new file mode 100644 index 0000000..28803a1 --- /dev/null +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandler.java @@ -0,0 +1,51 @@ +package edu.yu.capstone.DistributedSecretsVault.service.internal; + +import org.springframework.stereotype.Service; + +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.SecretPartMessage; + +@Service +public class RepairPrepareHandler { + private final PendingActionsBuffer pendingActionsBuffer; + + public RepairPrepareHandler(PendingActionsBuffer pendingActionsBuffer) { + this.pendingActionsBuffer = pendingActionsBuffer; + } + + public void handle(RepairPrepareRequest request) { + validateRequest(request); + + SecretPartMessage message = request.getSecretPartMessage(); + SecretPart part = new SecretPart( + message.getKey(), + message.getVersion(), + message.getPartIndex(), + message.getShard()); + pendingActionsBuffer.bufferAction( + 
request.getOperationId(), message.getKey(), ActionType.REPAIR, part); + } + + private void validateRequest(RepairPrepareRequest request) { + if (request == null) { + throw new IllegalArgumentException("Repair prepare request is required"); + } + if (request.getOperationId() == null) { + throw new IllegalArgumentException("Operation ID is required"); + } + SecretPartMessage message = request.getSecretPartMessage(); + if (message == null) { + throw new IllegalArgumentException("Secret part message is required"); + } + if (message.getKey() == null) { + throw new IllegalArgumentException("Secret key is required"); + } + if (message.getVersion() == null) { + throw new IllegalArgumentException("Secret version is required"); + } + if (message.getShard() == null) { + throw new IllegalArgumentException("Secret value is required"); + } + } +} diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerDeleteTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerDeleteTest.java index 09fbdd1..815ed39 100644 --- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerDeleteTest.java +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerDeleteTest.java @@ -5,6 +5,7 @@ import edu.yu.capstone.DistributedSecretsVault.service.internal.InternalGetService; import edu.yu.capstone.DistributedSecretsVault.service.internal.PostPrepareHandler; import edu.yu.capstone.DistributedSecretsVault.service.internal.PutPrepareHandler; +import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairPrepareHandler; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -39,6 +40,9 @@ public class InternalControllerDeleteTest { @MockitoBean private DeletePrepareHandler deletePrepareHandler; + @MockitoBean + private RepairPrepareHandler repairPrepareHandler; + @Test void 
testPrepareDeleteReturnsNoContent() throws Exception { doNothing().when(deletePrepareHandler).handle(any(DeletePrepareRequest.class)); diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPostTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPostTest.java index 0b78040..b02ae86 100644 --- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPostTest.java +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPostTest.java @@ -19,6 +19,7 @@ import edu.yu.capstone.DistributedSecretsVault.service.internal.InternalGetService; import edu.yu.capstone.DistributedSecretsVault.service.internal.PostPrepareHandler; import edu.yu.capstone.DistributedSecretsVault.service.internal.PutPrepareHandler; +import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairPrepareHandler; @WebMvcTest(InternalController.class) @Tag("slice") @@ -38,6 +39,9 @@ public class InternalControllerPostTest { @MockitoBean private DeletePrepareHandler deletePrepareHandler; + @MockitoBean + private RepairPrepareHandler repairPrepareHandler; + @Test void testPreparePostReturnsNoContent() throws Exception { doNothing().when(postPrepareHandler).handle(any(PostPrepareRequest.class)); diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPutTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPutTest.java index c627b0e..8551fad 100644 --- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPutTest.java +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPutTest.java @@ -19,6 +19,7 @@ import edu.yu.capstone.DistributedSecretsVault.service.internal.InternalGetService; import edu.yu.capstone.DistributedSecretsVault.service.internal.PostPrepareHandler; import 
edu.yu.capstone.DistributedSecretsVault.service.internal.PutPrepareHandler; +import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairPrepareHandler; @WebMvcTest(InternalController.class) @Tag("slice") @@ -38,6 +39,9 @@ public class InternalControllerPutTest { @MockitoBean private DeletePrepareHandler deletePrepareHandler; + @MockitoBean + private RepairPrepareHandler repairPrepareHandler; + @Test void testPreparePutReturnsNoContent() throws Exception { doNothing().when(putPrepareHandler).handle(any(PutPrepareRequest.class)); diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcherTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcherTest.java index c1a15d7..02b86d0 100644 --- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcherTest.java +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcherTest.java @@ -18,11 +18,13 @@ import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeleteCommitRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostCommitRequest; import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutCommitRequest; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairCommitRequest; import edu.yu.capstone.DistributedSecretsVault.exceptions.InternalOperationConflictException; import edu.yu.capstone.DistributedSecretsVault.service.internal.ActionType; import edu.yu.capstone.DistributedSecretsVault.service.internal.DeleteCommitHandler; import edu.yu.capstone.DistributedSecretsVault.service.internal.PostCommitHandler; import edu.yu.capstone.DistributedSecretsVault.service.internal.PutCommitHandler; +import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairCommitHandler; /** * Unit tests for {@link CommitDispatcher}. 
@@ -41,11 +43,14 @@ public class CommitDispatcherTest { @Mock private PutCommitHandler putCommitHandler; + @Mock + private RepairCommitHandler repairCommitHandler; + private CommitDispatcher dispatcher; @BeforeEach void setUp() { - dispatcher = new CommitDispatcher(deleteCommitHandler, postCommitHandler, putCommitHandler); + dispatcher = new CommitDispatcher(deleteCommitHandler, postCommitHandler, putCommitHandler, repairCommitHandler); } // ── Routing ───────────────────────────────────────────────────────── @@ -86,6 +91,19 @@ void testDispatchesPutToPutHandler() { verify(postCommitHandler, never()).handle(any()); } + @Test + void testDispatchesRepairToRepairHandler() { + CommitMessage msg = new CommitMessage( + UUID.randomUUID(), new SecretKey("user1", "secret1"), ActionType.REPAIR); + + dispatcher.dispatch(msg); + + verify(repairCommitHandler).handle(any(RepairCommitRequest.class)); + verify(deleteCommitHandler, never()).handle(any()); + verify(postCommitHandler, never()).handle(any()); + verify(putCommitHandler, never()).handle(any()); + } + // ── Stale / Conflicting Commits ───────────────────────────────────── @Test @@ -119,6 +137,16 @@ void testStalePutCommitIsCaughtAndLogged() { assertDoesNotThrow(() -> dispatcher.dispatch(msg)); } + @Test + void testStaleRepairCommitIsCaughtAndLogged() { + CommitMessage msg = new CommitMessage( + UUID.randomUUID(), new SecretKey("user1", "secret1"), ActionType.REPAIR); + doThrow(new InternalOperationConflictException("No staged operation found")) + .when(repairCommitHandler).handle(any(RepairCommitRequest.class)); + + assertDoesNotThrow(() -> dispatcher.dispatch(msg)); + } + // ── Validation ────────────────────────────────────────────────────── @Test diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java index c97bed8..a14d049 100644 --- 
a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java @@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -40,6 +41,9 @@ public class InternalGetServiceTest { @Mock private NodeClient nodeClient; + @Mock + private InternalRepairService internalRepairService; + private ClusterConfig clusterConfig; private InternalGetService internalGetService; @@ -52,7 +56,7 @@ void setUp() { clusterConfig.setTotalNodes(3); clusterConfig.setThresholdK(2); internalGetService = new InternalGetService(secretPartRepository, secretReconstructionService, - nodeClient, clusterConfig); + nodeClient, clusterConfig, internalRepairService); lenient().when(nodeClient.resolvePeerUrls()).thenReturn(List.of()); validKey = new SecretKey("user1", "secret1"); } @@ -73,6 +77,39 @@ void testGetAcrossClusterLatestVersion() { verify(secretReconstructionService).reconstruct(argThat(parts -> parts.size() == 2)); } + @Test + void testGetAcrossClusterRepairsWeakLatestRead() { + SecretPart part = new SecretPart(validKey, 2L, 1, new byte[] { 1, 2 }); + SecretPart peerPart = new SecretPart(validKey, 2L, 2, new byte[] { 3, 4 }); + when(secretPartRepository.findLatest(validKey)).thenReturn(Optional.of(part)); + when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080")); + when(nodeClient.fetchSecretPart("http://peer1:8080", validKey, null)) + .thenReturn(SecretPartResponse.found("http://peer1:8080", peerPart)); + when(secretReconstructionService.reconstruct(anyList())).thenReturn("reconstructed"); + when(internalRepairService.shouldRepairLatestRead(2)).thenReturn(true); + + String result = 
internalGetService.getAcrossCluster(validKey, null); + + assertEquals("reconstructed", result); + verify(internalRepairService).repairLatestVersion(validKey, 2L, "reconstructed"); + } + + @Test + void testGetAcrossClusterDoesNotRepairSpecificVersionRead() { + SecretPart part = new SecretPart(validKey, 1L, 1, new byte[] { 1 }); + SecretPart peerPart = new SecretPart(validKey, 1L, 2, new byte[] { 2 }); + when(secretPartRepository.findPart(validKey, 1L)).thenReturn(Optional.of(part)); + when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080")); + when(nodeClient.fetchSecretPart("http://peer1:8080", validKey, 1L)) + .thenReturn(SecretPartResponse.found("http://peer1:8080", peerPart)); + when(secretReconstructionService.reconstruct(anyList())).thenReturn("v1-secret"); + + String result = internalGetService.getAcrossCluster(validKey, 1L); + + assertEquals("v1-secret", result); + verify(internalRepairService, never()).repairLatestVersion(validKey, 1L, "v1-secret"); + } + @Test void testGetAcrossClusterSpecificVersion() { SecretPart part = new SecretPart(validKey, 1L, 1, new byte[] { 1 }); diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java new file mode 100644 index 0000000..dc2cdc8 --- /dev/null +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java @@ -0,0 +1,107 @@ +package edu.yu.capstone.DistributedSecretsVault.service.internal; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import 
java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import edu.yu.capstone.DistributedSecretsVault.config.ClusterConfig; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.CommitMessage; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest; +import edu.yu.capstone.DistributedSecretsVault.service.communication.CommitPublisher; +import edu.yu.capstone.DistributedSecretsVault.service.internal.NodeClient.PeerResponse; +import edu.yu.capstone.DistributedSecretsVault.service.secret.SecretSharingService; + +@ExtendWith(MockitoExtension.class) +@Tag("unit") +public class InternalRepairServiceTest { + @Mock + private NodeClient nodeClient; + + @Mock + private SecretSharingService secretSharingService; + + @Mock + private PendingActionsBuffer pendingActionsBuffer; + + @Mock + private CommitPublisher commitPublisher; + + private ClusterConfig clusterConfig; + private InternalRepairService service; + private SecretKey key; + + @BeforeEach + void setUp() { + clusterConfig = new ClusterConfig(); + clusterConfig.setTotalNodes(3); + clusterConfig.setThresholdK(2); + clusterConfig.setQuorumM(2); + clusterConfig.setRepairEnabled(true); + clusterConfig.setRepairTriggerBuffer(1); + service = new InternalRepairService(nodeClient, secretSharingService, pendingActionsBuffer, + commitPublisher, clusterConfig); + key = new SecretKey("user1", "secret1"); + } + + @Test + void testShouldRepairAtThresholdOrThresholdPlusBuffer() { + assertTrue(service.shouldRepairLatestRead(2)); + assertTrue(service.shouldRepairLatestRead(3)); + } + + @Test + void testShouldNotRepairWhenDisabledOrBelowThreshold() { + 
assertFalse(service.shouldRepairLatestRead(1)); + clusterConfig.setRepairEnabled(false); + assertFalse(service.shouldRepairLatestRead(2)); + } + + @Test + void testRepairStagesAndPublishesCommitWhenQuorumReached() { + when(secretSharingService.split(key, "secret", 2, 3)).thenReturn(parts()); + when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080", "http://peer2:8080")); + when(nodeClient.sendRepairPrepare(anyString(), any(RepairPrepareRequest.class))) + .thenAnswer(invocation -> PeerResponse.acknowledged(invocation.getArgument(0))); + + service.repairLatestVersion(key, 4L, "secret"); + + verify(pendingActionsBuffer).bufferAction(any(), eq(key), eq(ActionType.REPAIR), any(SecretPart.class)); + verify(commitPublisher).broadcastCommit(any(CommitMessage.class)); + } + + @Test + void testRepairDiscardsWhenQuorumNotReached() { + when(secretSharingService.split(key, "secret", 2, 3)).thenReturn(parts()); + when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080", "http://peer2:8080")); + when(nodeClient.sendRepairPrepare(anyString(), any(RepairPrepareRequest.class))) + .thenAnswer(invocation -> PeerResponse.failed(invocation.getArgument(0), "timeout")); + + service.repairLatestVersion(key, 4L, "secret"); + + verify(pendingActionsBuffer).discard(any()); + verify(commitPublisher, never()).broadcastCommit(any()); + } + + private List<SecretPart> parts() { + return List.of( + new SecretPart(key, null, 1, new byte[] { 1 }), + new SecretPart(key, null, 2, new byte[] { 2 }), + new SecretPart(key, null, 3, new byte[] { 3 })); + } +} diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandlerTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandlerTest.java new file mode 100644 index 0000000..4aa4a08 --- /dev/null +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandlerTest.java @@ -0,0 +1,73 @@ +package 
edu.yu.capstone.DistributedSecretsVault.service.internal; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.UUID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairCommitRequest; +import edu.yu.capstone.DistributedSecretsVault.exceptions.InternalOperationConflictException; +import edu.yu.capstone.DistributedSecretsVault.repository.SecretPartRepository; +import edu.yu.capstone.DistributedSecretsVault.service.internal.PendingActionsBuffer.PendingAction; + +@ExtendWith(MockitoExtension.class) +@Tag("unit") +public class RepairCommitHandlerTest { + @Mock + private PendingActionsBuffer pendingActionsBuffer; + + @Mock + private SecretPartRepository secretPartRepository; + + private RepairCommitHandler handler; + private SecretKey key; + private SecretPart part; + + @BeforeEach + void setUp() { + handler = new RepairCommitHandler(pendingActionsBuffer, secretPartRepository); + key = new SecretKey("user1", "secret1"); + part = new SecretPart(key, 2L, 1, new byte[] { 1, 2, 3 }); + } + + @Test + void testHandleSavesBufferedShardWithoutNewVersion() { + UUID operationId = UUID.randomUUID(); + when(pendingActionsBuffer.commitAndRemove(operationId)) + .thenReturn(new PendingAction(operationId, key, ActionType.REPAIR, part, Instant.now())); + + handler.handle(new RepairCommitRequest(operationId, key)); + + verify(secretPartRepository).savePart(part); + } + + @Test + void testHandleRejectsUnknownOperation() { + UUID operationId = 
UUID.randomUUID(); + when(pendingActionsBuffer.commitAndRemove(operationId)).thenReturn(null); + + assertThrows(InternalOperationConflictException.class, + () -> handler.handle(new RepairCommitRequest(operationId, key))); + } + + @Test + void testHandleRejectsWrongActionType() { + UUID operationId = UUID.randomUUID(); + when(pendingActionsBuffer.commitAndRemove(operationId)) + .thenReturn(new PendingAction(operationId, key, ActionType.PUT, part, Instant.now())); + + assertThrows(InternalOperationConflictException.class, + () -> handler.handle(new RepairCommitRequest(operationId, key))); + } +} diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandlerTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandlerTest.java new file mode 100644 index 0000000..9d0b6af --- /dev/null +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandlerTest.java @@ -0,0 +1,56 @@ +package edu.yu.capstone.DistributedSecretsVault.service.internal; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; + +import java.util.UUID; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey; +import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest; +import edu.yu.capstone.DistributedSecretsVault.dto.internal.SecretPartMessage; + +@ExtendWith(MockitoExtension.class) +@Tag("unit") +public class RepairPrepareHandlerTest { + @Mock + private PendingActionsBuffer 
pendingActionsBuffer; + + private RepairPrepareHandler handler; + private SecretKey key; + + @BeforeEach + void setUp() { + handler = new RepairPrepareHandler(pendingActionsBuffer); + key = new SecretKey("user1", "secret1"); + } + + @Test + void testHandleBuffersRepairPart() { + UUID operationId = UUID.randomUUID(); + SecretPartMessage message = new SecretPartMessage(key, 2L, new byte[] { 1 }, 1L, 1); + + handler.handle(new RepairPrepareRequest("node-1", operationId, message)); + + verify(pendingActionsBuffer).bufferAction(eq(operationId), eq(key), eq(ActionType.REPAIR), + any(SecretPart.class)); + } + + @Test + void testHandleRejectsMissingVersion() { + UUID operationId = UUID.randomUUID(); + SecretPartMessage message = new SecretPartMessage(key, null, new byte[] { 1 }, 1L, 1); + + assertThrows(IllegalArgumentException.class, + () -> handler.handle(new RepairPrepareRequest("node-1", operationId, message))); + } +} From f4c1946b59f6fb5e74c582947a9439aa0dba2ddd Mon Sep 17 00:00:00 2001 From: Noam Ben Simon Date: Wed, 20 May 2026 00:42:42 -0400 Subject: [PATCH 2/3] update docs --- docs/architecture.md | 11 +++++++++ docs/challenges.md | 28 ++++++++++++++--------- docs/read-repair.md | 53 -------------------------------------------- docs/scope.md | 7 ++++++ 4 files changed, 35 insertions(+), 64 deletions(-) delete mode 100644 docs/read-repair.md diff --git a/docs/architecture.md b/docs/architecture.md index 68a5a3b..8732cf3 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -76,6 +76,9 @@ sequenceDiagram - The user may request all versions of a secret, which will return a map of version to secret value - Receiving node requests k - 1 shards from nodes, and gets one from itself (minimum threshold to reconstruct) - Node reconstructs the plaintext secret in memory using Shamir's algorithm (requires k of n shards) +- For latest-version reads, the node checks how many shards were actually available. 
If reconstruction succeeds but the cluster returned only between `k` and `k + repairTriggerBuffer` shards (inclusive), the node performs best-effort read repair before returning the value. +- Read repair re-splits the reconstructed plaintext in memory and republishes shards for the same version through the internal prepare + Kafka commit flow. It does **not** create a new version. +- Explicit historical version reads and all-version reads do not trigger repair. - **Plaintext exists only in memory during reconstruction, never written to disk** - Node returns the secret value to the client and clears it from memory @@ -92,6 +95,12 @@ sequenceDiagram Node->>Cluster: Request k-1 additional shards via ScaleCube Cluster-->>Node: Return shards (encrypted in transit) Node->>Node: Reconstruct plaintext in memory<br/>using Shamir's algorithm (k of n shards) + opt Latest read has between k and k+buffer shards + Node->>Node: Re-split plaintext into same version shards + Node->>Cluster: Prepare repair shards + Cluster-->>Node: Repair ACKs + Node->>Kafka: Publish repair commit + end Node-->>Ingress: Return secret value Ingress-->>User: Secret value ``` @@ -211,3 +220,5 @@ graph LR - If failure occurs in the **ordering phase**, no shard writes are committed and the request fails. - If failure occurs in the **writing phase**, partially written shards are rolled back. - Recovered nodes rejoin automatically via ScaleCube and synchronize state from Kafka and peers. +- Latest-version reads can also repair degraded shard placement when at least `k` shards remain. This read repair is best-effort: a successful GET still returns the reconstructed value even if repair cannot reach quorum. +- Repair uses `ActionType.REPAIR`, stages replacement shards in memory, and commits through Kafka like other internal mutations. The committed shard keeps the existing version number. diff --git a/docs/challenges.md b/docs/challenges.md index 64df778..abbbd2d 100644 --- a/docs/challenges.md +++ b/docs/challenges.md @@ -6,34 +6,40 @@ 2. **Quorum-Based Reconstruction** Reads collect at least `k` shards and reconstruct only in memory. If fewer than `k` shards are available, the read fails deterministically instead of returning partial or stale data. -3. **Create vs Update Under Concurrency** +3. **Read Repair Under Degraded Replication** + Latest-version reads also measure how many shards were actually available. If a GET can reconstruct the value but has only between `k` and `k + repairTriggerBuffer` shards (inclusive), the coordinating node performs best-effort read repair before returning. Repair re-splits the reconstructed plaintext in memory and redistributes shards for the same version through the existing prepare + Kafka commit path. It does not create a new version, and it does not apply to explicit historical reads. + +4. 
**Create vs Update Under Concurrency** Create requires non-existent key; update requires existing key. Both use the same Kafka-based two-phase write flow. This keeps write ordering consistent while preserving operation-specific preconditions. -4. **Versioning and Time Metadata** +5. **Versioning and Time Metadata** The DSV Worker attaches request timestamp metadata. Versions are committed in per-key Kafka order. This avoids relying on a global clock source while maintaining monotonic per-key history. -5. **History and Validity Intervals** +6. **History and Validity Intervals** Each version is independently stored and retrievable. `valid_from`/`valid_to` define active intervals. Intervals are updated during commits so historical reads can be served without ambiguity. -6. **Replication of Authoritative State** +7. **Replication of Authoritative State** Shards replicate through write quorum. Metadata converges through commit propagation and gossip. Any node can therefore answer existence/version queries from local replicated metadata. -7. **Retries and Idempotency** +8. **Retries and Idempotency** Safe retries return existing committed outcomes. Duplicate create returns `409`; duplicate identical update is idempotent. This lets clients retry on timeout without risking duplicate state transitions. -8. **Namespace Isolation** +9. **Namespace Isolation** Secrets are separated into logical namespaces (`user:key:version`) allowing different groups to reuse key names. Pre-condition checks are enforced on every request path before shard access. -9. **Deterministic Failure Semantics** +10. **Deterministic Failure Semantics** Precondition failures are stable (`409` for duplicate create, `404` for missing update/retrieve/delete). Equivalent requests against equivalent cluster state produce the same status code. -10. **`.env` Batch Semantics** +11. **`.env` Batch Semantics** `enc(NAME)` and `secret(NAME)` processing is all-or-nothing; failures roll back staged writes. 
Callers receive either a fully transformed file or a single error response. -11. **Failure Phases for Writes** +12. **Failure Phases for Writes** - **Ordering phase failure**: Kafka commit log write failed; no intent published. - **Writing phase failure**: intent published but write quorum fails; partial writes roll back. Phase separation makes recovery behavior explicit and prevents ambiguous outcomes for in-flight writes. -12. **Recovery and Availability** - Nodes recover from durable storage, and rejoin automatically when healthy. Quorum rules determine whether reads/writes continue or fail fast during degraded periods. +13. **Recovery and Availability** + Nodes recover from durable storage, and rejoin automatically when healthy. Quorum rules determine whether reads/writes continue or fail fast during degraded periods. Read repair improves availability after partial failures by restoring shard redundancy while reads are still reconstructable. + +14. **Repair vs Concurrent Mutation** + Read repair follows snapshot-style GET semantics. If a GET reconstructs a value, it may return that value even if a PUT or DELETE commits immediately afterward. Repair is version-preserving, so a concurrent PUT creates a newer version rather than being overwritten by repair. A concurrent DELETE is not rechecked before returning the already reconstructed GET result. diff --git a/docs/read-repair.md b/docs/read-repair.md deleted file mode 100644 index 7adbeee..0000000 --- a/docs/read-repair.md +++ /dev/null @@ -1,53 +0,0 @@ -# Read Repair Design - -## Goal - -GET requests should keep the cluster healthy when reconstruction is still possible but shard availability is close to the Shamir threshold. - -If a latest-version GET collects only `k` or `k + repairTriggerBuffer` shards, the coordinating node reconstructs the plaintext value in memory, re-splits it into the configured `n` shards for the same version, and stages a best-effort repair before returning the value. 
- -Repair does not create a new secret version. - -## Trigger - -Read repair is controlled by `ClusterConfig`: - -- `cluster.repairEnabled=true` -- `cluster.repairTriggerBuffer=1` - -Repair is considered only for latest-version reads. Explicit historical version reads and all-version reads do not rewrite stored shards. - -The trigger condition is: - -```text -repairEnabled -AND totalNodes > thresholdK -AND availableShardCount >= thresholdK -AND availableShardCount <= thresholdK + repairTriggerBuffer -``` - -## Flow - -1. `InternalGetService` collects local and peer shards for the latest version. -2. If fewer than `k` shards are available, GET fails as before. -3. If at least `k` shards are available, GET reconstructs the plaintext as before. -4. If shard availability is near threshold, `InternalRepairService`: - - re-splits the plaintext into `n` shards, - - stages the local shard in `PendingActionsBuffer` as `ActionType.REPAIR`, - - sends repair prepare requests to peers, - - publishes a Kafka commit if repair quorum is reached. -5. `RepairCommitHandler` saves the staged shard at the same version. -6. GET returns the reconstructed value even if repair cannot complete. - -Plaintext is never written to durable storage or sent to peer nodes. Peers receive only Shamir shards. - -## Concurrency Semantics - -Repair follows the same prepare + Kafka commit shape used by create, update, and delete. It is intentionally best-effort under the chosen snapshot-style GET semantics: - -- A successful GET returns the value it reconstructed. -- A concurrent PUT may create a newer version while repair is running. Repair still writes only the old version number, so it does not create or overwrite the newer latest version. -- A concurrent DELETE is not rechecked immediately before returning the reconstructed GET value. -- If a repair prepare, quorum, or Kafka publish fails, the repair is discarded and the GET still succeeds. 
- -This keeps GET latency and behavior predictable while allowing reads to heal weakly replicated latest versions. diff --git a/docs/scope.md b/docs/scope.md index ce86612..d25200e 100644 --- a/docs/scope.md +++ b/docs/scope.md @@ -19,6 +19,7 @@ It will: - Reject update requests for secrets that do not exist - Accept secret retrieval requests for previously stored secrets - Accept requests to retrieve the version history of a secret +- Repair latest-version shard placement during retrieval when the system can still reconstruct the value but available shards are close to the recovery threshold - Accept secret **delete** requests that remove enough stored shards to make reconstruction impossible - Reject delete requests for secrets that do not exist - Accept `.env` file content and: @@ -48,6 +49,7 @@ It will: - Shard secret pieces to peer vault instances - Serve retrieval, history, and delete requests based on authoritative state - Remain available under partial failure (within range of accepted Shamir's recovery threshold) +- Perform best-effort read repair for latest-version reads when only between `k` and `k + repairTriggerBuffer` shards (inclusive) are available - Recover state on restart --- @@ -71,6 +73,7 @@ It defines: - How secret deletion is defined and when a secret is considered non-reconstructable - What identifiers are used to reference secrets - How retries and concurrent requests are handled +- How read repair behaves under concurrent updates and deletes - What duplicate and _not found_ errors mean The model must be documented and observable in practice. 
@@ -87,6 +90,9 @@ It defines: - Shamir's Secret Keeping behavior: - A password is separated on a single node into n (configured by user) parts - The data is sent out to n - 1 other nodes, with 1 piece staying local to the machine that received the request directly +- Read repair behavior: + - Latest-version GET requests that barely meet the reconstruction threshold may re-split the reconstructed value and restore shards for the same version + - Repair does not create a new version and does not apply to historical version reads - no master key required, any node can take requests to decode - The rule that plaintext secret bytes are never written to durable storage or passed to other nodes @@ -103,6 +109,7 @@ It will: - Define delete request and response behavior, including threshold-based deletion success criteria - Specify duplicate and _not found_ error behavior - Describe durability and replication guarantees +- Describe best-effort read repair when shard availability is degraded but still reconstructable - Describe secret-keeping and spreading behavior and failure behavior when referenced secrets cannot be resolved - Describe secret history retrieval semantics, including version ordering and validity timestamps - Describe `.env` encryption and expansion semantics, including secret creation and all-or-nothing failure From 153fec01d25d30cda44dfa4942607968f53fd7ab Mon Sep 17 00:00:00 2001 From: Noam Ben Simon Date: Wed, 20 May 2026 10:02:44 -0400 Subject: [PATCH 3/3] check node liveness before repair --- .../service/internal/InternalGetService.java | 44 ++++++++++++++++--- .../internal/InternalRepairService.java | 5 ++- .../internal/InternalGetServiceTest.java | 25 ++++++++++- .../internal/InternalRepairServiceTest.java | 14 ++++-- 4 files changed, 76 insertions(+), 12 deletions(-) diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java 
b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java index a292d11..5a77107 100644 --- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java @@ -15,6 +15,7 @@ import edu.yu.capstone.DistributedSecretsVault.service.internal.NodeClient.SecretPartsResponse; import edu.yu.capstone.DistributedSecretsVault.service.secret.SecretReconstructionService; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -114,12 +115,17 @@ private void validateKey(SecretKey key) { private ReconstructionParts collectPartsForReconstruction(SecretKey key, Long requestedVersion) { Map> partsByVersion = new LinkedHashMap<>(); - addLocalPart(partsByVersion, key, requestedVersion); + Optional localPart = addLocalPart(partsByVersion, key, requestedVersion); + List foundPeerParts = new ArrayList<>(); + int liveMissingPeerParts = 0; for (String peerUrl : resolvePeerUrls()) { SecretPartResponse response = nodeClient.fetchSecretPart(peerUrl, key, requestedVersion); if (response != null && response.found()) { addPart(partsByVersion, response.part()); + foundPeerParts.add(response.part()); + } else if (isMissingPartResponse(response)) { + liveMissingPeerParts++; } } @@ -136,7 +142,11 @@ private ReconstructionParts collectPartsForReconstruction(SecretKey key, Long re if (selectedParts == null || selectedParts.isEmpty()) { throw new SecretNotFoundException(); } - return new ReconstructionParts(requireThreshold(selectedParts), resolvedVersion, selectedParts.size()); + int liveRepairTargets = liveMissingPeerParts + + countFoundPartsMissingVersion(foundPeerParts, resolvedVersion) + + (hasPartForVersion(localPart, resolvedVersion) ? 
0 : 1); + return new ReconstructionParts( + requireThreshold(selectedParts), resolvedVersion, selectedParts.size(), liveRepairTargets); } private Map> collectAllPartsByVersion(SecretKey key) { @@ -152,11 +162,13 @@ private Map> collectAllPartsByVersion(SecretKey k return partsByVersion; } - private void addLocalPart(Map> partsByVersion, SecretKey key, Long requestedVersion) { + private Optional addLocalPart(Map> partsByVersion, SecretKey key, + Long requestedVersion) { Optional localPart = requestedVersion == null ? secretPartRepository.findLatest(key) : secretPartRepository.findPart(key, requestedVersion); localPart.ifPresent(part -> addPart(partsByVersion, part)); + return localPart; } private void addLocalVersionParts(Map> partsByVersion, SecretKey key) { @@ -197,7 +209,8 @@ private void maybeRepairLatestRead(SecretKey key, Long requestedVersion, if (requestedVersion != null || internalRepairService == null) { return; } - if (!internalRepairService.shouldRepairLatestRead(reconstructionParts.availableParts())) { + if (!internalRepairService.shouldRepairLatestRead( + reconstructionParts.availableParts(), reconstructionParts.liveRepairTargets())) { return; } try { @@ -226,6 +239,27 @@ private int resolveThreshold(int totalParts) { return threshold; } - private record ReconstructionParts(List selectedParts, long version, int availableParts) { + private boolean isMissingPartResponse(SecretPartResponse response) { + return response != null && response.statusCode() != null && response.statusCode() == 404; + } + + private int countFoundPartsMissingVersion(List foundParts, long resolvedVersion) { + int missing = 0; + for (SecretPart part : foundParts) { + if (!hasPartForVersion(Optional.ofNullable(part), resolvedVersion)) { + missing++; + } + } + return missing; + } + + private boolean hasPartForVersion(Optional part, long version) { + return part.isPresent() + && part.get().getVersion() != null + && part.get().getVersion() == version; + } + + private record 
ReconstructionParts(List selectedParts, long version, int availableParts, + int liveRepairTargets) { } } diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java index c9dafd2..985c156 100644 --- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java +++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java @@ -48,10 +48,13 @@ public InternalRepairService(NodeClient nodeClient, ? envNodeId : "local-node"; } - public boolean shouldRepairLatestRead(int availableParts) { + public boolean shouldRepairLatestRead(int availableParts, int liveRepairTargets) { if (clusterConfig == null || !clusterConfig.isRepairEnabled()) { return false; } + if (liveRepairTargets <= 0) { + return false; + } int totalParts = resolveTotalParts(); int threshold = resolveThreshold(totalParts); if (totalParts <= threshold || availableParts < threshold) { diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java index a14d049..3fb25f3 100644 --- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java @@ -82,11 +82,13 @@ void testGetAcrossClusterRepairsWeakLatestRead() { SecretPart part = new SecretPart(validKey, 2L, 1, new byte[] { 1, 2 }); SecretPart peerPart = new SecretPart(validKey, 2L, 2, new byte[] { 3, 4 }); when(secretPartRepository.findLatest(validKey)).thenReturn(Optional.of(part)); - when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080")); + when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080", 
"http://peer2:8080")); when(nodeClient.fetchSecretPart("http://peer1:8080", validKey, null)) .thenReturn(SecretPartResponse.found("http://peer1:8080", peerPart)); + when(nodeClient.fetchSecretPart("http://peer2:8080", validKey, null)) + .thenReturn(SecretPartResponse.rejected("http://peer2:8080", 404, "missing")); when(secretReconstructionService.reconstruct(anyList())).thenReturn("reconstructed"); - when(internalRepairService.shouldRepairLatestRead(2)).thenReturn(true); + when(internalRepairService.shouldRepairLatestRead(2, 1)).thenReturn(true); String result = internalGetService.getAcrossCluster(validKey, null); @@ -94,6 +96,25 @@ void testGetAcrossClusterRepairsWeakLatestRead() { verify(internalRepairService).repairLatestVersion(validKey, 2L, "reconstructed"); } + @Test + void testGetAcrossClusterDoesNotTreatDeadPeerAsRepairTarget() { + SecretPart part = new SecretPart(validKey, 2L, 1, new byte[] { 1, 2 }); + SecretPart peerPart = new SecretPart(validKey, 2L, 2, new byte[] { 3, 4 }); + when(secretPartRepository.findLatest(validKey)).thenReturn(Optional.of(part)); + when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080", "http://peer2:8080")); + when(nodeClient.fetchSecretPart("http://peer1:8080", validKey, null)) + .thenReturn(SecretPartResponse.found("http://peer1:8080", peerPart)); + when(nodeClient.fetchSecretPart("http://peer2:8080", validKey, null)) + .thenReturn(SecretPartResponse.failed("http://peer2:8080", "timeout")); + when(secretReconstructionService.reconstruct(anyList())).thenReturn("reconstructed"); + + String result = internalGetService.getAcrossCluster(validKey, null); + + assertEquals("reconstructed", result); + verify(internalRepairService).shouldRepairLatestRead(2, 0); + verify(internalRepairService, never()).repairLatestVersion(validKey, 2L, "reconstructed"); + } + @Test void testGetAcrossClusterDoesNotRepairSpecificVersionRead() { SecretPart part = new SecretPart(validKey, 1L, 1, new byte[] { 1 }); diff --git 
a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java index dc2cdc8..b3145cb 100644 --- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java +++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java @@ -61,15 +61,21 @@ void setUp() { @Test void testShouldRepairAtThresholdOrThresholdPlusBuffer() { - assertTrue(service.shouldRepairLatestRead(2)); - assertTrue(service.shouldRepairLatestRead(3)); + assertTrue(service.shouldRepairLatestRead(2, 1)); + assertTrue(service.shouldRepairLatestRead(3, 1)); + } + + @Test + void testShouldNotRepairWithoutLiveRepairTarget() { + assertFalse(service.shouldRepairLatestRead(2, 0)); + assertFalse(service.shouldRepairLatestRead(3, 0)); } @Test void testShouldNotRepairWhenDisabledOrBelowThreshold() { - assertFalse(service.shouldRepairLatestRead(1)); + assertFalse(service.shouldRepairLatestRead(1, 1)); clusterConfig.setRepairEnabled(false); - assertFalse(service.shouldRepairLatestRead(2)); + assertFalse(service.shouldRepairLatestRead(2, 1)); } @Test