diff --git a/docs/architecture.md b/docs/architecture.md
index 68a5a3b..8732cf3 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -76,6 +76,9 @@ sequenceDiagram
- The user may request all versions of a secret, which will return a map of version to secret value
- Receiving node requests k - 1 shards from nodes, and gets one from itself (minimum threshold to reconstruct)
- Node reconstructs the plaintext secret in memory using Shamir's algorithm (requires k of n shards)
+- For latest-version reads, the node checks how many shards were actually available. If reconstruction succeeds but the cluster only returned `k` or `k + repairTriggerBuffer` shards, the node performs best-effort read repair before returning the value.
+- Read repair re-splits the reconstructed plaintext in memory and republishes shards for the same version through the internal prepare + Kafka commit flow. It does **not** create a new version.
+- Explicit historical version reads and all-version reads do not trigger repair.
- **Plaintext exists only in memory during reconstruction, never written to disk**
- Node returns the secret value to the client and clears it from memory
@@ -92,6 +95,12 @@ sequenceDiagram
Node->>Cluster: Request k-1 additional shards via ScaleCube
Cluster-->>Node: Return shards (encrypted in transit)
Node->>Node: Reconstruct plaintext in memory
using Shamir's algorithm (k of n shards)
+ opt Latest read has only k or k+buffer shards
+ Node->>Node: Re-split plaintext into same version shards
+ Node->>Cluster: Prepare repair shards
+ Cluster-->>Node: Repair ACKs
+ Node->>Kafka: Publish repair commit
+ end
Node-->>Ingress: Return secret value
Ingress-->>User: Secret value
```
@@ -211,3 +220,5 @@ graph LR
- If failure occurs in the **ordering phase**, no shard writes are committed and the request fails.
- If failure occurs in the **writing phase**, partially written shards are rolled back.
- Recovered nodes rejoin automatically via ScaleCube and synchronize state from Kafka and peers.
+- Latest-version reads can also repair degraded shard placement when at least `k` shards remain. This read repair is best-effort: a successful GET still returns the reconstructed value even if repair cannot reach quorum.
+- Repair uses `ActionType.REPAIR`, stages replacement shards in memory, and commits through Kafka like other internal mutations. The committed shard keeps the existing version number.
diff --git a/docs/challenges.md b/docs/challenges.md
index 64df778..abbbd2d 100644
--- a/docs/challenges.md
+++ b/docs/challenges.md
@@ -6,34 +6,40 @@
2. **Quorum-Based Reconstruction**
Reads collect at least `k` shards and reconstruct only in memory. If fewer than `k` shards are available, the read fails deterministically instead of returning partial or stale data.
-3. **Create vs Update Under Concurrency**
+3. **Read Repair Under Degraded Replication**
+ Latest-version reads also measure how many shards were actually available. If a GET can reconstruct the value but only has `k` or `k + repairTriggerBuffer` shards, the coordinating node performs best-effort read repair before returning. Repair re-splits the reconstructed plaintext in memory and redistributes shards for the same version through the existing prepare + Kafka commit path. It does not create a new version, and it does not apply to explicit historical reads.
+
+4. **Create vs Update Under Concurrency**
Create requires non-existent key; update requires existing key. Both use the same Kafka-based two-phase write flow. This keeps write ordering consistent while preserving operation-specific preconditions.
-4. **Versioning and Time Metadata**
+5. **Versioning and Time Metadata**
The DSV Worker attaches request timestamp metadata. Versions are committed in per-key Kafka order. This avoids relying on a global clock source while maintaining monotonic per-key history.
-5. **History and Validity Intervals**
+6. **History and Validity Intervals**
Each version is independently stored and retrievable. `valid_from`/`valid_to` define active intervals. Intervals are updated during commits so historical reads can be served without ambiguity.
-6. **Replication of Authoritative State**
+7. **Replication of Authoritative State**
Shards replicate through write quorum. Metadata converges through commit propagation and gossip. Any node can therefore answer existence/version queries from local replicated metadata.
-7. **Retries and Idempotency**
+8. **Retries and Idempotency**
Safe retries return existing committed outcomes. Duplicate create returns `409`; duplicate identical update is idempotent. This lets clients retry on timeout without risking duplicate state transitions.
-8. **Namespace Isolation**
+9. **Namespace Isolation**
Secrets are separated into logical namespaces (`user:key:version`) allowing different groups to reuse key names. Pre-condition checks are enforced on every request path before shard access.
-9. **Deterministic Failure Semantics**
+10. **Deterministic Failure Semantics**
Precondition failures are stable (`409` for duplicate create, `404` for missing update/retrieve/delete). Equivalent requests against equivalent cluster state produce the same status code.
-10. **`.env` Batch Semantics**
+11. **`.env` Batch Semantics**
`enc(NAME)` and `secret(NAME)` processing is all-or-nothing; failures roll back staged writes. Callers receive either a fully transformed file or a single error response.
-11. **Failure Phases for Writes**
+12. **Failure Phases for Writes**
- **Ordering phase failure**: Kafka commit log write failed; no intent published.
- **Writing phase failure**: intent published but write quorum fails; partial writes roll back.
Phase separation makes recovery behavior explicit and prevents ambiguous outcomes for in-flight writes.
-12. **Recovery and Availability**
- Nodes recover from durable storage, and rejoin automatically when healthy. Quorum rules determine whether reads/writes continue or fail fast during degraded periods.
+13. **Recovery and Availability**
+ Nodes recover from durable storage, and rejoin automatically when healthy. Quorum rules determine whether reads/writes continue or fail fast during degraded periods. Read repair improves availability after partial failures by restoring shard redundancy while reads are still reconstructable.
+
+14. **Repair vs Concurrent Mutation**
+ Read repair follows snapshot-style GET semantics. If a GET reconstructs a value, it may return that value even if a PUT or DELETE commits immediately afterward. Repair is version-preserving, so a concurrent PUT creates a newer version rather than being overwritten by repair. A concurrent DELETE is not rechecked before returning the already reconstructed GET result.
diff --git a/docs/scope.md b/docs/scope.md
index ce86612..d25200e 100644
--- a/docs/scope.md
+++ b/docs/scope.md
@@ -19,6 +19,7 @@ It will:
- Reject update requests for secrets that do not exist
- Accept secret retrieval requests for previously stored secrets
- Accept requests to retrieve the version history of a secret
+- Repair latest-version shard placement during retrieval when the system can still reconstruct the value but available shards are close to the recovery threshold
- Accept secret **delete** requests that remove enough stored shards to make reconstruction impossible
- Reject delete requests for secrets that do not exist
- Accept `.env` file content and:
@@ -48,6 +49,7 @@ It will:
- Shard secret pieces to peer vault instances
- Serve retrieval, history, and delete requests based on authoritative state
- Remain available under partial failure (within range of accepted Shamir's recovery threshold)
+- Perform best-effort read repair for latest-version reads when only `k` or `k + repairTriggerBuffer` shards are available
- Recover state on restart
---
@@ -71,6 +73,7 @@ It defines:
- How secret deletion is defined and when a secret is considered non-reconstructable
- What identifiers are used to reference secrets
- How retries and concurrent requests are handled
+- How read repair behaves under concurrent updates and deletes
- What duplicate and _not found_ errors mean
The model must be documented and observable in practice.
@@ -87,6 +90,9 @@ It defines:
- Shamir's Secret Keeping behavior:
- A password is separated on a single node into n (configured by user) parts
- The data is sent out to n - 1 other nodes, with 1 piece staying local to the machine that received the request directly
+- Read repair behavior:
+ - Latest-version GET requests that barely meet the reconstruction threshold may re-split the reconstructed value and restore shards for the same version
+ - Repair does not create a new version and does not apply to historical version reads
- no master key required, any node can take requests to decode
- The rule that plaintext secret bytes are never written to durable storage or passed to other nodes
@@ -103,6 +109,7 @@ It will:
- Define delete request and response behavior, including threshold-based deletion success criteria
- Specify duplicate and _not found_ error behavior
- Describe durability and replication guarantees
+- Describe best-effort read repair when shard availability is degraded but still reconstructable
- Describe secret-keeping and spreading behavior and failure behavior when referenced secrets cannot be resolved
- Describe secret history retrieval semantics, including version ordering and validity timestamps
- Describe `.env` encryption and expansion semantics, including secret creation and all-or-nothing failure
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/ClusterConfig.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/ClusterConfig.java
index 1646407..cf80ac0 100644
--- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/ClusterConfig.java
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/config/ClusterConfig.java
@@ -14,4 +14,6 @@ public class ClusterConfig {
private int quorumM;
private long lockTimeoutMillis;
private long writeTimeoutMillis;
+ private boolean repairEnabled = true;
+ private int repairTriggerBuffer = 1;
}
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalController.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalController.java
index 960ce78..8dc18ad 100644
--- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalController.java
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalController.java
@@ -5,10 +5,12 @@
import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeletePrepareRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostPrepareRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutPrepareRequest;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest;
import edu.yu.capstone.DistributedSecretsVault.service.internal.DeletePrepareHandler;
import edu.yu.capstone.DistributedSecretsVault.service.internal.InternalGetService;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PostPrepareHandler;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PutPrepareHandler;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairPrepareHandler;
import java.util.Map;
import java.util.UUID;
@@ -32,15 +34,18 @@ public class InternalController {
private final PostPrepareHandler postPrepareHandler;
private final PutPrepareHandler putPrepareHandler;
private final DeletePrepareHandler deletePrepareHandler;
+ private final RepairPrepareHandler repairPrepareHandler;
public InternalController(InternalGetService internalGetService,
PostPrepareHandler postPrepareHandler,
PutPrepareHandler putPrepareHandler,
- DeletePrepareHandler deletePrepareHandler) {
+ DeletePrepareHandler deletePrepareHandler,
+ RepairPrepareHandler repairPrepareHandler) {
this.internalGetService = internalGetService;
this.postPrepareHandler = postPrepareHandler;
this.putPrepareHandler = putPrepareHandler;
this.deletePrepareHandler = deletePrepareHandler;
+ this.repairPrepareHandler = repairPrepareHandler;
}
@GetMapping("/{id}")
@@ -68,6 +73,12 @@ public ResponseEntity preparePut(@RequestBody PutPrepareRequest request) {
return ResponseEntity.noContent().build();
}
+ @PostMapping("/repair/prepare")
+ public ResponseEntity prepareRepair(@RequestBody RepairPrepareRequest request) {
+ repairPrepareHandler.handle(request);
+ return ResponseEntity.noContent().build();
+ }
+
@DeleteMapping("/prepare")
public ResponseEntity prepareDelete(
@RequestParam("originatorNodeId") String originatorNodeId,
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairCommitRequest.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairCommitRequest.java
new file mode 100644
index 0000000..23697d5
--- /dev/null
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairCommitRequest.java
@@ -0,0 +1,16 @@
+package edu.yu.capstone.DistributedSecretsVault.dto.internal;
+
+import java.util.UUID;
+
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class RepairCommitRequest {
+ private UUID operationId;
+ private SecretKey secretKey;
+}
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairPrepareRequest.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairPrepareRequest.java
new file mode 100644
index 0000000..5751aad
--- /dev/null
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/dto/internal/RepairPrepareRequest.java
@@ -0,0 +1,16 @@
+package edu.yu.capstone.DistributedSecretsVault.dto.internal;
+
+import java.util.UUID;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class RepairPrepareRequest {
+ private String originatorNodeId;
+ private UUID operationId;
+ private SecretPartMessage secretPartMessage;
+}
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcher.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcher.java
index e240d5e..aff60c1 100644
--- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcher.java
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcher.java
@@ -8,11 +8,13 @@
import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeleteCommitRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostCommitRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutCommitRequest;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairCommitRequest;
import edu.yu.capstone.DistributedSecretsVault.exceptions.InternalOperationConflictException;
import edu.yu.capstone.DistributedSecretsVault.service.internal.ActionType;
import edu.yu.capstone.DistributedSecretsVault.service.internal.DeleteCommitHandler;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PostCommitHandler;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PutCommitHandler;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairCommitHandler;
@Service
public class CommitDispatcher {
@@ -21,13 +23,16 @@ public class CommitDispatcher {
private final DeleteCommitHandler deleteCommitHandler;
private final PostCommitHandler postCommitHandler;
private final PutCommitHandler putCommitHandler;
+ private final RepairCommitHandler repairCommitHandler;
public CommitDispatcher(DeleteCommitHandler deleteCommitHandler,
PostCommitHandler postCommitHandler,
- PutCommitHandler putCommitHandler) {
+ PutCommitHandler putCommitHandler,
+ RepairCommitHandler repairCommitHandler) {
this.deleteCommitHandler = deleteCommitHandler;
this.postCommitHandler = postCommitHandler;
this.putCommitHandler = putCommitHandler;
+ this.repairCommitHandler = repairCommitHandler;
}
public void dispatch(CommitMessage message) {
@@ -39,6 +44,8 @@ public void dispatch(CommitMessage message) {
postCommitHandler.handle(new PostCommitRequest(message.getOperationId(), message.getSecretKey()));
} else if (message.getActionType() == ActionType.PUT) {
putCommitHandler.handle(new PutCommitRequest(message.getOperationId(), message.getSecretKey()));
+ } else if (message.getActionType() == ActionType.REPAIR) {
+ repairCommitHandler.handle(new RepairCommitRequest(message.getOperationId(), message.getSecretKey()));
} else {
log.warn("Ignoring unsupported commit action type: operationId={}, actionType={}",
message.getOperationId(), message.getActionType());
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/ActionType.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/ActionType.java
index 47e4c27..edc5c08 100644
--- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/ActionType.java
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/ActionType.java
@@ -7,5 +7,6 @@
public enum ActionType {
POST,
DELETE,
- PUT
+ PUT,
+ REPAIR
}
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java
index 02d3cfc..5a77107 100644
--- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetService.java
@@ -1,6 +1,8 @@
package edu.yu.capstone.DistributedSecretsVault.service.internal;
import org.springframework.http.ResponseEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import edu.yu.capstone.DistributedSecretsVault.config.ClusterConfig;
@@ -13,6 +15,7 @@
import edu.yu.capstone.DistributedSecretsVault.service.internal.NodeClient.SecretPartsResponse;
import edu.yu.capstone.DistributedSecretsVault.service.secret.SecretReconstructionService;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -21,25 +24,32 @@
@Service
public class InternalGetService {
+ private static final Logger log = LoggerFactory.getLogger(InternalGetService.class);
+
private final SecretPartRepository secretPartRepository;
private final SecretReconstructionService secretReconstructionService;
private final NodeClient nodeClient;
private final ClusterConfig clusterConfig;
+ private final InternalRepairService internalRepairService;
public InternalGetService(SecretPartRepository secretPartRepository,
SecretReconstructionService secretReconstructionService,
NodeClient nodeClient,
- ClusterConfig clusterConfig) {
+ ClusterConfig clusterConfig,
+ InternalRepairService internalRepairService) {
this.secretPartRepository = secretPartRepository;
this.secretReconstructionService = secretReconstructionService;
this.nodeClient = nodeClient;
this.clusterConfig = clusterConfig;
+ this.internalRepairService = internalRepairService;
}
public String getAcrossCluster(SecretKey key, Long version) {
validateKey(key);
- List selected = collectPartsForReconstruction(key, version);
- return secretReconstructionService.reconstruct(selected);
+ ReconstructionParts reconstructionParts = collectPartsForReconstruction(key, version);
+ String value = secretReconstructionService.reconstruct(reconstructionParts.selectedParts());
+ maybeRepairLatestRead(key, version, reconstructionParts, value);
+ return value;
}
public Map getAllVersionsAcrossCluster(SecretKey key) {
@@ -103,14 +113,19 @@ private void validateKey(SecretKey key) {
}
}
- private List collectPartsForReconstruction(SecretKey key, Long requestedVersion) {
+ private ReconstructionParts collectPartsForReconstruction(SecretKey key, Long requestedVersion) {
Map> partsByVersion = new LinkedHashMap<>();
- addLocalPart(partsByVersion, key, requestedVersion);
+ Optional localPart = addLocalPart(partsByVersion, key, requestedVersion);
+ List foundPeerParts = new ArrayList<>();
+ int liveMissingPeerParts = 0;
for (String peerUrl : resolvePeerUrls()) {
SecretPartResponse response = nodeClient.fetchSecretPart(peerUrl, key, requestedVersion);
if (response != null && response.found()) {
addPart(partsByVersion, response.part());
+ foundPeerParts.add(response.part());
+ } else if (isMissingPartResponse(response)) {
+ liveMissingPeerParts++;
}
}
@@ -127,7 +142,11 @@ private List collectPartsForReconstruction(SecretKey key, Long reque
if (selectedParts == null || selectedParts.isEmpty()) {
throw new SecretNotFoundException();
}
- return requireThreshold(selectedParts);
+ int liveRepairTargets = liveMissingPeerParts
+ + countFoundPartsMissingVersion(foundPeerParts, resolvedVersion)
+ + (hasPartForVersion(localPart, resolvedVersion) ? 0 : 1);
+ return new ReconstructionParts(
+ requireThreshold(selectedParts), resolvedVersion, selectedParts.size(), liveRepairTargets);
}
private Map> collectAllPartsByVersion(SecretKey key) {
@@ -143,11 +162,13 @@ private Map> collectAllPartsByVersion(SecretKey k
return partsByVersion;
}
- private void addLocalPart(Map> partsByVersion, SecretKey key, Long requestedVersion) {
+ private Optional addLocalPart(Map> partsByVersion, SecretKey key,
+ Long requestedVersion) {
Optional localPart = requestedVersion == null
? secretPartRepository.findLatest(key)
: secretPartRepository.findPart(key, requestedVersion);
localPart.ifPresent(part -> addPart(partsByVersion, part));
+ return localPart;
}
private void addLocalVersionParts(Map> partsByVersion, SecretKey key) {
@@ -183,6 +204,23 @@ private List resolvePeerUrls() {
return nodeClient == null ? List.of() : nodeClient.resolvePeerUrls();
}
+ private void maybeRepairLatestRead(SecretKey key, Long requestedVersion,
+ ReconstructionParts reconstructionParts, String value) {
+ if (requestedVersion != null || internalRepairService == null) {
+ return;
+ }
+ if (!internalRepairService.shouldRepairLatestRead(
+ reconstructionParts.availableParts(), reconstructionParts.liveRepairTargets())) {
+ return;
+ }
+ try {
+ internalRepairService.repairLatestVersion(key, reconstructionParts.version(), value);
+ } catch (RuntimeException e) {
+ log.warn("Read repair skipped after successful reconstruction: key={}, version={}, reason={}",
+ key, reconstructionParts.version(), e.getMessage());
+ }
+ }
+
private int resolveTotalParts() {
if (clusterConfig == null || clusterConfig.getTotalNodes() <= 0) {
return 1;
@@ -200,4 +238,28 @@ private int resolveThreshold(int totalParts) {
}
return threshold;
}
+
+ private boolean isMissingPartResponse(SecretPartResponse response) {
+ return response != null && response.statusCode() != null && response.statusCode() == 404;
+ }
+
+ private int countFoundPartsMissingVersion(List foundParts, long resolvedVersion) {
+ int missing = 0;
+ for (SecretPart part : foundParts) {
+ if (!hasPartForVersion(Optional.ofNullable(part), resolvedVersion)) {
+ missing++;
+ }
+ }
+ return missing;
+ }
+
+ private boolean hasPartForVersion(Optional part, long version) {
+ return part.isPresent()
+ && part.get().getVersion() != null
+ && part.get().getVersion() == version;
+ }
+
+ private record ReconstructionParts(List selectedParts, long version, int availableParts,
+ int liveRepairTargets) {
+ }
}
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java
new file mode 100644
index 0000000..985c156
--- /dev/null
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairService.java
@@ -0,0 +1,155 @@
+package edu.yu.capstone.DistributedSecretsVault.service.internal;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import edu.yu.capstone.DistributedSecretsVault.config.ClusterConfig;
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey;
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.CommitMessage;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.SecretPartMessage;
+import edu.yu.capstone.DistributedSecretsVault.service.communication.CommitPublisher;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.NodeClient.PeerResponse;
+import edu.yu.capstone.DistributedSecretsVault.service.secret.SecretSharingService;
+
+@Service
+public class InternalRepairService {
+ private static final Logger log = LoggerFactory.getLogger(InternalRepairService.class);
+
+ private final NodeClient nodeClient;
+ private final SecretSharingService secretSharingService;
+ private final PendingActionsBuffer pendingActionsBuffer;
+ private final CommitPublisher commitPublisher;
+ private final ClusterConfig clusterConfig;
+ private final String nodeId;
+
+ public InternalRepairService(NodeClient nodeClient,
+ SecretSharingService secretSharingService,
+ PendingActionsBuffer pendingActionsBuffer,
+ CommitPublisher commitPublisher,
+ ClusterConfig clusterConfig) {
+ this.nodeClient = nodeClient;
+ this.secretSharingService = secretSharingService;
+ this.pendingActionsBuffer = pendingActionsBuffer;
+ this.commitPublisher = commitPublisher;
+ this.clusterConfig = clusterConfig;
+
+ String envNodeId = System.getenv("NODE_NAME");
+ if (envNodeId == null || envNodeId.isBlank()) {
+ envNodeId = System.getProperty("NODE_NAME");
+ }
+ this.nodeId = (envNodeId != null && !envNodeId.isBlank())
+ ? envNodeId : "local-node";
+ }
+
+ public boolean shouldRepairLatestRead(int availableParts, int liveRepairTargets) {
+ if (clusterConfig == null || !clusterConfig.isRepairEnabled()) {
+ return false;
+ }
+ if (liveRepairTargets <= 0) {
+ return false;
+ }
+ int totalParts = resolveTotalParts();
+ int threshold = resolveThreshold(totalParts);
+ if (totalParts <= threshold || availableParts < threshold) {
+ return false;
+ }
+ int buffer = Math.max(clusterConfig.getRepairTriggerBuffer(), 0);
+ return availableParts <= threshold + buffer;
+ }
+
+ public void repairLatestVersion(SecretKey key, long version, String value) {
+ if (key == null || value == null) {
+ throw new IllegalArgumentException("Secret key and value are required for repair");
+ }
+
+ UUID operationId = UUID.randomUUID();
+ List parts = createParts(key, value, version);
+ SecretPart localPart = parts.get(0);
+ List peerParts = parts.subList(1, parts.size());
+ List peerUrls = nodeClient.resolvePeerUrls();
+
+ log.info("Starting read repair: operationId={}, key={}, version={}", operationId, key, version);
+ pendingActionsBuffer.bufferAction(operationId, key, ActionType.REPAIR, localPart);
+
+ int peerAcks = broadcastPrepare(peerUrls, peerParts, operationId);
+ int totalAcks = peerAcks + 1;
+ int requiredAcks = computeRequiredAcks();
+ if (totalAcks < requiredAcks) {
+ pendingActionsBuffer.discard(operationId);
+ log.warn("Read repair skipped because quorum was not reached: operationId={}, totalAcks={}, requiredAcks={}",
+ operationId, totalAcks, requiredAcks);
+ return;
+ }
+
+ try {
+ commitPublisher.broadcastCommit(new CommitMessage(operationId, key, ActionType.REPAIR));
+ log.info("Read repair commit submitted to Kafka: operationId={}, version={}", operationId, version);
+ } catch (RuntimeException e) {
+ pendingActionsBuffer.discard(operationId);
+ log.warn("Read repair skipped because commit publish failed: operationId={}, reason={}",
+ operationId, e.getMessage());
+ }
+ }
+
+ private int broadcastPrepare(List peerUrls, List peerParts, UUID operationId) {
+ int acks = 0;
+ int count = Math.min(peerUrls.size(), peerParts.size());
+ for (int i = 0; i < count; i++) {
+ SecretPart part = peerParts.get(i);
+ SecretPartMessage message = new SecretPartMessage(
+ part.getKey(),
+ part.getVersion(),
+ part.getShard(),
+ System.currentTimeMillis(),
+ part.getPartIndex());
+ RepairPrepareRequest request = new RepairPrepareRequest(nodeId, operationId, message);
+ PeerResponse response = nodeClient.sendRepairPrepare(peerUrls.get(i), request);
+ if (response.acknowledged()) {
+ acks++;
+ } else {
+ log.debug("Repair prepare was not acknowledged by peer={}, status={}, error={}",
+ response.peerUrl(), response.statusCode(), response.errorMessage());
+ }
+ }
+ return acks;
+ }
+
+ private List createParts(SecretKey key, String value, long version) {
+ int totalParts = resolveTotalParts();
+ int threshold = resolveThreshold(totalParts);
+ return secretSharingService.split(key, value, threshold, totalParts).stream()
+ .peek(part -> part.setVersion(version))
+ .sorted(Comparator.comparingInt(SecretPart::getPartIndex))
+ .toList();
+ }
+
+ private int computeRequiredAcks() {
+ int required = clusterConfig == null ? 0 : clusterConfig.getQuorumM();
+ return Math.max(required, 1);
+ }
+
+ private int resolveTotalParts() {
+ if (clusterConfig == null || clusterConfig.getTotalNodes() <= 0) {
+ return 1;
+ }
+ return clusterConfig.getTotalNodes();
+ }
+
+ private int resolveThreshold(int totalParts) {
+ int threshold = clusterConfig == null ? 0 : clusterConfig.getThresholdK();
+ if (threshold <= 0) {
+ threshold = 1;
+ }
+ if (threshold > totalParts) {
+ threshold = totalParts;
+ }
+ return threshold;
+ }
+}
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java
index 36b0c6f..d2280bd 100644
--- a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/NodeClient.java
@@ -19,6 +19,7 @@
import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeletePrepareRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostPrepareRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutPrepareRequest;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest;
import io.scalecube.services.Microservices;
/**
@@ -124,6 +125,24 @@ public PeerResponse sendPutPrepare(String peerUrl, PutPrepareRequest request) {
}
}
+ public PeerResponse sendRepairPrepare(String peerUrl, RepairPrepareRequest request) {
+ try {
+ restClient.post()
+ .uri(peerUrl + "/internal/repair/prepare")
+ .body(request)
+ .retrieve()
+ .toBodilessEntity();
+ log.debug("Repair prepare ACK received from {}", peerUrl);
+ return PeerResponse.acknowledged(peerUrl);
+ } catch (RestClientResponseException ex) {
+ log.warn("Repair prepare rejected by {} with HTTP {}", peerUrl, ex.getStatusCode().value());
+ return PeerResponse.rejected(peerUrl, ex.getStatusCode().value(), ex.getResponseBodyAsString());
+ } catch (Exception ex) {
+ log.warn("Failed to send repair prepare to {}: {}", peerUrl, ex.getMessage());
+ return PeerResponse.failed(peerUrl, ex.getMessage());
+ }
+ }
+
public SecretPartResponse fetchSecretPart(String peerUrl, SecretKey key, Long version) {
try {
SecretPart part;
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandler.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandler.java
new file mode 100644
index 0000000..7c5dbd8
--- /dev/null
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandler.java
@@ -0,0 +1,58 @@
+package edu.yu.capstone.DistributedSecretsVault.service.internal;
+
+import org.springframework.stereotype.Service;
+
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairCommitRequest;
+import edu.yu.capstone.DistributedSecretsVault.exceptions.InternalOperationConflictException;
+import edu.yu.capstone.DistributedSecretsVault.repository.SecretPartRepository;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.PendingActionsBuffer.PendingAction;
+
+@Service
+public class RepairCommitHandler {
+ private final PendingActionsBuffer pendingActionsBuffer;
+ private final SecretPartRepository secretPartRepository;
+
+ public RepairCommitHandler(PendingActionsBuffer pendingActionsBuffer,
+ SecretPartRepository secretPartRepository) {
+ this.pendingActionsBuffer = pendingActionsBuffer;
+ this.secretPartRepository = secretPartRepository;
+ }
+
+ public void handle(RepairCommitRequest request) {
+ validateRequest(request);
+
+ PendingAction committed = pendingActionsBuffer.commitAndRemove(request.getOperationId());
+ if (committed == null) {
+ throw new InternalOperationConflictException(
+ "No staged operation found for repair commit: " + request.getOperationId());
+ }
+ if (committed.actionType() != ActionType.REPAIR) {
+ throw new InternalOperationConflictException(
+ "Staged operation is not a repair: " + request.getOperationId());
+ }
+ if (!committed.secretKey().equals(request.getSecretKey())) {
+ throw new InternalOperationConflictException(
+ "Commit secret key does not match staged repair: " + request.getOperationId());
+ }
+ SecretPart part = committed.secretPart();
+ if (part == null || part.getVersion() == null) {
+ throw new InternalOperationConflictException(
+ "No staged shard found for repair commit: " + request.getOperationId());
+ }
+
+ secretPartRepository.savePart(part);
+ }
+
+ private void validateRequest(RepairCommitRequest request) {
+ if (request == null) {
+ throw new IllegalArgumentException("Repair commit request is required");
+ }
+ if (request.getOperationId() == null) {
+ throw new IllegalArgumentException("Operation ID is required");
+ }
+ if (request.getSecretKey() == null) {
+ throw new IllegalArgumentException("Secret key is required");
+ }
+ }
+}
diff --git a/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandler.java b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandler.java
new file mode 100644
index 0000000..28803a1
--- /dev/null
+++ b/src/main/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandler.java
@@ -0,0 +1,51 @@
+package edu.yu.capstone.DistributedSecretsVault.service.internal;
+
+import org.springframework.stereotype.Service;
+
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.SecretPartMessage;
+
+@Service
+public class RepairPrepareHandler {
+ private final PendingActionsBuffer pendingActionsBuffer;
+
+ public RepairPrepareHandler(PendingActionsBuffer pendingActionsBuffer) {
+ this.pendingActionsBuffer = pendingActionsBuffer;
+ }
+
+ public void handle(RepairPrepareRequest request) {
+ validateRequest(request);
+
+ SecretPartMessage message = request.getSecretPartMessage();
+ SecretPart part = new SecretPart(
+ message.getKey(),
+ message.getVersion(),
+ message.getPartIndex(),
+ message.getShard());
+ pendingActionsBuffer.bufferAction(
+ request.getOperationId(), message.getKey(), ActionType.REPAIR, part);
+ }
+
+ private void validateRequest(RepairPrepareRequest request) {
+ if (request == null) {
+ throw new IllegalArgumentException("Repair prepare request is required");
+ }
+ if (request.getOperationId() == null) {
+ throw new IllegalArgumentException("Operation ID is required");
+ }
+ SecretPartMessage message = request.getSecretPartMessage();
+ if (message == null) {
+ throw new IllegalArgumentException("Secret part message is required");
+ }
+ if (message.getKey() == null) {
+ throw new IllegalArgumentException("Secret key is required");
+ }
+ if (message.getVersion() == null) {
+ throw new IllegalArgumentException("Secret version is required");
+ }
+ if (message.getShard() == null) {
+ throw new IllegalArgumentException("Secret value is required");
+ }
+ }
+}
diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerDeleteTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerDeleteTest.java
index 09fbdd1..815ed39 100644
--- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerDeleteTest.java
+++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerDeleteTest.java
@@ -5,6 +5,7 @@
import edu.yu.capstone.DistributedSecretsVault.service.internal.InternalGetService;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PostPrepareHandler;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PutPrepareHandler;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairPrepareHandler;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,6 +40,9 @@ public class InternalControllerDeleteTest {
@MockitoBean
private DeletePrepareHandler deletePrepareHandler;
+ @MockitoBean
+ private RepairPrepareHandler repairPrepareHandler;
+
@Test
void testPrepareDeleteReturnsNoContent() throws Exception {
doNothing().when(deletePrepareHandler).handle(any(DeletePrepareRequest.class));
diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPostTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPostTest.java
index 0b78040..b02ae86 100644
--- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPostTest.java
+++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPostTest.java
@@ -19,6 +19,7 @@
import edu.yu.capstone.DistributedSecretsVault.service.internal.InternalGetService;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PostPrepareHandler;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PutPrepareHandler;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairPrepareHandler;
@WebMvcTest(InternalController.class)
@Tag("slice")
@@ -38,6 +39,9 @@ public class InternalControllerPostTest {
@MockitoBean
private DeletePrepareHandler deletePrepareHandler;
+ @MockitoBean
+ private RepairPrepareHandler repairPrepareHandler;
+
@Test
void testPreparePostReturnsNoContent() throws Exception {
doNothing().when(postPrepareHandler).handle(any(PostPrepareRequest.class));
diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPutTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPutTest.java
index c627b0e..8551fad 100644
--- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPutTest.java
+++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/controller/InternalControllerPutTest.java
@@ -19,6 +19,7 @@
import edu.yu.capstone.DistributedSecretsVault.service.internal.InternalGetService;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PostPrepareHandler;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PutPrepareHandler;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairPrepareHandler;
@WebMvcTest(InternalController.class)
@Tag("slice")
@@ -38,6 +39,9 @@ public class InternalControllerPutTest {
@MockitoBean
private DeletePrepareHandler deletePrepareHandler;
+ @MockitoBean
+ private RepairPrepareHandler repairPrepareHandler;
+
@Test
void testPreparePutReturnsNoContent() throws Exception {
doNothing().when(putPrepareHandler).handle(any(PutPrepareRequest.class));
diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcherTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcherTest.java
index c1a15d7..02b86d0 100644
--- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcherTest.java
+++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/communication/CommitDispatcherTest.java
@@ -18,11 +18,13 @@
import edu.yu.capstone.DistributedSecretsVault.dto.internal.DeleteCommitRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PostCommitRequest;
import edu.yu.capstone.DistributedSecretsVault.dto.internal.PutCommitRequest;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairCommitRequest;
import edu.yu.capstone.DistributedSecretsVault.exceptions.InternalOperationConflictException;
import edu.yu.capstone.DistributedSecretsVault.service.internal.ActionType;
import edu.yu.capstone.DistributedSecretsVault.service.internal.DeleteCommitHandler;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PostCommitHandler;
import edu.yu.capstone.DistributedSecretsVault.service.internal.PutCommitHandler;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.RepairCommitHandler;
/**
* Unit tests for {@link CommitDispatcher}.
@@ -41,11 +43,14 @@ public class CommitDispatcherTest {
@Mock
private PutCommitHandler putCommitHandler;
+ @Mock
+ private RepairCommitHandler repairCommitHandler;
+
private CommitDispatcher dispatcher;
@BeforeEach
void setUp() {
- dispatcher = new CommitDispatcher(deleteCommitHandler, postCommitHandler, putCommitHandler);
+ dispatcher = new CommitDispatcher(deleteCommitHandler, postCommitHandler, putCommitHandler, repairCommitHandler);
}
// ── Routing ─────────────────────────────────────────────────────────
@@ -86,6 +91,19 @@ void testDispatchesPutToPutHandler() {
verify(postCommitHandler, never()).handle(any());
}
+ @Test
+ void testDispatchesRepairToRepairHandler() {
+ CommitMessage msg = new CommitMessage(
+ UUID.randomUUID(), new SecretKey("user1", "secret1"), ActionType.REPAIR);
+
+ dispatcher.dispatch(msg);
+
+ verify(repairCommitHandler).handle(any(RepairCommitRequest.class));
+ verify(deleteCommitHandler, never()).handle(any());
+ verify(postCommitHandler, never()).handle(any());
+ verify(putCommitHandler, never()).handle(any());
+ }
+
// ── Stale / Conflicting Commits ─────────────────────────────────────
@Test
@@ -119,6 +137,16 @@ void testStalePutCommitIsCaughtAndLogged() {
assertDoesNotThrow(() -> dispatcher.dispatch(msg));
}
+ @Test
+ void testStaleRepairCommitIsCaughtAndLogged() {
+ CommitMessage msg = new CommitMessage(
+ UUID.randomUUID(), new SecretKey("user1", "secret1"), ActionType.REPAIR);
+ doThrow(new InternalOperationConflictException("No staged operation found"))
+ .when(repairCommitHandler).handle(any(RepairCommitRequest.class));
+
+ assertDoesNotThrow(() -> dispatcher.dispatch(msg));
+ }
+
// ── Validation ──────────────────────────────────────────────────────
@Test
diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java
index c97bed8..3fb25f3 100644
--- a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java
+++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalGetServiceTest.java
@@ -26,6 +26,7 @@
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -40,6 +41,9 @@ public class InternalGetServiceTest {
@Mock
private NodeClient nodeClient;
+ @Mock
+ private InternalRepairService internalRepairService;
+
private ClusterConfig clusterConfig;
private InternalGetService internalGetService;
@@ -52,7 +56,7 @@ void setUp() {
clusterConfig.setTotalNodes(3);
clusterConfig.setThresholdK(2);
internalGetService = new InternalGetService(secretPartRepository, secretReconstructionService,
- nodeClient, clusterConfig);
+ nodeClient, clusterConfig, internalRepairService);
lenient().when(nodeClient.resolvePeerUrls()).thenReturn(List.of());
validKey = new SecretKey("user1", "secret1");
}
@@ -73,6 +77,60 @@ void testGetAcrossClusterLatestVersion() {
verify(secretReconstructionService).reconstruct(argThat(parts -> parts.size() == 2));
}
+ @Test
+ void testGetAcrossClusterRepairsWeakLatestRead() {
+ SecretPart part = new SecretPart(validKey, 2L, 1, new byte[] { 1, 2 });
+ SecretPart peerPart = new SecretPart(validKey, 2L, 2, new byte[] { 3, 4 });
+ when(secretPartRepository.findLatest(validKey)).thenReturn(Optional.of(part));
+ when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080", "http://peer2:8080"));
+ when(nodeClient.fetchSecretPart("http://peer1:8080", validKey, null))
+ .thenReturn(SecretPartResponse.found("http://peer1:8080", peerPart));
+ when(nodeClient.fetchSecretPart("http://peer2:8080", validKey, null))
+ .thenReturn(SecretPartResponse.rejected("http://peer2:8080", 404, "missing"));
+ when(secretReconstructionService.reconstruct(anyList())).thenReturn("reconstructed");
+ when(internalRepairService.shouldRepairLatestRead(2, 1)).thenReturn(true);
+
+ String result = internalGetService.getAcrossCluster(validKey, null);
+
+ assertEquals("reconstructed", result);
+ verify(internalRepairService).repairLatestVersion(validKey, 2L, "reconstructed");
+ }
+
+ @Test
+ void testGetAcrossClusterDoesNotTreatDeadPeerAsRepairTarget() {
+ SecretPart part = new SecretPart(validKey, 2L, 1, new byte[] { 1, 2 });
+ SecretPart peerPart = new SecretPart(validKey, 2L, 2, new byte[] { 3, 4 });
+ when(secretPartRepository.findLatest(validKey)).thenReturn(Optional.of(part));
+ when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080", "http://peer2:8080"));
+ when(nodeClient.fetchSecretPart("http://peer1:8080", validKey, null))
+ .thenReturn(SecretPartResponse.found("http://peer1:8080", peerPart));
+ when(nodeClient.fetchSecretPart("http://peer2:8080", validKey, null))
+ .thenReturn(SecretPartResponse.failed("http://peer2:8080", "timeout"));
+ when(secretReconstructionService.reconstruct(anyList())).thenReturn("reconstructed");
+
+ String result = internalGetService.getAcrossCluster(validKey, null);
+
+ assertEquals("reconstructed", result);
+ verify(internalRepairService).shouldRepairLatestRead(2, 0);
+ verify(internalRepairService, never()).repairLatestVersion(validKey, 2L, "reconstructed");
+ }
+
+ @Test
+ void testGetAcrossClusterDoesNotRepairSpecificVersionRead() {
+ SecretPart part = new SecretPart(validKey, 1L, 1, new byte[] { 1 });
+ SecretPart peerPart = new SecretPart(validKey, 1L, 2, new byte[] { 2 });
+ when(secretPartRepository.findPart(validKey, 1L)).thenReturn(Optional.of(part));
+ when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080"));
+ when(nodeClient.fetchSecretPart("http://peer1:8080", validKey, 1L))
+ .thenReturn(SecretPartResponse.found("http://peer1:8080", peerPart));
+ when(secretReconstructionService.reconstruct(anyList())).thenReturn("v1-secret");
+
+ String result = internalGetService.getAcrossCluster(validKey, 1L);
+
+ assertEquals("v1-secret", result);
+ verify(internalRepairService, never()).repairLatestVersion(validKey, 1L, "v1-secret");
+ }
+
@Test
void testGetAcrossClusterSpecificVersion() {
SecretPart part = new SecretPart(validKey, 1L, 1, new byte[] { 1 });
diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java
new file mode 100644
index 0000000..b3145cb
--- /dev/null
+++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/InternalRepairServiceTest.java
@@ -0,0 +1,113 @@
+package edu.yu.capstone.DistributedSecretsVault.service.internal;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import edu.yu.capstone.DistributedSecretsVault.config.ClusterConfig;
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey;
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.CommitMessage;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest;
+import edu.yu.capstone.DistributedSecretsVault.service.communication.CommitPublisher;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.NodeClient.PeerResponse;
+import edu.yu.capstone.DistributedSecretsVault.service.secret.SecretSharingService;
+
+@ExtendWith(MockitoExtension.class)
+@Tag("unit")
+public class InternalRepairServiceTest {
+ @Mock
+ private NodeClient nodeClient;
+
+ @Mock
+ private SecretSharingService secretSharingService;
+
+ @Mock
+ private PendingActionsBuffer pendingActionsBuffer;
+
+ @Mock
+ private CommitPublisher commitPublisher;
+
+ private ClusterConfig clusterConfig;
+ private InternalRepairService service;
+ private SecretKey key;
+
+ @BeforeEach
+ void setUp() {
+ clusterConfig = new ClusterConfig();
+ clusterConfig.setTotalNodes(3);
+ clusterConfig.setThresholdK(2);
+ clusterConfig.setQuorumM(2);
+ clusterConfig.setRepairEnabled(true);
+ clusterConfig.setRepairTriggerBuffer(1);
+ service = new InternalRepairService(nodeClient, secretSharingService, pendingActionsBuffer,
+ commitPublisher, clusterConfig);
+ key = new SecretKey("user1", "secret1");
+ }
+
+ @Test
+ void testShouldRepairAtThresholdOrThresholdPlusBuffer() {
+ assertTrue(service.shouldRepairLatestRead(2, 1));
+ assertTrue(service.shouldRepairLatestRead(3, 1));
+ }
+
+ @Test
+ void testShouldNotRepairWithoutLiveRepairTarget() {
+ assertFalse(service.shouldRepairLatestRead(2, 0));
+ assertFalse(service.shouldRepairLatestRead(3, 0));
+ }
+
+ @Test
+ void testShouldNotRepairWhenDisabledOrBelowThreshold() {
+ assertFalse(service.shouldRepairLatestRead(1, 1));
+ clusterConfig.setRepairEnabled(false);
+ assertFalse(service.shouldRepairLatestRead(2, 1));
+ }
+
+ @Test
+ void testRepairStagesAndPublishesCommitWhenQuorumReached() {
+ when(secretSharingService.split(key, "secret", 2, 3)).thenReturn(parts());
+ when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080", "http://peer2:8080"));
+ when(nodeClient.sendRepairPrepare(anyString(), any(RepairPrepareRequest.class)))
+ .thenAnswer(invocation -> PeerResponse.acknowledged(invocation.getArgument(0)));
+
+ service.repairLatestVersion(key, 4L, "secret");
+
+ verify(pendingActionsBuffer).bufferAction(any(), eq(key), eq(ActionType.REPAIR), any(SecretPart.class));
+ verify(commitPublisher).broadcastCommit(any(CommitMessage.class));
+ }
+
+ @Test
+ void testRepairDiscardsWhenQuorumNotReached() {
+ when(secretSharingService.split(key, "secret", 2, 3)).thenReturn(parts());
+ when(nodeClient.resolvePeerUrls()).thenReturn(List.of("http://peer1:8080", "http://peer2:8080"));
+ when(nodeClient.sendRepairPrepare(anyString(), any(RepairPrepareRequest.class)))
+ .thenAnswer(invocation -> PeerResponse.failed(invocation.getArgument(0), "timeout"));
+
+ service.repairLatestVersion(key, 4L, "secret");
+
+ verify(pendingActionsBuffer).discard(any());
+ verify(commitPublisher, never()).broadcastCommit(any());
+ }
+
+ private List parts() {
+ return List.of(
+ new SecretPart(key, null, 1, new byte[] { 1 }),
+ new SecretPart(key, null, 2, new byte[] { 2 }),
+ new SecretPart(key, null, 3, new byte[] { 3 }));
+ }
+}
diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandlerTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandlerTest.java
new file mode 100644
index 0000000..4aa4a08
--- /dev/null
+++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairCommitHandlerTest.java
@@ -0,0 +1,73 @@
+package edu.yu.capstone.DistributedSecretsVault.service.internal;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.UUID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey;
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairCommitRequest;
+import edu.yu.capstone.DistributedSecretsVault.exceptions.InternalOperationConflictException;
+import edu.yu.capstone.DistributedSecretsVault.repository.SecretPartRepository;
+import edu.yu.capstone.DistributedSecretsVault.service.internal.PendingActionsBuffer.PendingAction;
+
+@ExtendWith(MockitoExtension.class)
+@Tag("unit")
+public class RepairCommitHandlerTest {
+ @Mock
+ private PendingActionsBuffer pendingActionsBuffer;
+
+ @Mock
+ private SecretPartRepository secretPartRepository;
+
+ private RepairCommitHandler handler;
+ private SecretKey key;
+ private SecretPart part;
+
+ @BeforeEach
+ void setUp() {
+ handler = new RepairCommitHandler(pendingActionsBuffer, secretPartRepository);
+ key = new SecretKey("user1", "secret1");
+ part = new SecretPart(key, 2L, 1, new byte[] { 1, 2, 3 });
+ }
+
+ @Test
+ void testHandleSavesBufferedShardWithoutNewVersion() {
+ UUID operationId = UUID.randomUUID();
+ when(pendingActionsBuffer.commitAndRemove(operationId))
+ .thenReturn(new PendingAction(operationId, key, ActionType.REPAIR, part, Instant.now()));
+
+ handler.handle(new RepairCommitRequest(operationId, key));
+
+ verify(secretPartRepository).savePart(part);
+ }
+
+ @Test
+ void testHandleRejectsUnknownOperation() {
+ UUID operationId = UUID.randomUUID();
+ when(pendingActionsBuffer.commitAndRemove(operationId)).thenReturn(null);
+
+ assertThrows(InternalOperationConflictException.class,
+ () -> handler.handle(new RepairCommitRequest(operationId, key)));
+ }
+
+ @Test
+ void testHandleRejectsWrongActionType() {
+ UUID operationId = UUID.randomUUID();
+ when(pendingActionsBuffer.commitAndRemove(operationId))
+ .thenReturn(new PendingAction(operationId, key, ActionType.PUT, part, Instant.now()));
+
+ assertThrows(InternalOperationConflictException.class,
+ () -> handler.handle(new RepairCommitRequest(operationId, key)));
+ }
+}
diff --git a/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandlerTest.java b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandlerTest.java
new file mode 100644
index 0000000..9d0b6af
--- /dev/null
+++ b/src/test/java/edu/yu/capstone/DistributedSecretsVault/service/internal/RepairPrepareHandlerTest.java
@@ -0,0 +1,56 @@
+package edu.yu.capstone.DistributedSecretsVault.service.internal;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+
+import java.util.UUID;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretKey;
+import edu.yu.capstone.DistributedSecretsVault.domain.model.SecretPart;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.RepairPrepareRequest;
+import edu.yu.capstone.DistributedSecretsVault.dto.internal.SecretPartMessage;
+
+@ExtendWith(MockitoExtension.class)
+@Tag("unit")
+public class RepairPrepareHandlerTest {
+ @Mock
+ private PendingActionsBuffer pendingActionsBuffer;
+
+ private RepairPrepareHandler handler;
+ private SecretKey key;
+
+ @BeforeEach
+ void setUp() {
+ handler = new RepairPrepareHandler(pendingActionsBuffer);
+ key = new SecretKey("user1", "secret1");
+ }
+
+ @Test
+ void testHandleBuffersRepairPart() {
+ UUID operationId = UUID.randomUUID();
+ SecretPartMessage message = new SecretPartMessage(key, 2L, new byte[] { 1 }, 1L, 1);
+
+ handler.handle(new RepairPrepareRequest("node-1", operationId, message));
+
+ verify(pendingActionsBuffer).bufferAction(eq(operationId), eq(key), eq(ActionType.REPAIR),
+ any(SecretPart.class));
+ }
+
+ @Test
+ void testHandleRejectsMissingVersion() {
+ UUID operationId = UUID.randomUUID();
+ SecretPartMessage message = new SecretPartMessage(key, null, new byte[] { 1 }, 1L, 1);
+
+ assertThrows(IllegalArgumentException.class,
+ () -> handler.handle(new RepairPrepareRequest("node-1", operationId, message)));
+ }
+}