Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ public CliDiffResult performDiff(CliConfiguration config) throws Exception {

try {
lifecycle.onDiffStart(diffContext);
CompareRuntime runtime = new DefaultCompareRuntime();
CompareRuntime runtime = createCompareRuntime();
DiffResult coreResult = runtime.execute(toCompareRequest(config));

publishDifferences(coreResult, lifecycle, diffContext);
lifecycle.onDiffComplete(coreResult, diffContext);

// Convert core result to CLI result
Expand Down Expand Up @@ -99,15 +100,19 @@ public CliDiffResult performDiff(CliConfiguration config) throws Exception {
}
}

private DiffLifecycle buildLifecycle(CliConfiguration config) {
protected CompareRuntime createCompareRuntime() {
return new DefaultCompareRuntime();
}

protected DiffLifecycle buildLifecycle(CliConfiguration config) {
ResultConfig resultConfig = config.getResult();
if (resultConfig == null || resultConfig.getSinks() == null || resultConfig.getSinks().isEmpty()) {
return new NoopDiffLifecycle();
}
return new DefaultDiffLifecycle(resultConfig);
}

private DiffContext buildDiffContext(CliConfiguration config) {
protected DiffContext buildDiffContext(CliConfiguration config) {
TablePath sourcePath = null;
TablePath targetPath = null;
List<String> sourceColumnNames = new ArrayList<>();
Expand Down Expand Up @@ -152,6 +157,13 @@ private DiffContext buildDiffContext(CliConfiguration config) {
.build();
}

private void publishDifferences(DiffResult coreResult, DiffLifecycle lifecycle, DiffContext diffContext) throws Exception {
if (coreResult == null || coreResult.getDifferences() == null || coreResult.getDifferences().isEmpty()) {
return;
}
lifecycle.onDifferencesFound(coreResult.getDifferences(), diffContext);
}

/**
* Convert core DiffResult to CLI DiffResult.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package com.consilens.cli.service;

import com.consilens.cli.model.CliConfiguration;
import com.consilens.cli.model.ComparisonConfig;
import com.consilens.cli.model.ConnectionConfig;
import com.consilens.cli.model.ListPairConfig;
import com.consilens.cli.model.StrategyConfig;
import com.consilens.cli.model.StringPairConfig;
import com.consilens.connector.api.planner.CompareRequest;
import com.consilens.connector.api.model.TablePath;
import com.consilens.core.compare.CompareRuntime;
import com.consilens.core.diff.DiffResult;
import com.consilens.core.diff.DiffRow;
import com.consilens.core.lifecycle.DiffContext;
import com.consilens.core.lifecycle.DiffLifecycle;
import com.consilens.core.lifecycle.SegmentResult;
import org.junit.jupiter.api.Test;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;

class DiffServiceTest {

@Test
void shouldPublishDifferencesBeforeWritingFinalResult() throws Exception {
DiffRow diffRow = DiffRow.modified(
List.of(1),
List.of("alice"),
List.of("bob"),
List.of("name"),
List.of("name"),
List.of("name"),
List.of("name"));
DiffResult coreResult = DiffResult.builder()
.differences(List.of(diffRow))
.statistics(DiffResult.DiffStatistics.builder()
.sourceRowCount(1)
.targetRowCount(1)
.mismatchCount(1)
.totalDifferences(1)
.build())
.infoTree(Optional.empty())
.completedAt(Instant.now())
.metadata(new HashMap<>())
.sourceTablePath(TablePath.of("src.users"))
.targetTablePath(TablePath.of("tgt.users"))
.build();

RecordingLifecycle lifecycle = new RecordingLifecycle();
DiffContext context = DiffContext.builder().taskId("task-1").build();
DiffService service = new TestableDiffService(new StubCompareRuntime(coreResult), lifecycle, context);

service.performDiff(minimalConfig());

assertEquals(List.of("start", "diffs", "complete", "close"), lifecycle.events);
assertEquals(List.of(diffRow), lifecycle.publishedDifferences);
assertEquals(coreResult, lifecycle.completedResult);
}

private CliConfiguration minimalConfig() {
return CliConfiguration.builder()
.source(ConnectionConfig.builder()
.type("mysql")
.name("source")
.connection(ConnectionConfig.ConnectorConnectionProperties.builder()
.url("jdbc:mysql://localhost:3306/source_db")
.username("user")
.password("pwd")
.build())
.build())
.target(ConnectionConfig.builder()
.type("mysql")
.name("target")
.connection(ConnectionConfig.ConnectorConnectionProperties.builder()
.url("jdbc:mysql://localhost:3306/target_db")
.username("user")
.password("pwd")
.build())
.build())
.comparison(ComparisonConfig.builder()
.tables(StringPairConfig.builder()
.source("src.users")
.target("tgt.users")
.build())
.keys(ListPairConfig.builder()
.source(List.of("id"))
.target(List.of("id"))
.build())
.build())
.strategy(StrategyConfig.builder()
.mode("checksum")
.algorithm("concat")
.build())
.build();
}

private static final class TestableDiffService extends DiffService {
private final CompareRuntime runtime;
private final DiffLifecycle lifecycle;
private final DiffContext context;

private TestableDiffService(CompareRuntime runtime, DiffLifecycle lifecycle, DiffContext context) {
this.runtime = runtime;
this.lifecycle = lifecycle;
this.context = context;
}

@Override
protected CompareRuntime createCompareRuntime() {
return runtime;
}

@Override
protected DiffLifecycle buildLifecycle(CliConfiguration config) {
return lifecycle;
}

@Override
protected DiffContext buildDiffContext(CliConfiguration config) {
return context;
}
}

private static final class StubCompareRuntime implements CompareRuntime {
private final DiffResult result;

private StubCompareRuntime(DiffResult result) {
this.result = result;
}

@Override
public DiffResult execute(CompareRequest request) {
return result;
}
}

private static final class RecordingLifecycle implements DiffLifecycle {
private final List<String> events = new ArrayList<>();
private final List<DiffRow> publishedDifferences = new ArrayList<>();
private DiffResult completedResult;

@Override
public void onDiffStart(DiffContext context) {
events.add("start");
}

@Override
public void onSegmentComplete(SegmentResult result) {
// no-op
}

@Override
public void onDifferencesFound(List<DiffRow> diffs, DiffContext context) {
events.add("diffs");
publishedDifferences.addAll(diffs);
}

@Override
public void onDiffComplete(DiffResult result, DiffContext context) {
events.add("complete");
completedResult = result;
}

@Override
public void onDiffError(DiffContext context, Throwable error) {
// no-op
}

@Override
public void close() {
events.add("close");
}
}
}
Loading