Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,21 @@ private List<DataLayoutStrategy> getDataLayoutStrategies(GetTableResponseBody ta
tableProps.get(StrategiesDaoTableProps.DATA_LAYOUT_STRATEGIES_PROPERTY_KEY));
}

/**
 * Looks up the table described by {@code tableMetadata} and returns its data layout strategies.
 *
 * <p>The table is fetched through {@code getTable}; when that lookup yields {@code null} an empty
 * list is returned. Otherwise the work is delegated to the response-based overload, which reads
 * the strategies from the table properties. NOTE(review): the behavior for a table that has no
 * strategies property is decided by that overload and is not visible here; confirm it yields an
 * empty list rather than {@code null}.
 *
 * <p>The compaction scheduler task uses this to decide whether launching a job is worthwhile.
 *
 * @param tableMetadata metadata identifying the table to look up
 * @return the strategies read from the table, or an empty list when the table lookup returns
 *     {@code null}
 */
public List<DataLayoutStrategy> getDataLayoutStrategies(TableMetadata tableMetadata) {
  final GetTableResponseBody tableResponse = getTable(tableMetadata);
  return tableResponse == null
      ? Collections.<DataLayoutStrategy>emptyList()
      : getDataLayoutStrategies(tableResponse);
}

/**
 * Extracts the table creator from a table response.
 *
 * @param responseBody table response to read the creator from
 * @return the creator reported by {@code responseBody}; never {@code null}
 * @throws NullPointerException if the response carries no table creator
 */
