diff --git a/apps/optimizer-analyzer/build.gradle b/apps/optimizer-analyzer/build.gradle
new file mode 100644
index 000000000..f66ecc608
--- /dev/null
+++ b/apps/optimizer-analyzer/build.gradle
@@ -0,0 +1,20 @@
+plugins {
+  id 'openhouse.springboot-ext-conventions'
+  id 'org.springframework.boot' version '2.7.8'
+}
+
+dependencies {
+  implementation project(':services:optimizer')
+  implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
+  implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
+  implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
+  implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8'
+  runtimeOnly 'mysql:mysql-connector-java:8.0.33'
+  testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
+  testImplementation 'com.squareup.okhttp3:mockwebserver:4.10.0'
+  testRuntimeOnly 'com.h2database:h2'
+}
+
+test {
+  useJUnitPlatform()
+}
diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerApplication.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerApplication.java
new file mode 100644
index 000000000..1b6250c5e
--- /dev/null
+++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerApplication.java
@@ -0,0 +1,29 @@
+package com.linkedin.openhouse.analyzer;
+
+import java.util.List;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.domain.EntityScan;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
+
+/**
+ * Entry point for the Optimizer Analyzer application.
+ *
+ * <p>Entities and repositories are scanned from the {@code com.linkedin.openhouse.optimizer}
+ * packages named in the annotations below, not from this application's own package.
+ */
+@SpringBootApplication
+@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.db")
+@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository")
+public class AnalyzerApplication {
+
+  /** Starts the Spring application with the given command-line arguments. */
+  public static void main(String[] args) {
+    SpringApplication.run(AnalyzerApplication.class, args);
+  }
+
+  /**
+   * Runs the analyzer once per registered {@link OperationAnalyzer} per process invocation. Each
+   * call is scoped to one operation type; the runner iterates databases internally.
+   *
+   * <p>Analyzers are processed sequentially; an exception thrown while analyzing one operation
+   * type propagates and prevents the remaining types from running.
+   *
+   * @param runner the analysis loop to invoke for each operation type
+   * @param analyzers every {@link OperationAnalyzer} bean available for injection
+   * @return a runner that analyzes each registered operation type in turn
+   */
+  @Bean
+  public CommandLineRunner run(AnalyzerRunner runner, List<OperationAnalyzer> analyzers) {
+    return args -> analyzers.forEach(a -> runner.analyze(a.getOperationType()));
+  }
+}
diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerRunner.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerRunner.java
new file mode 100644
index 000000000..313e41514
--- /dev/null
+++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerRunner.java
@@ -0,0 +1,164 @@
+package com.linkedin.openhouse.analyzer;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationsHistoryDto;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsHistoryRepository;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.domain.PageRequest;
+import 
org.springframework.stereotype.Component;
+
+/**
+ * Core analysis loop. For one operation type per call, iterates databases and evaluates each table
+ * in a database against the matching {@link OperationAnalyzer}.
+ *
+ * <p>Both sides of the join — current operations and latest history per (table, type) — are loaded
+ * into maps once per database before the table loop. This is correct at small scale (≤~100k
+ * tables); past that the per-db query shape and projection need further tuning. Scale-up work is
+ * tracked in BDP-102182.
+ *
+ * <p>Every load reads only the first page of {@code optimizer.repo.default-limit} rows (default
+ * 10000). Rows beyond that cap are not seen in the current pass; a warning is logged whenever a
+ * load reaches the cap so the truncation is visible to operators.
+ *
+ * <p>// TODO(scale-test): benchmark the per-db working set at up to 10k tables and measure JVM heap
+ * residency for the three intermediate maps; per-db iteration bounds memory by tables-per-db rather
+ * than tables-total, but the upper bound still needs empirical validation.
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class AnalyzerRunner {
+
+  private final List<OperationAnalyzer> analyzers;
+  private final TableStatsRepository statsRepo;
+  private final TableOperationsRepository operationsRepo;
+  private final TableOperationsHistoryRepository historyRepo;
+
+  /** Page size used for every repository load; only page 0 of this size is requested. */
+  @Value("${optimizer.repo.default-limit:10000}")
+  private int defaultLimit = 10_000;
+
+  /**
+   * Run the analysis loop for {@code operationType} across all databases, with no filters.
+   * Equivalent to {@link #analyze(OperationTypeDto, Optional, Optional, Optional)} with all-empty
+   * filters.
+   *
+   * @param operationType the operation type to analyze
+   * @throws IllegalStateException if no analyzer is registered for {@code operationType}
+   */
+  public void analyze(OperationTypeDto operationType) {
+    analyze(operationType, Optional.empty(), Optional.empty(), Optional.empty());
+  }
+
+  /**
+   * Run the analysis loop for the given operation type, optionally scoped to a single database,
+   * table name, or table UUID. Iterates databases one at a time so the working set is bounded by
+   * tables-per-db, not tables-total.
+   *
+   * @param operationType the operation type to analyze
+   * @param databaseName when present, the only database visited; otherwise every distinct
+   *     database name known to the stats repository is visited
+   * @param tableName optional table-name filter passed through to the repositories
+   * @param tableUuid optional table-UUID filter passed through to the repositories
+   * @throws IllegalStateException if no analyzer is registered for {@code operationType}
+   */
+  public void analyze(
+      OperationTypeDto operationType,
+      Optional<String> databaseName,
+      Optional<String> tableName,
+      Optional<String> tableUuid) {
+    OperationAnalyzer analyzer =
+        analyzers.stream()
+            .filter(a -> a.getOperationType() == operationType)
+            .findFirst()
+            .orElseThrow(
+                () ->
+                    new IllegalStateException(
+                        "No analyzer registered for operation type " + operationType));
+    List<String> dbs = databaseName.map(List::of).orElseGet(statsRepo::findDistinctDatabaseNames);
+    log.info("Analyzing {} across {} database(s)", operationType, dbs.size());
+    dbs.forEach(db -> analyzeDatabase(analyzer, db, tableName, tableUuid));
+    log.info("Analysis complete for {}", operationType);
+  }
+
+  /**
+   * Evaluates every table of one database against {@code analyzer} and persists a PENDING
+   * operation for each table the analyzer decides to schedule.
+   */
+  private void analyzeDatabase(
+      OperationAnalyzer analyzer,
+      String databaseName,
+      Optional<String> tableName,
+      Optional<String> tableUuid) {
+
+    com.linkedin.openhouse.optimizer.db.OperationType dbOperationType =
+        analyzer.getOperationType().toDb();
+
+    // Pre-load the small sides of the joins — bounded by tables in this database.
+    PageRequest page = PageRequest.of(0, defaultLimit);
+    Map<String, TableOperationDto> currentOps =
+        operationsRepo
+            .find(
+                Optional.of(dbOperationType),
+                Optional.empty(),
+                tableUuid,
+                Optional.of(databaseName),
+                tableName,
+                Optional.empty(),
+                Optional.empty(),
+                page)
+            .stream()
+            .filter(e -> e.getTableUuid() != null)
+            .map(TableOperationDto::fromRow)
+            .collect(
+                Collectors.toMap(
+                    TableOperationDto::getTableUuid, op -> op, TableOperationDto::mostRecent));
+    warnIfPossiblyTruncated("current operations", currentOps.size(), databaseName);
+
+    // NOTE(review): findLatest takes only the operation type and the page, so this load does not
+    // appear to be scoped to databaseName. If it spans all databases, tables whose history falls
+    // outside the first page look like they have no history at all — confirm against the
+    // repository query.
+    Map<String, TableOperationsHistoryDto> latestHistory =
+        historyRepo.findLatest(dbOperationType, page).stream()
+            .filter(r -> r.getTableUuid() != null)
+            .map(TableOperationsHistoryDto::fromRow)
+            .collect(
+                Collectors.toMap(
+                    TableOperationsHistoryDto::getTableUuid,
+                    h -> h,
+                    AnalyzerRunner::moreRecentHistory));
+    warnIfPossiblyTruncated("latest history entries", latestHistory.size(), databaseName);
+
+    List<TableDto> tables =
+        statsRepo.find(Optional.of(databaseName), tableName, tableUuid, page).stream()
+            .filter(row -> row.getTableUuid() != null)
+            .map(TableDto::fromRow)
+            .collect(Collectors.toList());
+    warnIfPossiblyTruncated("tables", tables.size(), databaseName);
+
+    /*
+     * For each table in this database, decide whether to create a new PENDING operation.
+     *
+     * 1. Skip tables not opted in to this operation type. The opt-in check today reads a
+     *    table-property flag; in the future it will read a denormalized column.
+     * 2. Look up the table's current active operation (if any) and its most recent completed
+     *    history entry from the maps loaded above.
+     * 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy
+     *    encapsulates cadence, retry policy, and any future per-operation signals.
+     * 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass.
+     */
+    tables.forEach(
+        table -> {
+          if (!analyzer.isEnabled(table)) {
+            return;
+          }
+          Optional<TableOperationDto> currentOp =
+              Optional.ofNullable(currentOps.get(table.getTableUuid()));
+          Optional<TableOperationsHistoryDto> entry =
+              Optional.ofNullable(latestHistory.get(table.getTableUuid()));
+          if (analyzer.shouldSchedule(table, currentOp, entry)) {
+            TableOperationDto op = TableOperationDto.pending(table, analyzer.getOperationType());
+            operationsRepo.save(op.toRow());
+            log.info(
+                "Created PENDING {} operation for table {}.{}",
+                analyzer.getOperationType(),
+                table.getDatabaseName(),
+                table.getTableId());
+          }
+        });
+  }
+
+  /**
+   * Logs a warning when a load produced at least {@code defaultLimit} entries, which means the
+   * single requested page was full and further rows may exist that were not read.
+   */
+  private void warnIfPossiblyTruncated(String what, int loaded, String databaseName) {
+    if (loaded >= defaultLimit) {
+      log.warn(
+          "Loaded {} {} for database {}, reaching the limit of {}; any remaining rows were not read",
+          loaded,
+          what,
+          databaseName,
+          defaultLimit);
+    }
+  }
+
+  /**
+   * Merge function for the history map: keeps the entry with the later {@code completedAt},
+   * ranking a null timestamp as {@link Instant#EPOCH}. On a tie the first argument wins.
+   */
+  private static TableOperationsHistoryDto moreRecentHistory(
+      TableOperationsHistoryDto a, TableOperationsHistoryDto b) {
+    Comparator<TableOperationsHistoryDto> byCompletedAt =
+        Comparator.comparing(r -> r.getCompletedAt() != null ? r.getCompletedAt() : Instant.EPOCH);
+    return byCompletedAt.compare(a, b) >= 0 ? a : b;
+  }
+}
diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java
new file mode 100644
index 000000000..302669dbe
--- /dev/null
+++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java
@@ -0,0 +1,51 @@
+package com.linkedin.openhouse.analyzer;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationsHistoryDto;
+import java.time.Duration;
+import java.util.Optional;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/** Analyzer for the {@link OperationTypeDto#ORPHAN_FILES_DELETION} operation type. */
+@Component
+public class CadenceBasedOrphanFilesDeletionAnalyzer implements OperationAnalyzer {
+
+  static final String OFD_ENABLED_PROPERTY = "maintenance.optimizer.ofd.enabled";
+
+  private final CadencePolicy cadencePolicy;
+
+  @Autowired
+  public CadenceBasedOrphanFilesDeletionAnalyzer(
+      @Value("${ofd.success-retry-hours:24}") long successRetryHours,
+      @Value("${ofd.failure-retry-hours:1}") long failureRetryHours) {
+    this.cadencePolicy =
+        new CadencePolicy(Duration.ofHours(successRetryHours), Duration.ofHours(failureRetryHours));
+  }
+
+  /** Package-private for tests that supply a pre-built {@link CadencePolicy}. 
*/
+  CadenceBasedOrphanFilesDeletionAnalyzer(CadencePolicy cadencePolicy) {
+    this.cadencePolicy = cadencePolicy;
+  }
+
+  /** Returns {@link OperationTypeDto#ORPHAN_FILES_DELETION}, the only type this analyzer handles. */
+  @Override
+  public OperationTypeDto getOperationType() {
+    return OperationTypeDto.ORPHAN_FILES_DELETION;
+  }
+
+  /**
+   * Returns {@code true} only when the table property {@value #OFD_ENABLED_PROPERTY} is exactly
+   * the string {@code "true"}. A missing flag, any other value, or a table with no property map at
+   * all counts as not opted in.
+   */
+  @Override
+  public boolean isEnabled(TableDto table) {
+    // Guard against a null property map so one sparse table cannot abort the whole analysis pass.
+    return Optional.ofNullable(table.getTableProperties())
+        .map(props -> "true".equals(props.get(OFD_ENABLED_PROPERTY)))
+        .orElse(false);
+  }
+
+  /**
+   * Delegates the decision to the configured {@link CadencePolicy}; the table itself is not
+   * consulted.
+   */
+  @Override
+  public boolean shouldSchedule(
+      TableDto table,
+      Optional<TableOperationDto> currentOp,
+      Optional<TableOperationsHistoryDto> latestHistory) {
+    return cadencePolicy.shouldSchedule(currentOp, latestHistory);
+  }
+}
diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadencePolicy.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadencePolicy.java
new file mode 100644
index 000000000..b4461e6c6
--- /dev/null
+++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadencePolicy.java
@@ -0,0 +1,57 @@
+package com.linkedin.openhouse.analyzer;
+
+import com.linkedin.openhouse.optimizer.model.HistoryStatusDto;
+import com.linkedin.openhouse.optimizer.model.OperationStatusDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationsHistoryDto;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Time-based scheduling policy. An analyzer delegates to {@link CadencePolicy} to decide whether to
+ * re-issue a recommendation for a table.
+ *
+ *

