From 7d9b2e614f00fbd51da98a6f78b4bf8804d87357 Mon Sep 17 00:00:00 2001 From: xiaoyao0120 Date: Wed, 29 Apr 2026 15:07:20 +0800 Subject: [PATCH] fix: resolve output error --- .../consilens/cli/service/DiffService.java | 18 +- .../cli/service/DiffServiceTest.java | 179 ++++++++++++++++++ 2 files changed, 194 insertions(+), 3 deletions(-) create mode 100644 consilens-cli/src/test/java/com/consilens/cli/service/DiffServiceTest.java diff --git a/consilens-cli/src/main/java/com/consilens/cli/service/DiffService.java b/consilens-cli/src/main/java/com/consilens/cli/service/DiffService.java index 8605d24..7e356ba 100644 --- a/consilens-cli/src/main/java/com/consilens/cli/service/DiffService.java +++ b/consilens-cli/src/main/java/com/consilens/cli/service/DiffService.java @@ -63,9 +63,10 @@ public CliDiffResult performDiff(CliConfiguration config) throws Exception { try { lifecycle.onDiffStart(diffContext); - CompareRuntime runtime = new DefaultCompareRuntime(); + CompareRuntime runtime = createCompareRuntime(); DiffResult coreResult = runtime.execute(toCompareRequest(config)); + publishDifferences(coreResult, lifecycle, diffContext); lifecycle.onDiffComplete(coreResult, diffContext); // Convert core result to CLI result @@ -99,7 +100,11 @@ public CliDiffResult performDiff(CliConfiguration config) throws Exception { } } - private DiffLifecycle buildLifecycle(CliConfiguration config) { + protected CompareRuntime createCompareRuntime() { + return new DefaultCompareRuntime(); + } + + protected DiffLifecycle buildLifecycle(CliConfiguration config) { ResultConfig resultConfig = config.getResult(); if (resultConfig == null || resultConfig.getSinks() == null || resultConfig.getSinks().isEmpty()) { return new NoopDiffLifecycle(); @@ -107,7 +112,7 @@ private DiffLifecycle buildLifecycle(CliConfiguration config) { return new DefaultDiffLifecycle(resultConfig); } - private DiffContext buildDiffContext(CliConfiguration config) { + protected DiffContext buildDiffContext(CliConfiguration config) { TablePath sourcePath = null; TablePath targetPath = null; List sourceColumnNames = new ArrayList<>(); @@ -152,6 +157,13 @@ private DiffContext buildDiffContext(CliConfiguration config) { .build(); } + private void publishDifferences(DiffResult coreResult, DiffLifecycle lifecycle, DiffContext diffContext) throws Exception { + if (coreResult == null || coreResult.getDifferences() == null || coreResult.getDifferences().isEmpty()) { + return; + } + lifecycle.onDifferencesFound(coreResult.getDifferences(), diffContext); + } + /** * Convert core DiffResult to CLI DiffResult. */ diff --git a/consilens-cli/src/test/java/com/consilens/cli/service/DiffServiceTest.java b/consilens-cli/src/test/java/com/consilens/cli/service/DiffServiceTest.java new file mode 100644 index 0000000..7f3deb9 --- /dev/null +++ b/consilens-cli/src/test/java/com/consilens/cli/service/DiffServiceTest.java @@ -0,0 +1,179 @@ +package com.consilens.cli.service; + +import com.consilens.cli.model.CliConfiguration; +import com.consilens.cli.model.ComparisonConfig; +import com.consilens.cli.model.ConnectionConfig; +import com.consilens.cli.model.ListPairConfig; +import com.consilens.cli.model.StrategyConfig; +import com.consilens.cli.model.StringPairConfig; +import com.consilens.connector.api.planner.CompareRequest; +import com.consilens.connector.api.model.TablePath; +import com.consilens.core.compare.CompareRuntime; +import com.consilens.core.diff.DiffResult; +import com.consilens.core.diff.DiffRow; +import com.consilens.core.lifecycle.DiffContext; +import com.consilens.core.lifecycle.DiffLifecycle; +import com.consilens.core.lifecycle.SegmentResult; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class DiffServiceTest { + + @Test + void shouldPublishDifferencesBeforeWritingFinalResult() throws Exception { + DiffRow diffRow = DiffRow.modified( + List.of(1), + List.of("alice"), + List.of("bob"), + List.of("name"), + List.of("name"), + List.of("name"), + List.of("name")); + DiffResult coreResult = DiffResult.builder() + .differences(List.of(diffRow)) + .statistics(DiffResult.DiffStatistics.builder() + .sourceRowCount(1) + .targetRowCount(1) + .mismatchCount(1) + .totalDifferences(1) + .build()) + .infoTree(Optional.empty()) + .completedAt(Instant.now()) + .metadata(new HashMap<>()) + .sourceTablePath(TablePath.of("src.users")) + .targetTablePath(TablePath.of("tgt.users")) + .build(); + + RecordingLifecycle lifecycle = new RecordingLifecycle(); + DiffContext context = DiffContext.builder().taskId("task-1").build(); + DiffService service = new TestableDiffService(new StubCompareRuntime(coreResult), lifecycle, context); + + service.performDiff(minimalConfig()); + + assertEquals(List.of("start", "diffs", "complete", "close"), lifecycle.events); + assertEquals(List.of(diffRow), lifecycle.publishedDifferences); + assertEquals(coreResult, lifecycle.completedResult); + } + + private CliConfiguration minimalConfig() { + return CliConfiguration.builder() + .source(ConnectionConfig.builder() + .type("mysql") + .name("source") + .connection(ConnectionConfig.ConnectorConnectionProperties.builder() + .url("jdbc:mysql://localhost:3306/source_db") + .username("user") + .password("pwd") + .build()) + .build()) + .target(ConnectionConfig.builder() + .type("mysql") + .name("target") + .connection(ConnectionConfig.ConnectorConnectionProperties.builder() + .url("jdbc:mysql://localhost:3306/target_db") + .username("user") + .password("pwd") + .build()) + .build()) + .comparison(ComparisonConfig.builder() + .tables(StringPairConfig.builder() + .source("src.users") + .target("tgt.users") + .build()) + .keys(ListPairConfig.builder() + .source(List.of("id")) + .target(List.of("id")) + .build()) + .build()) + .strategy(StrategyConfig.builder() + .mode("checksum") + .algorithm("concat") + .build()) + .build(); + } + + private static final class TestableDiffService extends DiffService { + private final CompareRuntime runtime; + private final DiffLifecycle lifecycle; + private final DiffContext context; + + private TestableDiffService(CompareRuntime runtime, DiffLifecycle lifecycle, DiffContext context) { + this.runtime = runtime; + this.lifecycle = lifecycle; + this.context = context; + } + + @Override + protected CompareRuntime createCompareRuntime() { + return runtime; + } + + @Override + protected DiffLifecycle buildLifecycle(CliConfiguration config) { + return lifecycle; + } + + @Override + protected DiffContext buildDiffContext(CliConfiguration config) { + return context; + } + } + + private static final class StubCompareRuntime implements CompareRuntime { + private final DiffResult result; + + private StubCompareRuntime(DiffResult result) { + this.result = result; + } + + @Override + public DiffResult execute(CompareRequest request) { + return result; + } + } + + private static final class RecordingLifecycle implements DiffLifecycle { + private final List events = new ArrayList<>(); + private final List publishedDifferences = new ArrayList<>(); + private DiffResult completedResult; + + @Override + public void onDiffStart(DiffContext context) { + events.add("start"); + } + + @Override + public void onSegmentComplete(SegmentResult result) { + // no-op + } + + @Override + public void onDifferencesFound(List diffs, DiffContext context) { + events.add("diffs"); + publishedDifferences.addAll(diffs); + } + + @Override + public void onDiffComplete(DiffResult result, DiffContext context) { + events.add("complete"); + completedResult = result; + } + + @Override + public void onDiffError(DiffContext context, Throwable error) { + // no-op + } + + @Override + public void close() { + events.add("close"); + } + } +}