-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathErasureRouting.java
More file actions
117 lines (101 loc) · 4.38 KB
/
ErasureRouting.java
File metadata and controls
117 lines (101 loc) · 4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package com.github.koop.common.metadata;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.zip.CRC32;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * Routes a storage key to a partition number and then to the erasure set
 * configured for that partition, driven entirely by live metadata rather than
 * hardcoded boundaries.
 *
 * <p>
 * Partition assignment:
 * <ol>
 * <li>Collect all partition numbers from all spread entries and sort them.</li>
 * <li>Hash the key's UTF-8 bytes with CRC32 and take the result modulo the
 * number of collected partitions to get an index.</li>
 * <li>Look up the partition number at that index in the sorted list.</li>
 * </ol>
 *
 * <p>
 * Sorting ensures that the same key always maps to the same partition number
 * regardless of the order in which partitions appear in the config or how they
 * are redistributed across erasure sets. Changing the total number of
 * partitions changes the modulus and therefore remaps keys.
 *
 * <p>
 * Errors (missing/empty configuration, null keys or entries, unknown
 * partitions or set numbers) are logged and empty Optionals returned rather
 * than throwing, so that a bad config entry does not crash the query
 * processor. Only the constructor throws, when given null configuration.
 */
public final class ErasureRouting {
    private static final Logger logger = LogManager.getLogger(ErasureRouting.class);

    /** Source of the partition-number to erasure-set-number assignments. */
    private final PartitionSpreadConfiguration partitionSpread;

    /** Source of the erasure set definitions, looked up by set number. */
    private final ErasureSetConfiguration erasureSetConfig;

    /**
     * Creates a router over the given configuration objects. Both are held by
     * reference and their getters are re-invoked on every routing call, so the
     * router reflects whatever those getters currently return.
     *
     * @param partitionSpread  partition spread configuration; must not be null
     * @param erasureSetConfig erasure set configuration; must not be null
     * @throws IllegalArgumentException if either argument is null
     */
    public ErasureRouting(PartitionSpreadConfiguration partitionSpread,
            ErasureSetConfiguration erasureSetConfig) {
        if (partitionSpread == null) {
            throw new IllegalArgumentException("partitionSpread is null");
        }
        if (erasureSetConfig == null) {
            throw new IllegalArgumentException("erasureSetConfig is null");
        }
        this.partitionSpread = partitionSpread;
        this.erasureSetConfig = erasureSetConfig;
    }

    /**
     * Returns the partition number that owns {@code key}, or an empty
     * {@link OptionalInt} if routing cannot be resolved.
     *
     * <p>
     * All partition numbers across all spread entries are collected and
     * sorted so that the mapping is stable even if partition assignments are
     * redistributed between erasure sets. Null spread entries, null partition
     * lists and null partition numbers are skipped.
     *
     * <p>
     * NOTE(review): a partition number listed by more than one spread entry
     * appears more than once in the sorted list and therefore receives
     * proportionally more keys. It is intentionally not de-duplicated here,
     * because doing so would remap keys already routed under such a config.
     *
     * @param key the storage key to route; a null key is logged and yields empty
     * @return the owning partition number, or empty if it cannot be determined
     */
    public OptionalInt getPartition(String key) {
        if (key == null) {
            logger.error("getPartition failed: key is null");
            return OptionalInt.empty();
        }
        var spreads = partitionSpread.getPartitionSpread();
        if (spreads == null || spreads.isEmpty()) {
            logger.error("getPartition failed: PartitionSpreadConfiguration has no entries");
            return OptionalInt.empty();
        }
        // Collect and sort all partition numbers for a stable, order-independent
        // mapping. Malformed (null) pieces are skipped instead of throwing NPE.
        List<Integer> allPartitions = spreads.stream()
                .filter(s -> s != null)
                .map(s -> s.getPartitions())
                .filter(ps -> ps != null)
                .flatMap(ps -> ps.stream())
                .filter(p -> p != null)
                .sorted()
                .toList();
        if (allPartitions.isEmpty()) {
            logger.error("getPartition failed: no partitions found in PartitionSpreadConfiguration");
            return OptionalInt.empty();
        }
        // crc32() is in [0, 2^32), so the remainder is non-negative and below
        // size(), which makes the narrowing cast to int safe.
        int index = (int) (crc32(key) % allPartitions.size());
        return OptionalInt.of(allPartitions.get(index));
    }

    /**
     * Returns the ErasureSet responsible for {@code partition}, or an empty
     * {@link Optional} if it cannot be resolved.
     *
     * <p>
     * The first spread entry whose partition list contains {@code partition}
     * determines the erasure set number; that number is then looked up among
     * the configured erasure sets. Null spread entries and null partition
     * lists are skipped.
     *
     * @param partition the partition number to resolve
     * @return the matching erasure set, or empty if none is configured
     */
    public Optional<ErasureSetConfiguration.ErasureSet> getErasureSet(int partition) {
        var spreads = partitionSpread.getPartitionSpread();
        if (spreads == null || spreads.isEmpty()) {
            logger.error("getErasureSet failed: PartitionSpreadConfiguration has no entries");
            return Optional.empty();
        }
        for (var spread : spreads) {
            if (spread == null) {
                continue;
            }
            var partitions = spread.getPartitions();
            if (partitions != null && partitions.contains(partition)) {
                int setNumber = spread.getErasureSet();
                return findErasureSet(setNumber);
            }
        }
        logger.error("getErasureSet failed: no spread entry found for partition {}", partition);
        return Optional.empty();
    }

    /**
     * Looks up the configured erasure set with the given number, logging and
     * returning empty when the set list is missing or has no match.
     */
    private Optional<ErasureSetConfiguration.ErasureSet> findErasureSet(int setNumber) {
        var erasureSets = erasureSetConfig.getErasureSets();
        if (erasureSets == null) {
            logger.error("getErasureSet failed: ErasureSetConfiguration has no erasure sets");
            return Optional.empty();
        }
        // Null elements are filtered out because findFirst() throws NPE on them.
        Optional<ErasureSetConfiguration.ErasureSet> erasureSet = erasureSets.stream()
                .filter(es -> es != null && es.getNumber() == setNumber)
                .findFirst();
        if (erasureSet.isEmpty()) {
            logger.error("getErasureSet failed: no erasure set found for set number {}", setNumber);
        }
        return erasureSet;
    }

    /** Returns the unsigned CRC32 checksum of {@code key}'s UTF-8 bytes, in [0, 2^32). */
    private static long crc32(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}