<p>The analyzer stays out of any table that already has a non-CANCELED active operation — those
+ * belong to the scheduler. For tables with no active operation (or only a CANCELED one), the
+ * decision is based on the most recent completed-history entry: re-evaluate after {@code
+ * successRetryInterval} on success, or after {@code failureRetryInterval} on failure.
+ *
+ * <p>Only {@code FAILED} selects the failure interval; every other history status uses the success
+ * interval. A history entry without a completion timestamp is treated as already due.
+ */
+@RequiredArgsConstructor
+public class CadencePolicy {
+
+  /**
+   * How long to wait after a successful operation before re-evaluating the table. For example, if
+   * set to 24 hours and OFD succeeded at 10:00 AM Monday, the table won't be scheduled again until
+   * after 10:00 AM Tuesday.
+   */
+  private final Duration successRetryInterval;
+
+  /**
+   * How long to wait after a failed operation before retrying. Shorter than the success interval to
+   * allow quick recovery. For example, if set to 1 hour and OFD failed at 2:00 PM, the table
+   * becomes eligible for retry at 3:00 PM.
+   */
+  private final Duration failureRetryInterval;
+
+  /**
+   * Returns {@code true} if a new or refreshed operation record should be upserted.
+   *
+   * @param currentOp the existing active operation record, or empty if none exists
+   * @param latestHistory the most recent history entry for this (table, type), or empty
+   * @return {@code false} while a non-CANCELED operation exists; otherwise {@code true} when there
+   *     is no history or the applicable retry interval has fully elapsed
+   */
+  public boolean shouldSchedule(
+      Optional<TableOperationDto> currentOp, Optional<TableOperationsHistoryDto> latestHistory) {
+    if (currentOp.isPresent() && currentOp.get().getStatus() != OperationStatusDto.CANCELED) {
+      return false;
+    }
+    return latestHistory.map(this::readyAfterHistoryEntry).orElse(true);
+  }
+
+  /**
+   * Returns {@code true} when strictly more than the applicable retry interval has passed since
+   * {@code entry} completed.
+   */
+  private boolean readyAfterHistoryEntry(TableOperationsHistoryDto entry) {
+    Instant completedAt = entry.getCompletedAt();
+    if (completedAt == null) {
+      // Duration.between rejects null. AnalyzerRunner already ranks a missing completion time as
+      // Instant.EPOCH, so treat such an entry as infinitely old rather than failing the pass.
+      return true;
+    }
+    Duration interval =
+        entry.getStatus() == HistoryStatusDto.FAILED ? failureRetryInterval : successRetryInterval;
+    return Duration.between(completedAt, Instant.now()).compareTo(interval) > 0;
+  }
+}
diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/OperationAnalyzer.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/OperationAnalyzer.java
new file mode 100644
index 000000000..ab69386e4
--- /dev/null
+++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/OperationAnalyzer.java
@@ -0,0 +1,41 @@
+package com.linkedin.openhouse.analyzer;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationsHistoryDto;
+import java.util.Optional;
+
+/**
+ * Strategy interface for a single operation type. Each implementation decides whether a given table
+ * needs an operation recommendation upserted in the Optimizer Service.
+ *
+ *

<p>// TODO(circuit-breaker): a chronically-failing table is currently re-queued with a new PENDING
+ * row indefinitely — under {@link CadencePolicy}, once per failure-retry interval. Add a circuit
+ * breaker that suppresses scheduling for a (table, type) after N consecutive FAILED history
+ * entries. Requirements: configurable threshold per operation type, automatic reset via
+ * exponential backoff so tables can recover, and an operator-visible signal (metric or query) so
+ * tripped breakers are diagnosable.
+ */
+public interface OperationAnalyzer {
+
+  /** The operation type this analyzer handles. */
+  OperationTypeDto getOperationType();
+
+  /**
+   * Returns {@code true} if this operation is opted-in for the given table. Tables that return
+   * {@code false} are skipped entirely — no upsert is issued.
+   *
+   * @param table the table entry to check
+   */
+  boolean isEnabled(TableDto table);
+
+  /**
+   * Returns {@code true} if a new or refreshed operation record should be upserted.
+   *
+   * @param table the table entry
+   * @param currentOp the existing active operation record, or empty if none exists
+   * @param latestHistory the most recent history entry for this (table, type), or empty
+   */
+  boolean shouldSchedule(
+      TableDto table,
+      Optional<TableOperationDto> currentOp,
+      Optional<TableOperationsHistoryDto> latestHistory);
+}
diff --git a/apps/optimizer-analyzer/src/main/resources/application.properties b/apps/optimizer-analyzer/src/main/resources/application.properties
new file mode 100644
index 000000000..4ee825c55
--- /dev/null
+++ b/apps/optimizer-analyzer/src/main/resources/application.properties
@@ -0,0 +1,9 @@
+spring.application.name=openhouse-optimizer-analyzer
+spring.main.web-application-type=none
+spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:analyzerdb;DB_CLOSE_DELAY=-1;MODE=MySQL}
+spring.datasource.username=${OPTIMIZER_DB_USER:sa}
+spring.datasource.password=${OPTIMIZER_DB_PASSWORD:}
+spring.jpa.hibernate.ddl-auto=none
+ofd.success-retry-hours=24
+ofd.failure-retry-hours=1
+optimizer.repo.default-limit=10000
diff --git 
a/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/AnalyzerRunnerTest.java b/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/AnalyzerRunnerTest.java
new file mode 100644
index 000000000..546279d64
--- /dev/null
+++ b/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/AnalyzerRunnerTest.java
@@ -0,0 +1,218 @@
+package com.linkedin.openhouse.analyzer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.linkedin.openhouse.optimizer.db.TableOperationsRow;
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsHistoryRepository;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link AnalyzerRunner} with all repositories and the analyzer mocked. Each test
+ * runs an unfiltered {@code analyze} over the single database {@code db1}.
+ */
+@ExtendWith(MockitoExtension.class)
+class AnalyzerRunnerTest {
+
+  private static final OperationTypeDto OFD_TYPE = OperationTypeDto.ORPHAN_FILES_DELETION;
+  private static final com.linkedin.openhouse.optimizer.db.OperationType OFD_DB =
+      com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION;
+  private static final String DB = "db1";
+
+  @Mock private TableStatsRepository statsRepo;
+  @Mock private TableOperationsRepository operationsRepo;
+  @Mock private TableOperationsHistoryRepository historyRepo;
+  @Mock private OperationAnalyzer analyzer;
+
+  private AnalyzerRunner runner;
+
+  @BeforeEach
+  void setUp() {
+    runner = new AnalyzerRunner(List.of(analyzer), statsRepo, operationsRepo, historyRepo);
+    when(analyzer.getOperationType()).thenReturn(OFD_TYPE);
+    when(statsRepo.findDistinctDatabaseNames()).thenReturn(List.of(DB));
+  }
+
+  /**
+   * Stubs the three per-database loads: {@code stats} as the table rows, {@code currentOps} as the
+   * active OFD operations, and an empty latest-history result.
+   */
+  private void stubLoads(List<TableStatsRow> stats, List<TableOperationsRow> currentOps) {
+    when(statsRepo.find(eq(Optional.of(DB)), eq(Optional.empty()), eq(Optional.empty()), any()))
+        .thenReturn(stats);
+    when(operationsRepo.find(
+            eq(Optional.of(OFD_DB)),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.of(DB)),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            any()))
+        .thenReturn(currentOps);
+    when(historyRepo.findLatest(eq(OFD_DB), any())).thenReturn(Collections.emptyList());
+  }
+
+  /** Builds an OFD operation row for table {@code uuid-1} with the given id and status. */
+  private static TableOperationsRow opRow(
+      String id, com.linkedin.openhouse.optimizer.db.OperationStatus status) {
+    return TableOperationsRow.builder()
+        .id(id)
+        .status(status)
+        .tableUuid("uuid-1")
+        .operationType(OFD_DB)
+        .createdAt(Instant.now())
+        .build();
+  }
+
+  @Test
+  void analyze_insertsNewRow_forEligibleTableWithNoExistingOp() {
+    TableStatsRow statsEntity =
+        TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).tableName("tbl1").build();
+    TableDto expectedTable = TableDto.fromRow(statsEntity);
+
+    stubLoads(List.of(statsEntity), Collections.emptyList());
+    when(analyzer.isEnabled(expectedTable)).thenReturn(true);
+    when(analyzer.shouldSchedule(expectedTable, Optional.empty(), Optional.empty()))
+        .thenReturn(true);
+
+    runner.analyze(OFD_TYPE);
+
+    ArgumentCaptor<TableOperationsRow> captor = ArgumentCaptor.forClass(TableOperationsRow.class);
+    verify(operationsRepo).save(captor.capture());
+    TableOperationsRow saved = captor.getValue();
+    assertThat(saved.getTableUuid()).isEqualTo("uuid-1");
+    assertThat(saved.getDatabaseName()).isEqualTo(DB);
+    assertThat(saved.getTableName()).isEqualTo("tbl1");
+    assertThat(saved.getOperationType()).isEqualTo(OFD_DB);
+    assertThat(saved.getStatus())
+        .isEqualTo(com.linkedin.openhouse.optimizer.db.OperationStatus.PENDING);
+    assertThat(saved.getId()).isNotNull();
+  }
+
+  @Test
+  void analyze_noOp_whenCadencePolicyReturnsFalseForPending() {
+    TableStatsRow statsEntity =
+        TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).tableName("tbl1").build();
+    TableDto expectedTable = TableDto.fromRow(statsEntity);
+    TableOperationsRow existingEntity =
+        opRow("existing-op-id", com.linkedin.openhouse.optimizer.db.OperationStatus.PENDING);
+
+    stubLoads(List.of(statsEntity), List.of(existingEntity));
+    when(analyzer.isEnabled(expectedTable)).thenReturn(true);
+    TableOperationDto existingOp = TableOperationDto.fromRow(existingEntity);
+    when(analyzer.shouldSchedule(expectedTable, Optional.of(existingOp), Optional.empty()))
+        .thenReturn(false);
+
+    runner.analyze(OFD_TYPE);
+
+    verify(operationsRepo, never()).save(any());
+  }
+
+  @Test
+  void analyze_skipsTable_whenNotEnabled() {
+    TableStatsRow statsEntity =
+        TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).build();
+    TableDto expectedTable = TableDto.fromRow(statsEntity);
+
+    stubLoads(List.of(statsEntity), Collections.emptyList());
+    when(analyzer.isEnabled(expectedTable)).thenReturn(false);
+
+    runner.analyze(OFD_TYPE);
+
+    verify(operationsRepo, never()).save(any());
+  }
+
+  @Test
+  void analyze_skipsTable_whenShouldScheduleReturnsFalse() {
+    TableStatsRow statsEntity =
+        TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).build();
+    TableDto expectedTable = TableDto.fromRow(statsEntity);
+    TableOperationsRow scheduled =
+        opRow("op-id", com.linkedin.openhouse.optimizer.db.OperationStatus.SCHEDULED);
+
+    stubLoads(List.of(statsEntity), List.of(scheduled));
+    when(analyzer.isEnabled(expectedTable)).thenReturn(true);
+    TableOperationDto scheduledOp = TableOperationDto.fromRow(scheduled);
+    when(analyzer.shouldSchedule(expectedTable, Optional.of(scheduledOp), Optional.empty()))
+        .thenReturn(false);
+
+    runner.analyze(OFD_TYPE);
+
+    verify(operationsRepo, never()).save(any());
+  }
+
+  @Test
+  void analyze_skipsTable_whenTableUuidIsNull() {
+    TableStatsRow statsEntity = TableStatsRow.builder().databaseName(DB).build();
+
+    when(statsRepo.find(eq(Optional.of(DB)), eq(Optional.empty()), eq(Optional.empty()), any()))
+        .thenReturn(List.of(statsEntity));
+    when(operationsRepo.find(
+            eq(Optional.of(OFD_DB)),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.of(DB)),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            any()))
+        .thenReturn(Collections.emptyList());
+    when(historyRepo.findLatest(any(), 
any())).thenReturn(Collections.emptyList()); + + runner.analyze(OFD_TYPE); + + verify(operationsRepo, never()).save(any()); + } +} diff --git a/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzerTest.java b/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzerTest.java new file mode 100644 index 000000000..e50bb694d --- /dev/null +++ b/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzerTest.java @@ -0,0 +1,205 @@ +package com.linkedin.openhouse.analyzer; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.optimizer.model.HistoryStatusDto; +import com.linkedin.openhouse.optimizer.model.OperationStatusDto; +import com.linkedin.openhouse.optimizer.model.TableDto; +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableOperationsHistoryDto; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** Unit tests for {@link CadenceBasedOrphanFilesDeletionAnalyzer}: the opt-in flag check in {@code isEnabled} and the cadence decision that {@code shouldSchedule} delegates to {@link CadencePolicy}. */ +class CadenceBasedOrphanFilesDeletionAnalyzerTest { + + /* Intervals handed to the CadencePolicy under test: 24 hours after success, 1 hour after failure. */ + private static final Duration SUCCESS_INTERVAL = Duration.ofHours(24); + private static final Duration FAILURE_INTERVAL = Duration.ofHours(1); + + private CadenceBasedOrphanFilesDeletionAnalyzer analyzer; + + /** Builds the analyzer through its package-private constructor with a fixed policy. */ + @BeforeEach + void setUp() { + analyzer = + new CadenceBasedOrphanFilesDeletionAnalyzer( + new CadencePolicy(SUCCESS_INTERVAL, FAILURE_INTERVAL)); + } + + // --- isEnabled --- + + @Test + void isEnabled_returnsTrue_whenPropertySet() { + assertThat(analyzer.isEnabled(tableWithProperty("true"))).isTrue(); + } + + @Test + void isEnabled_returnsFalse_whenPropertyAbsent() { + assertThat(analyzer.isEnabled(tableWithProperty(null))).isFalse(); + } + + @Test + void 
isEnabled_returnsFalse_whenPropertyFalse() { + assertThat(analyzer.isEnabled(tableWithProperty("false"))).isFalse(); + } + + /* NOTE(review): this table is built without calling tableProperties(...); the test presumably relies on the TableDto builder supplying a non-null default map — confirm. */ + @Test + void isEnabled_returnsFalse_whenTablePropertiesEmpty() { + TableDto table = TableDto.builder().tableUuid("uuid").build(); + assertThat(analyzer.isEnabled(table)).isFalse(); + } + + // --- shouldSchedule: no existing op --- + + @Test + void shouldSchedule_noOp_noHistory_returnsTrue() { + assertThat( + analyzer.shouldSchedule(tableWithProperty("true"), Optional.empty(), Optional.empty())) + .isTrue(); + } + + /* The cooldown tests place completedAt 60 seconds on either side of the interval boundary relative to Instant.now(). */ + @Test + void shouldSchedule_noOp_successHistoryAfterCooldown_returnsTrue() { + Instant longAgo = Instant.now().minus(SUCCESS_INTERVAL).minusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.empty(), + Optional.of(historyWithStatus(HistoryStatusDto.SUCCESS, longAgo)))) + .isTrue(); + } + + @Test + void shouldSchedule_noOp_successHistoryBeforeCooldown_returnsFalse() { + Instant recent = Instant.now().minus(SUCCESS_INTERVAL).plusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.empty(), + Optional.of(historyWithStatus(HistoryStatusDto.SUCCESS, recent)))) + .isFalse(); + } + + @Test + void shouldSchedule_noOp_failedHistoryAfterRetry_returnsTrue() { + Instant longAgo = Instant.now().minus(FAILURE_INTERVAL).minusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.empty(), + Optional.of(historyWithStatus(HistoryStatusDto.FAILED, longAgo)))) + .isTrue(); + } + + @Test + void shouldSchedule_noOp_failedHistoryBeforeRetry_returnsFalse() { + Instant recent = Instant.now().minus(FAILURE_INTERVAL).plusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.empty(), + Optional.of(historyWithStatus(HistoryStatusDto.FAILED, recent)))) + .isFalse(); + } + + // --- shouldSchedule: active op (non-CANCELED) → analyzer stays out --- + + @Test + void shouldSchedule_pending_returnsFalse() { 
+ assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatusDto.PENDING)), + Optional.empty())) + .isFalse(); + } + + @Test + void shouldSchedule_scheduling_returnsFalse() { + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatusDto.SCHEDULING)), + Optional.empty())) + .isFalse(); + } + + /* History old enough to be due is supplied here, so the false result comes from the active SCHEDULED operation alone. */ + @Test + void shouldSchedule_scheduled_returnsFalse_regardlessOfHistory() { + Instant historyAt = Instant.now().minus(SUCCESS_INTERVAL).minusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatusDto.SCHEDULED)), + Optional.of(historyWithStatus(HistoryStatusDto.SUCCESS, historyAt)))) + .isFalse(); + } + + // --- shouldSchedule: CANCELED → cadence on history --- + + @Test + void shouldSchedule_canceled_successHistoryAfterCooldown_returnsTrue() { + Instant longAgo = Instant.now().minus(SUCCESS_INTERVAL).minusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatusDto.CANCELED)), + Optional.of(historyWithStatus(HistoryStatusDto.SUCCESS, longAgo)))) + .isTrue(); + } + + @Test + void shouldSchedule_canceled_successHistoryBeforeCooldown_returnsFalse() { + Instant recent = Instant.now().minus(SUCCESS_INTERVAL).plusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatusDto.CANCELED)), + Optional.of(historyWithStatus(HistoryStatusDto.SUCCESS, recent)))) + .isFalse(); + } + + @Test + void shouldSchedule_canceled_noHistory_returnsTrue() { + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatusDto.CANCELED)), + Optional.empty())) + .isTrue(); + } + + // --- helpers --- + + /** Builds a table whose only property is the OFD opt-in flag set to {@code value}; a {@code null} value yields an empty property map. */ + private TableDto tableWithProperty(String value) { + Map props = + value == null + ? 
Collections.emptyMap() + : Map.of(CadenceBasedOrphanFilesDeletionAnalyzer.OFD_ENABLED_PROPERTY, value); + return TableDto.builder() + .tableUuid("test-uuid") + .databaseName("db1") + .tableId("tbl1") + .tableProperties(props) + .build(); + } + + /** Builds an operation that carries only the given status; no other builder field is set. */ + private TableOperationDto opWithStatus(OperationStatusDto status) { + return TableOperationDto.builder().status(status).build(); + } + + /** Builds an OFD history entry for table {@code test-uuid} with the given status and completion time. */ + private TableOperationsHistoryDto historyWithStatus( + HistoryStatusDto status, Instant completedAt) { + return TableOperationsHistoryDto.builder() + .id("hist-id") + .tableUuid("test-uuid") + .operationType( + com.linkedin.openhouse.optimizer.model.OperationTypeDto.ORPHAN_FILES_DELETION) + .completedAt(completedAt) + .status(status) + .build(); + } +} diff --git a/settings.gradle b/settings.gradle index cad06785e..7942f44d3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -50,6 +50,7 @@ include ':services:common' include ':services:housetables' include ':services:jobs' include ':services:optimizer' +include ':apps:optimizer-analyzer' include ':services:tables' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'