Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -559,23 +559,25 @@ private CompletableFuture<Void> performRowHashBasedComparison(

return CompletableFuture.supplyAsync(() -> {
log.debug("Using row-hash based comparison for segment: {}", segmentId);
Map<List<Object>, String> hashes1 = table1.getRowHashes();
Map<List<Object>, String> hashes2 = table2.getRowHashes();
Map<String, DataType> columnTypes1 = extractColumnTypes(table1);
Map<String, DataType> columnTypes2 = extractColumnTypes(table2);
RowHashSide hashes1 = indexRowHashes(table1, table1.getRowHashes(), columnTypes1);
RowHashSide hashes2 = indexRowHashes(table2, table2.getRowHashes(), columnTypes2);
return new RowHashSnapshot(hashes1, hashes2);
}, executorProvider.getIoExecutor())
.thenApplyAsync(snapshot -> {
Set<List<Object>> keysOnlyInTable1 = new HashSet<>(snapshot.hashes1.keySet());
keysOnlyInTable1.removeAll(snapshot.hashes2.keySet());
Set<List<Object>> keysOnlyInTable1 = new HashSet<>(snapshot.table1.hashesByKey.keySet());
keysOnlyInTable1.removeAll(snapshot.table2.hashesByKey.keySet());

Set<List<Object>> keysOnlyInTable2 = new HashSet<>(snapshot.hashes2.keySet());
keysOnlyInTable2.removeAll(snapshot.hashes1.keySet());
Set<List<Object>> keysOnlyInTable2 = new HashSet<>(snapshot.table2.hashesByKey.keySet());
keysOnlyInTable2.removeAll(snapshot.table1.hashesByKey.keySet());

Set<List<Object>> mismatchedKeys = new HashSet<>();
for (Map.Entry<List<Object>, String> entry : snapshot.hashes1.entrySet()) {
for (Map.Entry<List<Object>, String> entry : snapshot.table1.hashesByKey.entrySet()) {
List<Object> key = entry.getKey();
if (snapshot.hashes2.containsKey(key)) {
if (snapshot.table2.hashesByKey.containsKey(key)) {
String hash1 = entry.getValue();
String hash2 = snapshot.hashes2.get(key);
String hash2 = snapshot.table2.hashesByKey.get(key);
if (!hash1.equals(hash2)) {
mismatchedKeys.add(key);
}
Expand All @@ -591,8 +593,8 @@ private CompletableFuture<Void> performRowHashBasedComparison(
int debugCount = 0;
for (List<Object> key : mismatchedKeys) {
if (debugCount++ < 5) {
String hash1 = snapshot.hashes1.get(key);
String hash2 = snapshot.hashes2.get(key);
String hash1 = snapshot.table1.hashesByKey.get(key);
String hash2 = snapshot.table2.hashesByKey.get(key);
log.debug("Hash mismatch for key {}: source_hash={}, target_hash={}",
key, hash1, hash2);
} else {
Expand All @@ -604,23 +606,32 @@ private CompletableFuture<Void> performRowHashBasedComparison(
return new RowChecksumDiffPlan(snapshot, keysOnlyInTable1, keysOnlyInTable2, mismatchedKeys);
}, executorProvider.getCpuExecutor())
.thenApplyAsync(plan -> {
Map<String, DataType> columnTypes1 = extractColumnTypes(table1);
Map<String, DataType> columnTypes2 = extractColumnTypes(table2);
if (plan.totalDifferences() == 0) {
return new RowChecksumDiffData(plan, Collections.emptyMap(), Collections.emptyMap());
return new RowChecksumDiffData(
plan,
Collections.emptyMap(),
Collections.emptyMap(),
columnTypes1,
columnTypes2);
}
Map<List<Object>, Object[]> data1 = queryRowsByKeys(table1,
combineKeys(plan.keysOnlyInTable1, plan.mismatchedKeys));
rawKeysFor(plan.snapshot.table1, plan.keysOnlyInTable1, plan.mismatchedKeys),
columnTypes1);
Map<List<Object>, Object[]> data2 = queryRowsByKeys(table2,
combineKeys(plan.keysOnlyInTable2, plan.mismatchedKeys));
return new RowChecksumDiffData(plan, data1, data2);
rawKeysFor(plan.snapshot.table2, plan.keysOnlyInTable2, plan.mismatchedKeys),
columnTypes2);
return new RowChecksumDiffData(plan, data1, data2, columnTypes1, columnTypes2);
}, executorProvider.getIoExecutor())
.thenAcceptAsync(diffData -> {
try {
RowChecksumDiffPlan plan = diffData.plan;
List<DiffRow> differences = new ArrayList<>();

if (plan.totalDifferences() > 0) {
Map<String, DataType> columnTypes1 = extractColumnTypes(table1);
Map<String, DataType> columnTypes2 = extractColumnTypes(table2);
Map<String, DataType> columnTypes1 = diffData.columnTypes1;
Map<String, DataType> columnTypes2 = diffData.columnTypes2;

for (List<Object> key : plan.keysOnlyInTable1) {
Object[] row = diffData.data1.get(key);
Expand Down Expand Up @@ -652,14 +663,17 @@ private CompletableFuture<Void> performRowHashBasedComparison(
}

performanceMonitor.recordLocalComparison(segmentId,
plan.snapshot.hashes1.size(), plan.snapshot.hashes2.size(), differences.size());
plan.snapshot.table1.hashesByKey.size(),
plan.snapshot.table2.hashesByKey.size(),
differences.size());

storeDifferences(infoTreeRecorder, differences, segmentId);
infoTreeRecorder.addRowsFetched(segmentId,
plan.snapshot.hashes1.size() + plan.snapshot.hashes2.size());
plan.snapshot.table1.hashesByKey.size() + plan.snapshot.table2.hashesByKey.size());

progressReporter.segmentCompleted(segmentId,
plan.snapshot.hashes1.size() + plan.snapshot.hashes2.size(), differences.size());
plan.snapshot.table1.hashesByKey.size() + plan.snapshot.table2.hashesByKey.size(),
differences.size());

log.info("Row-hash based comparison completed: {} differences for segment: {}",
differences.size(), segmentId);
Expand Down Expand Up @@ -716,7 +730,9 @@ private CompletableFuture<Void> performFullDataComparison(
/**
* Query rows by primary keys and return as a map.
*/
private Map<List<Object>, Object[]> queryRowsByKeys(TableSegment segment, Set<List<Object>> keys) {
private Map<List<Object>, Object[]> queryRowsByKeys(TableSegment segment,
Set<List<Object>> keys,
Map<String, DataType> columnTypes) {
if (keys.isEmpty()) {
return Collections.emptyMap();
}
Expand All @@ -727,22 +743,74 @@ private Map<List<Object>, Object[]> queryRowsByKeys(TableSegment segment, Set<Li
int keyColumnCount = segment.getKeyColumns().size();

for (Object[] row : rows) {
// Extract primary key from row
List<Object> primaryKey = new ArrayList<>(keyColumnCount);
primaryKey.addAll(Arrays.asList(row).subList(0, keyColumnCount));
List<Object> primaryKey = normalizeKeyValues(
Arrays.asList(row).subList(0, Math.min(keyColumnCount, row.length)),
segment.getKeyColumns(),
columnTypes);
rowMap.put(primaryKey, row);
}

return rowMap;
}

/**
 * Builds the normalized-key index for one side of a row-hash comparison.
 *
 * <p>Every key of {@code rowHashes} is normalized via {@code normalizeKeyValues} using the
 * segment's key columns and the supplied column types. Two insertion-ordered maps are
 * produced from the normalized key: one to the row hash, and one back to the raw key it was
 * derived from. If two raw keys normalize to the same value, the entry visited later
 * replaces the earlier one in both maps.
 *
 * @param segment     segment whose key columns drive the normalization
 * @param rowHashes   row hashes keyed by the raw key values
 * @param columnTypes data type per column name
 * @return the pair of maps wrapped in a {@code RowHashSide}
 */
private RowHashSide indexRowHashes(TableSegment segment,
                                   Map<List<Object>, String> rowHashes,
                                   Map<String, DataType> columnTypes) {
    Map<List<Object>, String> hashByLookupKey = new LinkedHashMap<>();
    Map<List<Object>, List<Object>> rawByLookupKey = new LinkedHashMap<>();

    rowHashes.forEach((rawKey, rowHash) -> {
        List<Object> lookupKey = normalizeKeyValues(rawKey, segment.getKeyColumns(), columnTypes);
        hashByLookupKey.put(lookupKey, rowHash);
        rawByLookupKey.put(lookupKey, rawKey);
    });

    return new RowHashSide(hashByLookupKey, rawByLookupKey);
}

/**
 * Translates normalized keys back into the raw keys recorded for one side.
 *
 * <p>The key sets are walked in argument order and each set in its own iteration order.
 * A normalized key with no raw counterpart in {@code side.rawKeysByKey} is skipped.
 *
 * @param side              index holding the normalized-to-raw key mapping
 * @param normalizedKeySets one or more sets of normalized keys to translate
 * @return the raw keys found, de-duplicated, in first-encounter order
 */
@SafeVarargs
private final Set<List<Object>> rawKeysFor(RowHashSide side, Set<List<Object>>... normalizedKeySets) {
    Set<List<Object>> resolved = new LinkedHashSet<>();
    Arrays.stream(normalizedKeySets)
            .flatMap(Set::stream)
            .map(lookupKey -> side.rawKeysByKey.get(lookupKey))
            .filter(original -> original != null)
            .forEachOrdered(resolved::add);
    return resolved;
}

/**
 * Normalizes the components of a key so that keys coming from different sources can be
 * compared and used as map keys.
 *
 * <p>The value at position {@code i} is passed to {@code ValueNormalizer.normalizeValue}
 * together with the data type registered for the key column at the same position.
 * Positions beyond the end of {@code keyColumns}, and columns absent from
 * {@code columnTypes}, are normalized as {@code DataType.UNKNOWN}.
 *
 * @param keyValues   raw key components, positionally aligned with {@code keyColumns}
 * @param keyColumns  names of the key columns
 * @param columnTypes data type per column name
 * @return an unmodifiable list of the normalized components in the same order; a component
 *         for which the normalizer yields {@code null} is kept as {@code null}
 */
private List<Object> normalizeKeyValues(List<?> keyValues,
                                        List<String> keyColumns,
                                        Map<String, DataType> columnTypes) {
    List<Object> normalized = new ArrayList<>(keyValues.size());
    for (int i = 0; i < keyValues.size(); i++) {
        String columnName = i < keyColumns.size() ? keyColumns.get(i) : null;
        DataType dataType = columnName != null
                ? columnTypes.getOrDefault(columnName, DataType.UNKNOWN)
                : DataType.UNKNOWN;
        normalized.add(ValueNormalizer.normalizeValue(keyValues.get(i), dataType));
    }
    // List.copyOf throws NullPointerException on null elements, which would abort the whole
    // comparison for a key containing a NULL component. The unmodifiable view keeps the
    // result read-only while tolerating nulls; "normalized" never escapes this method, so
    // the view cannot be mutated through the backing list.
    return Collections.unmodifiableList(normalized);
}

private static class RowHashSnapshot {
private final Map<List<Object>, String> hashes1;
private final Map<List<Object>, String> hashes2;
private final RowHashSide table1;
private final RowHashSide table2;

private RowHashSnapshot(RowHashSide table1, RowHashSide table2) {
this.table1 = table1;
this.table2 = table2;
}
}

private static class RowHashSide {
private final Map<List<Object>, String> hashesByKey;
private final Map<List<Object>, List<Object>> rawKeysByKey;

private RowHashSnapshot(Map<List<Object>, String> hashes1, Map<List<Object>, String> hashes2) {
this.hashes1 = hashes1;
this.hashes2 = hashes2;
private RowHashSide(Map<List<Object>, String> hashesByKey, Map<List<Object>, List<Object>> rawKeysByKey) {
this.hashesByKey = hashesByKey;
this.rawKeysByKey = rawKeysByKey;
}
}

Expand All @@ -769,12 +837,18 @@ private static class RowChecksumDiffData {
private final RowChecksumDiffPlan plan;
private final Map<List<Object>, Object[]> data1;
private final Map<List<Object>, Object[]> data2;
private final Map<String, DataType> columnTypes1;
private final Map<String, DataType> columnTypes2;

private RowChecksumDiffData(RowChecksumDiffPlan plan, Map<List<Object>, Object[]> data1,
Map<List<Object>, Object[]> data2) {
Map<List<Object>, Object[]> data2,
Map<String, DataType> columnTypes1,
Map<String, DataType> columnTypes2) {
this.plan = plan;
this.data1 = data1;
this.data2 = data2;
this.columnTypes1 = columnTypes1;
this.columnTypes2 = columnTypes2;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.consilens.core.algorithm;

import com.consilens.common.enums.ChecksumAlgorithm;
import com.consilens.common.enums.LocalCompareMode;
import com.consilens.core.database.adpter.DatabaseAdapter;
import com.consilens.core.database.adpter.DatabaseAdapter.RowMapper;
import com.consilens.core.database.connection.ConnectionPool;
import com.consilens.core.diff.DiffResult;
import com.consilens.core.diff.DiffResult.InfoTreeNode;
import com.consilens.core.thread.ConcurrencyConfig;
import com.consilens.core.thread.ExecutorProvider;
import com.consilens.connector.api.model.TablePath;
import com.consilens.connector.api.model.PoolConfiguration;
Expand Down Expand Up @@ -180,6 +182,46 @@ void testLargeTableChunkedProcessing() throws Exception {
verify(mockAdapter1, atLeast(2)).countAndChecksum(any(TableSegment.class));
verify(mockAdapter2, atLeast(2)).countAndChecksum(any(TableSegment.class));
}

/*
 * Regression test for row-hash comparison when the key representation differs between the
 * row-hash query and the row fetch. The display name below reads, in English: "row-hash mode
 * can still fetch the differing rows after keys have been normalized".
 */
@Test
@DisplayName("测试 row-hash 模式能够在键被标准化后仍然取回差异行")
void testRowHashComparisonShouldResolveRowsUsingNormalizedKeys() throws Exception {
// Replace the fixture's differ with one configured for CONCAT checksums and ROW_HASH local comparison.
differ.close();
differ = new ChecksumDiffer(new TableDiffer.DifferConfig(
4,
1000,
false,
ChecksumAlgorithm.CONCAT,
LocalCompareMode.ROW_HASH,
ConcurrencyConfig.defaultConfig()));

// Both adapters report one row with bounds 0..1; the segment checksums differ between the two sides.
// These stubs are lenient, so the test does not fail if one of them goes unused.
lenient().when(mockAdapter1.countAndBounds(any(TableSegment.class)))
.thenReturn(new ChecksumResult(1, null, Arrays.asList(0L), Arrays.asList(1L)));
lenient().when(mockAdapter2.countAndBounds(any(TableSegment.class)))
.thenReturn(new ChecksumResult(1, null, Arrays.asList(0L), Arrays.asList(1L)));
lenient().when(mockAdapter1.countAndChecksum(any(TableSegment.class)))
.thenReturn(new ChecksumResult(1, "source-checksum", Arrays.asList(0L), Arrays.asList(1L)));
lenient().when(mockAdapter2.countAndChecksum(any(TableSegment.class)))
.thenReturn(new ChecksumResult(1, "target-checksum", Arrays.asList(0L), Arrays.asList(1L)));

// Row hashes are keyed by the Integer 1 on both sides, with different hash values.
when(mockAdapter1.querySegmentRowHashes(any(TableSegment.class)))
.thenReturn(Map.of(List.of(1), "source-row-hash"));
when(mockAdapter2.querySegmentRowHashes(any(TableSegment.class)))
.thenReturn(Map.of(List.of(1), "target-row-hash"));

// The fetched rows carry the key as the String "1" and differ only in their third element.
when(mockAdapter1.querySegmentByKeys(any(TableSegment.class), anySet()))
.thenReturn(Collections.singletonList(
new Object[] {"1", "user_00001", "1380000002"}));
when(mockAdapter2.querySegmentByKeys(any(TableSegment.class), anySet()))
.thenReturn(Collections.singletonList(
new Object[] {"1", "user_00001", "1380000001"}));

DiffResult result = differ.diffTables(segment1, segment2).get();

// Exactly one difference is expected, and its changed columns must be ["value"].
// NOTE(review): the column name "value" presumably comes from the segment fixtures, which are not shown here.
assertNotNull(result);
assertEquals(1, result.getDifferences().size());
assertEquals(List.of("value"), result.getDifferences().get(0).getChangedColumns1());
}
}

@Nested
Expand Down
Loading
Loading