-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRealStorageNodesIT.java
More file actions
651 lines (517 loc) · 24.7 KB
/
RealStorageNodesIT.java
File metadata and controls
651 lines (517 loc) · 24.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
package koop;
import com.github.koop.common.metadata.ErasureSetConfiguration;
import com.github.koop.common.metadata.ErasureSetConfiguration.ErasureSet;
import com.github.koop.common.metadata.ErasureSetConfiguration.Machine;
import com.github.koop.common.metadata.MemoryFetcher;
import com.github.koop.common.metadata.MetadataClient;
import com.github.koop.common.metadata.PartitionSpreadConfiguration;
import com.github.koop.common.metadata.PartitionSpreadConfiguration.PartitionSpread;
import com.github.koop.common.pubsub.MemoryPubSub;
import com.github.koop.common.pubsub.PubSubClient;
import com.github.koop.queryprocessor.processor.CommitCoordinator;
import com.github.koop.queryprocessor.processor.StorageWorker;
import com.github.koop.storagenode.RocksDbRepairQueue;
import com.github.koop.storagenode.StorageNodeServerV2;
import com.github.koop.storagenode.db.Database;
import com.github.koop.storagenode.db.RocksDbStorageStrategy;
import org.junit.jupiter.api.*;
import java.io.*;
import java.net.*;
import java.nio.file.*;
import java.security.SecureRandom;
import java.time.LocalTime;
import java.util.*;
import static org.junit.jupiter.api.Assertions.*;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class RealStorageNodesIT {
private static final int TOTAL_NODES = 9;
private static final int DATA_SIZE = 2 * 1024 * 1024; // 2MB
private final List<StorageNodeServerV2> servers = new ArrayList<>();
private final List<Path> dataDirs = new ArrayList<>();
private final List<Database> databases = new ArrayList<>();
private List<InetSocketAddress> addrs;
private MemoryFetcher sharedFetcher;
private MemoryPubSub sharedPubSub;
private MetadataClient sharedMetadataClient;
private PubSubClient sharedPubSubClient;
private CommitCoordinator commitCoordinator;
private StorageWorker worker;
private static void log(String msg) {
System.out.printf("[%s] %s%n",
LocalTime.now().withNano(0),
msg);
}
@BeforeEach
void startRealNodes() throws Exception {
log("=== STARTING STORAGE CLUSTER ===");
addrs = new ArrayList<>();
sharedFetcher = new MemoryFetcher();
sharedPubSub = new MemoryPubSub();
sharedPubSubClient = new PubSubClient(sharedPubSub);
sharedPubSubClient.start();
sharedMetadataClient = new MetadataClient(sharedFetcher);
sharedMetadataClient.start();
commitCoordinator = new CommitCoordinator(sharedPubSubClient, 0, 10);
worker = new StorageWorker(sharedMetadataClient, commitCoordinator);
for (int i = 0; i < TOTAL_NODES; i++) {
int port = freePort();
Path dir = Files.createTempDirectory("storagenode-" + i + "-");
log("Starting node " + i + " on port " + port);
RocksDbStorageStrategy strategy = new RocksDbStorageStrategy(dir.resolve("db").toString());
RocksDbRepairQueue repairQueue = new RocksDbRepairQueue(strategy);
Database db = new Database(strategy);
StorageNodeServerV2 server =
new StorageNodeServerV2(port, "127.0.0.1", db, dir.resolve("data"), sharedMetadataClient, sharedPubSubClient, repairQueue);
servers.add(server);
dataDirs.add(dir);
databases.add(db);
log("[NODE " + i + "] server starting");
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", port);
log("[NODE " + i + "] READY");
addrs.add(addr);
}
ErasureSetConfiguration esConfig = new ErasureSetConfiguration();
ErasureSet es = new ErasureSet();
es.setNumber(1);
es.setN(9);
es.setM(6);
es.setWriteQuorum(7);
List<Machine> machines = new ArrayList<>();
for (InetSocketAddress addr : addrs) {
Machine m = new Machine();
m.setIp(addr.getHostString());
m.setPort(addr.getPort());
machines.add(m);
}
es.setMachines(machines);
esConfig.setErasureSets(List.of(es));
PartitionSpreadConfiguration psConfig = new PartitionSpreadConfiguration();
PartitionSpread ps = new PartitionSpread();
ps.setErasureSet(1);
List<Integer> parts = new ArrayList<>();
for(int i = 0; i < 3; i++) parts.add(i);
ps.setPartitions(parts);
psConfig.setPartitionSpread(List.of(ps));
sharedFetcher.update(esConfig);
sharedFetcher.update(psConfig);
servers.forEach(StorageNodeServerV2::start);
log("=== CLUSTER READY ===");
}
@AfterEach
void stopRealNodes() throws Exception {
log("=== STOPPING CLUSTER ===");
if (worker != null) worker.shutdown();
if (sharedMetadataClient != null) sharedMetadataClient.close();
if (sharedPubSubClient != null) sharedPubSubClient.close();
for (int i = 0; i < servers.size(); i++) {
log("Stopping node " + i);
servers.get(i).stop();
}
for (Database db : databases) {
try {
db.close();
} catch (Exception ignored) {}
}
for (Path d : dataDirs)
deleteRecursive(d);
servers.clear();
databases.clear();
dataDirs.clear();
log("=== CLUSTER STOPPED ===");
}
@Test
void put_get_roundTrip_realServers() throws Exception {
log("Generating random test data...");
byte[] data = new byte[DATA_SIZE];
new SecureRandom().nextBytes(data);
UUID req = UUID.randomUUID();
log("[WORKER] PUT starting...");
boolean putOk =
worker.put(req, "b", "realA",
data.length,
new ByteArrayInputStream(data));
log("[WORKER] PUT result = " + putOk);
assertTrue(putOk, "PUT should have successfully reached ACK quorum");
log("[WORKER] GET starting...");
StorageWorker.RetrievedObject obj = worker.get(UUID.randomUUID(), "b", "realA");
assertNotNull(obj);
try (InputStream in = obj.stream()) {
byte[] got = in.readAllBytes();
log("[WORKER] GET received " + got.length + " bytes");
assertArrayEquals(data, got);
}
log("Round-trip test completed successfully.");
}
@Test
void get_tolerates_three_node_failures_realServers() throws Exception {
log("Generating random test data for erasure tolerance test...");
byte[] data = new byte[DATA_SIZE];
new SecureRandom().nextBytes(data);
String key = "erasureB";
UUID putReq = UUID.randomUUID();
log("[WORKER] PUT starting (erasureB)...");
boolean putOk = worker.put(putReq, "b", key, data.length, new ByteArrayInputStream(data));
log("[WORKER] PUT result = " + putOk);
assertTrue(putOk, "PUT should succeed before failures");
log("Simulating 3 node failures (should still reconstruct)...");
stopNodes(0, 1, 2);
log("[WORKER] GET starting with 3 nodes down...");
byte[] got;
StorageWorker.RetrievedObject obj = worker.get(UUID.randomUUID(), "b", key);
assertNotNull(obj);
try (InputStream in = obj.stream()) {
got = in.readAllBytes();
}
log("[WORKER] GET received " + got.length + " bytes with 3 nodes down");
assertArrayEquals(data, got, "Should reconstruct full object with 3 nodes down");
log("Erasure tolerance test (3 failures) completed successfully.");
}
@Test
void get_fails_with_four_node_failures_realServers() throws Exception {
log("Generating random test data for erasure failure test...");
byte[] data = new byte[DATA_SIZE];
new SecureRandom().nextBytes(data);
String key = "erasureC";
UUID putReq = UUID.randomUUID();
log("[WORKER] PUT starting (erasureC)...");
boolean putOk = worker.put(putReq, "b", key, data.length, new ByteArrayInputStream(data));
log("[WORKER] PUT result = " + putOk);
assertTrue(putOk, "PUT should succeed before failures");
log("Simulating 4 node failures (should NOT reconstruct)...");
stopNodes(0, 1, 2, 3);
log("[WORKER] GET starting with 4 nodes down (expect failure)...");
try {
StorageWorker.RetrievedObject obj = worker.get(UUID.randomUUID(), "b", key);
if (obj != null) {
try (InputStream in = obj.stream()) {
byte[] got = in.readAllBytes();
log("[WORKER] GET returned " + got.length + " bytes with 4 nodes down");
if (Arrays.equals(data, got)) {
fail("Expected reconstruction to fail with 4 nodes down, but got full correct data");
}
assertNotEquals(data.length, got.length,
"Expected missing/corrupt data length with 4 nodes down");
}
} else {
log("[WORKER] GET returned null with 4 nodes down (as expected)");
}
} catch (Exception e) {
log("[WORKER] GET failed with exception as expected: " + e);
}
log("Erasure failure test (4 failures) completed.");
}
@Test
void get_after_delete_returnsNull_realServers() throws Exception {
log("Generating random test data for tombstone test...");
byte[] data = new byte[DATA_SIZE];
new SecureRandom().nextBytes(data);
String key = "deleteMe";
UUID putReq = UUID.randomUUID();
log("[WORKER] PUT starting (deleteMe)...");
boolean putOk = worker.put(putReq, "b", key, data.length, new ByteArrayInputStream(data));
assertTrue(putOk, "PUT should succeed");
log("[WORKER] GET starting (before delete)...");
StorageWorker.RetrievedObject beforeDelete = worker.get(UUID.randomUUID(), "b", key);
assertNotNull(beforeDelete);
try (InputStream in = beforeDelete.stream()) {
byte[] got = in.readAllBytes();
assertArrayEquals(data, got, "GET should return correct data before delete");
}
var v1Version = databases.get(0).getLatestFileVersion("b/" + key);
assertTrue(v1Version.isPresent(), "v1 version should exist in DB");
long v1Seq = v1Version.get().sequenceNumber();
var metadata = databases.get(0).getItem("b/" + key);
assertTrue(metadata.isPresent(), "metadata should exist in DB");
int partition = metadata.get().partition();
long tombstoneSeq = v1Seq + 1;
log("Writing tombstone (seq=" + tombstoneSeq + ") to all " + databases.size() + " node DBs...");
for (Database db : databases) {
db.deleteItem("b/" + key, partition, tombstoneSeq);
}
log("[WORKER] GET starting (after delete, expect null)...");
StorageWorker.RetrievedObject afterDelete = worker.get(UUID.randomUUID(), "b", key);
assertNull(afterDelete, "GET after delete should return null");
log("Tombstone/delete test completed successfully.");
}
@Test
void get_multipart_roundTrip_realServers() throws Exception {
log("Generating chunk data for multipart test...");
int chunkSize = DATA_SIZE / 2;
byte[] chunk1Data = new byte[chunkSize];
byte[] chunk2Data = new byte[chunkSize];
new SecureRandom().nextBytes(chunk1Data);
new SecureRandom().nextBytes(chunk2Data);
UUID chunk1Req = UUID.randomUUID();
log("[WORKER] PUT chunk1 starting...");
boolean put1Ok = worker.put(chunk1Req, "b", "part1", chunk1Data.length, new ByteArrayInputStream(chunk1Data));
assertTrue(put1Ok, "PUT chunk1 should succeed");
UUID chunk2Req = UUID.randomUUID();
log("[WORKER] PUT chunk2 starting...");
boolean put2Ok = worker.put(chunk2Req, "b", "part2", chunk2Data.length, new ByteArrayInputStream(chunk2Data));
assertTrue(put2Ok, "PUT chunk2 should succeed");
String uploadId = UUID.randomUUID().toString();
log("[WORKER] beginMultipartCommit starting...");
long totalSize = chunk1Data.length + chunk2Data.length;
boolean commitOk = worker.beginMultipartCommit("b", "multipartFile", uploadId, List.of("b/part1", "b/part2"), totalSize);
assertTrue(commitOk, "Multipart commit should succeed");
log("[WORKER] GET multipart starting...");
StorageWorker.RetrievedObject obj = worker.get(UUID.randomUUID(), "b", "multipartFile");
assertNotNull(obj);
try (InputStream in = obj.stream()) {
byte[] got = in.readAllBytes();
byte[] expected = new byte[chunk1Data.length + chunk2Data.length];
System.arraycopy(chunk1Data, 0, expected, 0, chunk1Data.length);
System.arraycopy(chunk2Data, 0, expected, chunk1Data.length, chunk2Data.length);
log("[WORKER] GET multipart received " + got.length + " bytes (expected " + expected.length + ")");
assertArrayEquals(expected, got, "Multipart GET should return concatenated chunk data");
}
log("Multipart roundtrip test completed successfully.");
}
@Test
void get_blob_version_fallback_realServers() throws Exception {
log("Generating random test data for version fallback test...");
byte[] v1Data = new byte[DATA_SIZE];
new SecureRandom().nextBytes(v1Data);
String key = "fallbackKey";
UUID v1Req = UUID.randomUUID();
log("[WORKER] PUT v1 starting...");
boolean putOk = worker.put(v1Req, "b", key, v1Data.length, new ByteArrayInputStream(v1Data));
assertTrue(putOk, "PUT v1 should succeed");
log("[WORKER] GET v1 starting (sanity check)...");
StorageWorker.RetrievedObject obj1 = worker.get(UUID.randomUUID(), "b", key);
assertNotNull(obj1);
try (InputStream in = obj1.stream()) {
assertArrayEquals(v1Data, in.readAllBytes(), "v1 should be readable");
}
var v1Version = databases.get(0).getLatestFileVersion("b/" + key);
assertTrue(v1Version.isPresent(), "v1 version should exist in DB");
long v1Seq = v1Version.get().sequenceNumber();
var metadata = databases.get(0).getItem("b/" + key);
assertTrue(metadata.isPresent(), "metadata should exist in DB");
int partition = metadata.get().partition();
String fakeV2RequestId = UUID.randomUUID().toString();
long v2Seq = v1Seq + 1;
log("Writing partial v2 (seq=" + v2Seq + ") to 3 nodes...");
for (int i = 0; i < 3; i++) {
Path blobDir = dataDirs.get(i).resolve("data").resolve("blobs");
String prefix = fakeV2RequestId.length() >= 3 ? fakeV2RequestId.substring(0, 3) : "000";
Path blobPath = blobDir.resolve(prefix).resolve(fakeV2RequestId);
Files.createDirectories(blobPath.getParent());
Files.write(blobPath, new byte[]{0, 1, 2, 3});
databases.get(i).putItem("b/" + key, partition, v2Seq, fakeV2RequestId, v1Data.length);
}
log("[WORKER] GET starting (expect v2 fallback to v1)...");
StorageWorker.RetrievedObject obj2 = worker.get(UUID.randomUUID(), "b", key);
assertNotNull(obj2);
try (InputStream in = obj2.stream()) {
byte[] got = in.readAllBytes();
log("[WORKER] GET received " + got.length + " bytes after version fallback");
assertArrayEquals(v1Data, got, "Should reconstruct v1 data after v2 has insufficient shards");
}
log("Version fallback test completed successfully.");
}
@Test
void createBucket_realServers() throws Exception {
log("Testing createBucket with real storage nodes...");
UUID req = UUID.randomUUID();
boolean ok = worker.createBucket(req, "integration-bucket");
log("[WORKER] createBucket result = " + ok);
assertTrue(ok, "createBucket should succeed with real SNs");
byte[] data = new byte[1024];
new SecureRandom().nextBytes(data);
boolean putOk = worker.put(UUID.randomUUID(), "integration-bucket", "probe",
data.length, new ByteArrayInputStream(data));
assertTrue(putOk, "PUT into newly created bucket should succeed");
log("createBucket test completed successfully.");
}
@Test
void deleteBucket_realServers() throws Exception {
log("Testing deleteBucket with real storage nodes...");
assertTrue(worker.createBucket(UUID.randomUUID(), "doomed-bucket"),
"createBucket should succeed");
byte[] data = new byte[1024];
new SecureRandom().nextBytes(data);
assertTrue(worker.put(UUID.randomUUID(), "doomed-bucket", "obj",
data.length, new ByteArrayInputStream(data)),
"PUT should succeed before bucket deletion");
boolean deleteOk = worker.deleteBucket(UUID.randomUUID(), "doomed-bucket");
log("[WORKER] deleteBucket result = " + deleteOk);
assertTrue(deleteOk, "deleteBucket should succeed with real SNs");
log("deleteBucket test completed successfully.");
}
@Test
void bucketLifecycle_realServers() throws Exception {
log("Testing full bucket lifecycle with real storage nodes...");
String bucket = "lifecycle-bucket";
assertTrue(worker.createBucket(UUID.randomUUID(), bucket),
"createBucket should succeed");
log("Bucket created.");
byte[] data = new byte[DATA_SIZE];
new SecureRandom().nextBytes(data);
assertTrue(worker.put(UUID.randomUUID(), bucket, "life-obj",
data.length, new ByteArrayInputStream(data)),
"PUT should succeed");
log("Object PUT succeeded.");
StorageWorker.RetrievedObject obj1 = worker.get(UUID.randomUUID(), bucket, "life-obj");
assertNotNull(obj1);
try (InputStream in = obj1.stream()) {
assertArrayEquals(data, in.readAllBytes(), "GET should return original data");
}
log("GET round-trip verified.");
assertTrue(worker.delete(UUID.randomUUID(), bucket, "life-obj"),
"delete should succeed");
log("Object deleted.");
try {
StorageWorker.RetrievedObject obj2 = worker.get(UUID.randomUUID(), bucket, "life-obj");
if (obj2 != null) {
try (InputStream in = obj2.stream()) {
byte[] got = in.readAllBytes();
assertNotEquals(data.length, got.length,
"Data should not be fully retrievable after delete");
}
} else {
log("GET returned null as expected after delete.");
}
} catch (Exception e) {
log("[WORKER] GET after delete threw as expected: " + e.getMessage());
}
log("Post-delete GET correctly failed.");
assertTrue(worker.deleteBucket(UUID.randomUUID(), bucket),
"deleteBucket should succeed");
log("Bucket deleted.");
log("Full bucket lifecycle test completed successfully.");
}
@Test
void delete_tombstones_object_realServers() throws Exception {
log("Testing pub/sub delete with real storage nodes...");
byte[] data = new byte[DATA_SIZE];
new SecureRandom().nextBytes(data);
assertTrue(worker.put(UUID.randomUUID(), "b", "to-delete",
data.length, new ByteArrayInputStream(data)),
"PUT should succeed");
log("Object PUT succeeded.");
StorageWorker.RetrievedObject obj1 = worker.get(UUID.randomUUID(), "b", "to-delete");
assertNotNull(obj1);
try (InputStream in = obj1.stream()) {
assertArrayEquals(data, in.readAllBytes(), "Object should be readable before delete");
}
boolean deleteOk = worker.delete(UUID.randomUUID(), "b", "to-delete");
log("[WORKER] delete result = " + deleteOk);
assertTrue(deleteOk, "delete should succeed with real SNs");
try {
StorageWorker.RetrievedObject obj2 = worker.get(UUID.randomUUID(), "b", "to-delete");
if (obj2 != null) {
try (InputStream in = obj2.stream()) {
byte[] got = in.readAllBytes();
assertNotEquals(data.length, got.length,
"Object should not be fully retrievable after tombstone");
}
}
} catch (Exception e) {
log("[WORKER] GET after delete threw as expected: " + e.getMessage());
}
log("Delete tombstone test completed successfully.");
}
@Test
void goodSlowLorisTest() throws Exception {
// 800 KB of random data; 8 KB chunks × 400 ms sleep → ~100 sleeps per direction → ~40 s each.
// Verifies that neither the server nor the client times out during a slow transfer.
final int CHUNK = 8 * 1024;
final long DELAY_MS = 400L;
final int TOTAL = 100 * CHUNK;
byte[] data = new byte[TOTAL];
new SecureRandom().nextBytes(data);
log("Starting slow PUT (~40 s)...");
long putStart = System.currentTimeMillis();
boolean putOk = worker.put(
UUID.randomUUID(), "b", "slow-loris",
data.length,
new SlowInputStream(new ByteArrayInputStream(data), CHUNK, DELAY_MS));
log("Slow PUT finished in " + (System.currentTimeMillis() - putStart) + " ms");
assertTrue(putOk, "Slow PUT should succeed without timing out");
log("Starting slow GET (~40 s)...");
long getStart = System.currentTimeMillis();
StorageWorker.RetrievedObject obj = worker.get(UUID.randomUUID(), "b", "slow-loris");
assertNotNull(obj, "GET should return a non-null object");
byte[] got;
try (InputStream in = obj.stream()) {
got = readSlowly(in, CHUNK, DELAY_MS);
}
log("Slow GET finished in " + (System.currentTimeMillis() - getStart) + " ms");
assertArrayEquals(data, got, "Data should round-trip correctly across slow transfers");
log("Slow-loris test completed successfully.");
}
private static byte[] readSlowly(InputStream in, int chunkSize, long delayMs) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
byte[] buf = new byte[chunkSize];
int n;
while ((n = in.read(buf)) != -1) {
out.write(buf, 0, n);
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted during slow read", e);
}
}
return out.toByteArray();
}
private static class SlowInputStream extends InputStream {
private final InputStream delegate;
private final int maxChunk;
private final long delayMs;
SlowInputStream(InputStream delegate, int maxChunk, long delayMs) {
this.delegate = delegate;
this.maxChunk = maxChunk;
this.delayMs = delayMs;
}
@Override
public int read() throws IOException {
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted during slow read", e);
}
return delegate.read();
}
@Override
public int read(byte[] buf, int off, int len) throws IOException {
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted during slow read", e);
}
return delegate.read(buf, off, Math.min(len, maxChunk));
}
@Override
public void close() throws IOException {
delegate.close();
}
}
private void stopNodes(int... idxs) {
for (int idx : idxs) {
if (idx < 0 || idx >= servers.size()) {
throw new IllegalArgumentException("Bad node index: " + idx);
}
log("Stopping node " + idx + " to simulate failure");
servers.get(idx).stop();
}
}
private static int freePort() throws IOException {
try (ServerSocket ss = new ServerSocket(0)) {
ss.setReuseAddress(true);
return ss.getLocalPort();
}
}
private static void deleteRecursive(Path root) throws IOException {
if (!Files.exists(root)) return;
Files.walk(root)
.sorted(Comparator.reverseOrder())
.forEach(p -> {
try { Files.deleteIfExists(p); }
catch (IOException ignored) {}
});
}
}