protected @NonNull String getTableCreator(GetTableResponseBody responseBody) {
  final String tableCreator = responseBody.getTableCreator();
  // Fail fast here instead of letting a null creator leak to callers.
  return Objects.requireNonNull(tableCreator);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package com.linkedin.openhouse.jobs.scheduler.tasks;

import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
import com.linkedin.openhouse.jobs.client.JobsClient;
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.client.model.JobConf;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;

/**
* A task to rewrite data files in a table.
*
* @see <a href="https://iceberg.apache.org/docs/latest/maintenance/#compact-data-files">Compact
* data files</a>
*/
@Slf4j
public class TableDataCompactionTask extends TableOperationTask<TableMetadata> {
public static final JobConf.JobTypeEnum OPERATION_TYPE = JobConf.JobTypeEnum.DATA_COMPACTION;

Expand Down Expand Up @@ -44,6 +47,24 @@ protected List<String> getArgs() {

/**
 * Decides whether a data compaction job should be launched for this task's table.
 *
 * <p>The metadata guards run first, so the tables client is only queried for primary tables that
 * are time-partitioned or clustered. For those tables the persisted data layout strategies are
 * fetched, and the task runs only when at least one of them reports a gain {@code > 0}. Both
 * skip reasons are logged at info level.
 *
 * @return {@code true} if the table passes the metadata guards and at least one persisted
 *     strategy has a positive gain; {@code false} otherwise
 */
@Override
protected boolean shouldRunTask() {
  // Only primary tables that are time-partitioned or clustered are candidates for compaction.
  if (!metadata.isPrimary() || (!metadata.isTimePartitioned() && !metadata.isClustered())) {
    return false;
  }
  List<DataLayoutStrategy> strategies = tablesClient.getDataLayoutStrategies(metadata);
  if (strategies.isEmpty()) {
    log.info(
        "Skipping data compaction for {}: no data-layout strategies persisted on the table",
        metadata.fqtn());
    return false;
  }
  // A job is only worth launching when some strategy is expected to improve the layout.
  boolean hasPositiveGain = strategies.stream().anyMatch(s -> s.getGain() > 0);
  if (!hasPositiveGain) {
    log.info(
        "Skipping data compaction for {}: all {} strategies have gain <= 0",
        metadata.fqtn(),
        strategies.size());
    return false;
  }
  return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.linkedin.openhouse.jobs.scheduler.tasks;

import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy;
import com.linkedin.openhouse.jobs.client.JobsClient;
import com.linkedin.openhouse.jobs.client.TablesClient;
import com.linkedin.openhouse.jobs.util.TableMetadata;
import java.util.Arrays;
import java.util.Collections;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Unit tests for {@code TableDataCompactionTask#shouldRunTask()}.
 *
 * <p>All collaborators are Mockito mocks. By default the mocked table is primary, time-partitioned
 * and not clustered; individual tests override only the flags or strategies they care about.
 */
public class TableDataCompactionTaskTest {
  private TablesClient tablesClient;
  private JobsClient jobsClient;
  private TableMetadata tableMetadata;
  private TableDataCompactionTask task;

  @BeforeEach
  void setup() {
    tableMetadata = Mockito.mock(TableMetadata.class);
    Mockito.when(tableMetadata.fqtn()).thenReturn("db.table");
    Mockito.when(tableMetadata.isPrimary()).thenReturn(true);
    Mockito.when(tableMetadata.isTimePartitioned()).thenReturn(true);
    Mockito.when(tableMetadata.isClustered()).thenReturn(false);

    tablesClient = Mockito.mock(TablesClient.class);
    jobsClient = Mockito.mock(JobsClient.class);
    task = new TableDataCompactionTask(jobsClient, tablesClient, tableMetadata);
  }

  /** Builds a strategy carrying only the given gain. */
  private static DataLayoutStrategy strategyWithGain(double gain) {
    return DataLayoutStrategy.builder().gain(gain).build();
  }

  /** Asserts that the task never asked the tables client for strategies. */
  private void assertStrategiesNeverFetched() {
    Mockito.verify(tablesClient, Mockito.never()).getDataLayoutStrategies(tableMetadata);
  }

  @Test
  void shouldRunWhenAtLeastOneStrategyHasPositiveGain() {
    Mockito.when(tablesClient.getDataLayoutStrategies(tableMetadata))
        .thenReturn(Arrays.asList(strategyWithGain(0.0), strategyWithGain(2.5)));

    boolean shouldRun = task.shouldRunTask();

    Assertions.assertTrue(shouldRun);
  }

  @Test
  void shouldNotRunWhenStrategiesEmpty() {
    Mockito.when(tablesClient.getDataLayoutStrategies(tableMetadata))
        .thenReturn(Collections.emptyList());

    boolean shouldRun = task.shouldRunTask();

    Assertions.assertFalse(shouldRun);
  }

  @Test
  void shouldNotRunWhenAllStrategiesHaveNonPositiveGain() {
    Mockito.when(tablesClient.getDataLayoutStrategies(tableMetadata))
        .thenReturn(Arrays.asList(strategyWithGain(0.0), strategyWithGain(-1.0)));

    boolean shouldRun = task.shouldRunTask();

    Assertions.assertFalse(shouldRun);
  }

  @Test
  void shouldNotRunForNonPrimaryTable() {
    Mockito.when(tableMetadata.isPrimary()).thenReturn(false);

    boolean shouldRun = task.shouldRunTask();

    Assertions.assertFalse(shouldRun);
    // The primary/partitioning guard must short-circuit before any strategy lookup.
    assertStrategiesNeverFetched();
  }

  @Test
  void shouldNotRunWhenNeitherPartitionedNorClustered() {
    Mockito.when(tableMetadata.isTimePartitioned()).thenReturn(false);
    Mockito.when(tableMetadata.isClustered()).thenReturn(false);

    boolean shouldRun = task.shouldRunTask();

    Assertions.assertFalse(shouldRun);
    assertStrategiesNeverFetched();
  }

  @Test
  void shouldRunForClusteredNonTimePartitionedTableWithGain() {
    Mockito.when(tableMetadata.isTimePartitioned()).thenReturn(false);
    Mockito.when(tableMetadata.isClustered()).thenReturn(true);
    Mockito.when(tablesClient.getDataLayoutStrategies(tableMetadata))
        .thenReturn(Collections.singletonList(strategyWithGain(1.0)));

    boolean shouldRun = task.shouldRunTask();

    Assertions.assertTrue(shouldRun);
  }
}