diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java index 93107c99a..5609ffaf8 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java @@ -470,6 +470,21 @@ private List getDataLayoutStrategies(GetTableResponseBody ta tableProps.get(StrategiesDaoTableProps.DATA_LAYOUT_STRATEGIES_PROPERTY_KEY)); } + /** + * Read the persisted data layout strategies for a table from its table properties. + * + *

Returns an empty list if the table is missing or has no {@code write.data-layout.strategies} + * property. Used by the scheduler to skip launching compaction jobs that the strategy generator + * has already shown will produce no useful work. + */ + public List getDataLayoutStrategies(TableMetadata tableMetadata) { + GetTableResponseBody response = getTable(tableMetadata); + if (response == null) { + return Collections.emptyList(); + } + return getDataLayoutStrategies(response); + } + protected @NonNull String getTableCreator(GetTableResponseBody responseBody) { return Objects.requireNonNull(responseBody.getTableCreator()); } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableDataCompactionTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableDataCompactionTask.java index 32baa0465..742e973c7 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableDataCompactionTask.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableDataCompactionTask.java @@ -1,5 +1,6 @@ package com.linkedin.openhouse.jobs.scheduler.tasks; +import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy; import com.linkedin.openhouse.jobs.client.JobsClient; import com.linkedin.openhouse.jobs.client.TablesClient; import com.linkedin.openhouse.jobs.client.model.JobConf; @@ -7,6 +8,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; /** * A task to rewrite data files in a table. @@ -14,6 +16,7 @@ * @see Compact * data files */ +@Slf4j public class TableDataCompactionTask extends TableOperationTask { public static final JobConf.JobTypeEnum OPERATION_TYPE = JobConf.JobTypeEnum.DATA_COMPACTION; @@ -44,6 +47,24 @@ protected List getArgs() { @Override protected boolean shouldRunTask() { - return metadata.isPrimary() && (metadata.isTimePartitioned() || metadata.isClustered()); + if (!metadata.isPrimary() || (!metadata.isTimePartitioned() && !metadata.isClustered())) { + return false; + } + List strategies = tablesClient.getDataLayoutStrategies(metadata); + if (strategies.isEmpty()) { + log.info( + "Skipping data compaction for {}: no data-layout strategies persisted on the table", + metadata.fqtn()); + return false; + } + boolean hasPositiveGain = strategies.stream().anyMatch(s -> s.getGain() > 0); + if (!hasPositiveGain) { + log.info( + "Skipping data compaction for {}: all {} strategies have gain <= 0", + metadata.fqtn(), + strategies.size()); + return false; + } + return true; } } diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableDataCompactionTaskTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableDataCompactionTaskTest.java new file mode 100644 index 000000000..30be3af25 --- /dev/null +++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableDataCompactionTaskTest.java @@ -0,0 +1,83 @@ +package com.linkedin.openhouse.jobs.scheduler.tasks; + +import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy; +import com.linkedin.openhouse.jobs.client.JobsClient; +import com.linkedin.openhouse.jobs.client.TablesClient; +import com.linkedin.openhouse.jobs.util.TableMetadata; +import java.util.Arrays; +import java.util.Collections; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class TableDataCompactionTaskTest { + private TablesClient tablesClient; + private JobsClient jobsClient; + private TableMetadata tableMetadata; + private TableDataCompactionTask task; + + @BeforeEach + void setup() { + tablesClient = Mockito.mock(TablesClient.class); + jobsClient = Mockito.mock(JobsClient.class); + tableMetadata = Mockito.mock(TableMetadata.class); + Mockito.when(tableMetadata.fqtn()).thenReturn("db.table"); + Mockito.when(tableMetadata.isPrimary()).thenReturn(true); + Mockito.when(tableMetadata.isTimePartitioned()).thenReturn(true); + Mockito.when(tableMetadata.isClustered()).thenReturn(false); + task = new TableDataCompactionTask(jobsClient, tablesClient, tableMetadata); + } + + @Test + void shouldRunWhenAtLeastOneStrategyHasPositiveGain() { + Mockito.when(tablesClient.getDataLayoutStrategies(tableMetadata)) + .thenReturn( + Arrays.asList( + DataLayoutStrategy.builder().gain(0.0).build(), + DataLayoutStrategy.builder().gain(2.5).build())); + Assertions.assertTrue(task.shouldRunTask()); + } + + @Test + void shouldNotRunWhenStrategiesEmpty() { + Mockito.when(tablesClient.getDataLayoutStrategies(tableMetadata)) + .thenReturn(Collections.emptyList()); + Assertions.assertFalse(task.shouldRunTask()); + } + + @Test + void shouldNotRunWhenAllStrategiesHaveNonPositiveGain() { + Mockito.when(tablesClient.getDataLayoutStrategies(tableMetadata)) + .thenReturn( + Arrays.asList( + DataLayoutStrategy.builder().gain(0.0).build(), + DataLayoutStrategy.builder().gain(-1.0).build())); + Assertions.assertFalse(task.shouldRunTask()); + } + + @Test + void shouldNotRunForNonPrimaryTable() { + Mockito.when(tableMetadata.isPrimary()).thenReturn(false); + Assertions.assertFalse(task.shouldRunTask()); + // No need to fetch strategies if the primary/partitioning guard fails first. + Mockito.verify(tablesClient, Mockito.never()).getDataLayoutStrategies(tableMetadata); + } + + @Test + void shouldNotRunWhenNeitherPartitionedNorClustered() { + Mockito.when(tableMetadata.isTimePartitioned()).thenReturn(false); + Mockito.when(tableMetadata.isClustered()).thenReturn(false); + Assertions.assertFalse(task.shouldRunTask()); + Mockito.verify(tablesClient, Mockito.never()).getDataLayoutStrategies(tableMetadata); + } + + @Test + void shouldRunForClusteredNonTimePartitionedTableWithGain() { + Mockito.when(tableMetadata.isTimePartitioned()).thenReturn(false); + Mockito.when(tableMetadata.isClustered()).thenReturn(true); + Mockito.when(tablesClient.getDataLayoutStrategies(tableMetadata)) + .thenReturn(Collections.singletonList(DataLayoutStrategy.builder().gain(1.0).build())); + Assertions.assertTrue(task.shouldRunTask()); + } +}