getEntries() {
+ return Collections.unmodifiableList(entries);
+ }
+
+ /** Visible for tests. */
+ int getDriverParallelism() {
+ return driverParallelism;
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OperationUpdateRequest.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OperationUpdateRequest.java
new file mode 100644
index 000000000..715873aaa
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OperationUpdateRequest.java
@@ -0,0 +1,26 @@
+package com.linkedin.openhouse.jobs.spark.optimizer;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Wire-compatible body for {@code POST /v1/optimizer/operations/update} on the Optimizer Service.
+ *
+ * Mirrors {@code com.linkedin.openhouse.optimizer.api.spec.UpdateOperationRequest} from the
+ * optimizer service module so this app can be built before that module merges. Keep the two in
+ * sync.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class OperationUpdateRequest {
+ private String operationId;
+ private String status;
+ private String tableUuid;
+ private String databaseName;
+ private String tableName;
+ private String operationType;
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java
new file mode 100644
index 000000000..ec220337f
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/optimizer/OptimizerServiceClient.java
@@ -0,0 +1,81 @@
+package com.linkedin.openhouse.jobs.spark.optimizer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+
+/**
+ * Thin OkHttp client for the Optimizer Service. The batched Spark app calls {@link
+ * #updateOperation(OperationUpdateRequest)} once per finished operation to record SUCCESS or
+ * FAILED.
+ *
+ *
Errors are surfaced as {@link IOException}; the caller decides whether to retry. Per the
+ * design, a missed update is recoverable — the operation row stays SCHEDULED and the Analyzer's
+ * stale-timeout will re-queue it.
+ */
+@Slf4j
+public class OptimizerServiceClient implements AutoCloseable {
+
+ private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
+ private static final String UPDATE_PATH = "/v1/optimizer/operations/update";
+
+ private final String baseUrl;
+ private final OkHttpClient httpClient;
+ private final ObjectMapper objectMapper;
+
+ public OptimizerServiceClient(String baseUrl) {
+ this(baseUrl, defaultClient(), new ObjectMapper());
+ }
+
+ OptimizerServiceClient(String baseUrl, OkHttpClient httpClient, ObjectMapper objectMapper) {
+ this.baseUrl = stripTrailingSlash(baseUrl);
+ this.httpClient = httpClient;
+ this.objectMapper = objectMapper;
+ }
+
+ public void updateOperation(OperationUpdateRequest body) throws IOException {
+ String url = baseUrl + UPDATE_PATH;
+ String json = objectMapper.writeValueAsString(body);
+ Request request = new Request.Builder().url(url).post(RequestBody.create(json, JSON)).build();
+ try (Response response = httpClient.newCall(request).execute()) {
+ if (!response.isSuccessful()) {
+ throw new IOException(
+ String.format(
+ "Optimizer Service update failed: url=%s status=%d operationId=%s",
+ url, response.code(), body.getOperationId()));
+ }
+ log.info(
+ "Reported operation update: operationId={} status={} httpStatus={}",
+ body.getOperationId(),
+ body.getStatus(),
+ response.code());
+ }
+ }
+
+ @Override
+ public void close() {
+ httpClient.dispatcher().executorService().shutdown();
+ httpClient.connectionPool().evictAll();
+ }
+
+ private static OkHttpClient defaultClient() {
+ return new OkHttpClient.Builder()
+ .connectTimeout(10, TimeUnit.SECONDS)
+ .readTimeout(30, TimeUnit.SECONDS)
+ .writeTimeout(30, TimeUnit.SECONDS)
+ .build();
+ }
+
+ private static String stripTrailingSlash(String url) {
+ if (url == null || url.isEmpty()) {
+ throw new IllegalArgumentException("Optimizer Service base URL must be non-empty");
+ }
+ return url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/Bin.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/Bin.java
new file mode 100644
index 000000000..0b40b4958
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/Bin.java
@@ -0,0 +1,49 @@
+package com.linkedin.openhouse.jobs.util.binpack;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import lombok.Getter;
+import lombok.ToString;
+
+/**
+ * Mutable accumulator used by {@link FirstFitDecreasingBinPacker}. After packing completes the
+ * caller treats the returned bins as immutable; {@link #items()} returns an unmodifiable view.
+ */
+@ToString
+public class Bin {
+ private final List items = new ArrayList<>();
+ @Getter private long totalWeight;
+ @Getter private long totalSizeBytes;
+
+ /**
+ * Returns true iff adding {@code item} would keep this bin at or below all three caps. A cap of
+ * {@code <= 0} disables that dimension.
+ */
+ boolean fits(BinItem item, long maxWeight, long maxSizeBytes, int maxItems) {
+ if (maxItems > 0 && items.size() >= maxItems) {
+ return false;
+ }
+ if (maxWeight > 0 && totalWeight + item.getWeight() > maxWeight) {
+ return false;
+ }
+ if (maxSizeBytes > 0 && totalSizeBytes + item.getSizeBytes() > maxSizeBytes) {
+ return false;
+ }
+ return true;
+ }
+
+ void add(BinItem item) {
+ items.add(item);
+ totalWeight += item.getWeight();
+ totalSizeBytes += item.getSizeBytes();
+ }
+
+ public List items() {
+ return Collections.unmodifiableList(items);
+ }
+
+ public int size() {
+ return items.size();
+ }
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/BinItem.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/BinItem.java
new file mode 100644
index 000000000..68bcb16e2
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/BinItem.java
@@ -0,0 +1,29 @@
+package com.linkedin.openhouse.jobs.util.binpack;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+
+/**
+ * A single packable unit for {@link FirstFitDecreasingBinPacker}. Carries everything the batched
+ * Spark app needs both to do the work ({@link #fqtn}) and to report the result back to the
+ * Optimizer Service ({@link #operationId}, {@link #tableUuid}, {@link #databaseName}, {@link
+ * #tableName}).
+ *
+ * {@link #weight} is the bin-packing dimension (for OFD: number of current files in the table).
+ * {@link #sizeBytes} is a secondary capacity dimension that lets the packer cap the total on-disk
+ * footprint of a bin independently of file count.
+ */
+@Getter
+@Builder
+@ToString
+public class BinItem {
+ @NonNull private final String fqtn;
+ @NonNull private final String operationId;
+ @NonNull private final String tableUuid;
+ @NonNull private final String databaseName;
+ @NonNull private final String tableName;
+ private final long weight;
+ private final long sizeBytes;
+}
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPacker.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPacker.java
new file mode 100644
index 000000000..71009d3ff
--- /dev/null
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPacker.java
@@ -0,0 +1,70 @@
+package com.linkedin.openhouse.jobs.util.binpack;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * First-fit-decreasing bin packer used by the optimizer scheduler to group table operations into
+ * batches before launching a single Spark job per batch.
+ *
+ *
Each bin has three independent caps:
+ *
+ *
+ * - {@code maxWeightPerBin} — total {@link BinItem#getWeight()} (for OFD: number of files)
+ *
- {@code maxSizeBytesPerBin} — total on-disk size of all tables in the bin
+ *
- {@code maxItemsPerBin} — number of tables per bin
+ *
+ *
+ * An item that exceeds any single cap on its own is placed into a bin by itself rather than
+ * dropped — we never silently skip maintenance work for an oversized table.
+ *
+ *
Pass {@code 0} or a negative value for any cap to disable that dimension.
+ */
+@Slf4j
+@Builder
+public class FirstFitDecreasingBinPacker {
+
+ @Builder.Default private final long maxWeightPerBin = 1_000_000L;
+ @Builder.Default private final long maxSizeBytesPerBin = 5L * 1024L * 1024L * 1024L * 1024L;
+ @Builder.Default private final int maxItemsPerBin = 50;
+
+ public List pack(List items) {
+ if (items == null || items.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ List sorted =
+ items.stream()
+ .sorted(Comparator.comparingLong(BinItem::getWeight).reversed())
+ .collect(Collectors.toList());
+
+ List bins = new ArrayList<>();
+ for (BinItem item : sorted) {
+ Bin target = null;
+ for (Bin bin : bins) {
+ if (bin.fits(item, maxWeightPerBin, maxSizeBytesPerBin, maxItemsPerBin)) {
+ target = bin;
+ break;
+ }
+ }
+ if (target == null) {
+ target = new Bin();
+ bins.add(target);
+ if (!target.fits(item, maxWeightPerBin, maxSizeBytesPerBin, maxItemsPerBin)) {
+ log.warn(
+ "Item exceeds per-bin caps on its own; placing in dedicated bin: fqtn={} weight={} sizeBytes={}",
+ item.getFqtn(),
+ item.getWeight(),
+ item.getSizeBytes());
+ }
+ }
+ target.add(item);
+ }
+ log.info("Packed {} items into {} bins", items.size(), bins.size());
+ return bins;
+ }
+}
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
new file mode 100644
index 000000000..7a32e503f
--- /dev/null
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/BatchedOrphanFilesDeletionSparkAppArgsTest.java
@@ -0,0 +1,74 @@
+package com.linkedin.openhouse.jobs.spark;
+
+import java.util.List;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Pure-Java unit tests for {@link BatchedOrphanFilesDeletionSparkApp#buildEntries}. No Spark
+ * session, no HTTP — exercises the CLI-parsing edges that decide whether the app can even start.
+ */
+public class BatchedOrphanFilesDeletionSparkAppArgsTest {
+
+ @Test
+ public void buildEntriesParsesParallelLists() {
+ List entries =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries(
+ "db1.t1,db2.t2", "op-1,op-2", "uuid-1,uuid-2");
+
+ Assertions.assertEquals(2, entries.size());
+ Assertions.assertEquals("db1.t1", entries.get(0).getFqtn());
+ Assertions.assertEquals("db1", entries.get(0).getDatabaseName());
+ Assertions.assertEquals("t1", entries.get(0).getTableName());
+ Assertions.assertEquals("op-1", entries.get(0).getOperationId());
+ Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid());
+ Assertions.assertEquals("db2.t2", entries.get(1).getFqtn());
+ Assertions.assertEquals("op-2", entries.get(1).getOperationId());
+ }
+
+ @Test
+ public void buildEntriesTrimsWhitespaceInEachEntry() {
+ List entries =
+ BatchedOrphanFilesDeletionSparkApp.buildEntries(
+ " db1.t1 , db2.t2 ", " op-1 , op-2 ", " uuid-1 , uuid-2 ");
+
+ Assertions.assertEquals("db1.t1", entries.get(0).getFqtn());
+ Assertions.assertEquals("op-1", entries.get(0).getOperationId());
+ Assertions.assertEquals("uuid-1", entries.get(0).getTableUuid());
+ }
+
+ @Test
+ public void buildEntriesRejectsMismatchedLengths() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a,db.b", "op-1", "uuid-1,uuid-2"));
+ }
+
+ @Test
+ public void buildEntriesRejectsNullArguments() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries(null, "op-1", "uuid-1"));
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", null, "uuid-1"));
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("db.a", "op-1", null));
+ }
+
+ @Test
+ public void buildEntriesRejectsEmptyStrings() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("", "op-1", "uuid-1"));
+ }
+
+ @Test
+ public void buildEntriesRejectsNonFqtn() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> BatchedOrphanFilesDeletionSparkApp.buildEntries("just_a_table", "op-1", "uuid-1"));
+ }
+}
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPackerTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPackerTest.java
new file mode 100644
index 000000000..d77944772
--- /dev/null
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/util/binpack/FirstFitDecreasingBinPackerTest.java
@@ -0,0 +1,150 @@
+package com.linkedin.openhouse.jobs.util.binpack;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class FirstFitDecreasingBinPackerTest {
+
+ @Test
+ public void emptyInputProducesEmptyOutput() {
+ List bins = packer(100, 0, 50).pack(Collections.emptyList());
+ Assertions.assertTrue(bins.isEmpty());
+ }
+
+ @Test
+ public void nullInputProducesEmptyOutput() {
+ List bins = packer(100, 0, 50).pack(null);
+ Assertions.assertTrue(bins.isEmpty());
+ }
+
+ @Test
+ public void itemsSortDescendingByWeightBeforePacking() {
+ List items =
+ Arrays.asList(item("db.t_small", 10), item("db.t_big", 100), item("db.t_mid", 50));
+
+ List bins = packer(1000, 0, 50).pack(items);
+
+ // Everything fits in one bin since capacity is huge; order inside the bin must be descending.
+ Assertions.assertEquals(1, bins.size());
+ Bin only = bins.get(0);
+ Assertions.assertEquals(3, only.size());
+ Assertions.assertEquals("db.t_big", only.items().get(0).getFqtn());
+ Assertions.assertEquals("db.t_mid", only.items().get(1).getFqtn());
+ Assertions.assertEquals("db.t_small", only.items().get(2).getFqtn());
+ Assertions.assertEquals(160, only.getTotalWeight());
+ }
+
+ @Test
+ public void weightCapForcesMultipleBins() {
+ List items =
+ Arrays.asList(item("db.a", 60), item("db.b", 50), item("db.c", 40), item("db.d", 30));
+
+ List bins = packer(100, 0, 50).pack(items);
+
+ // FFD on [60, 50, 40, 30] with cap 100:
+ // bin0: 60 -> remaining 40
+ // bin0 tries 50 -> doesn't fit, new bin1: 50
+ // bin0 tries 40 -> fits, bin0: 60+40=100
+ // bin1 tries 30 -> fits, bin1: 50+30=80
+ Assertions.assertEquals(2, bins.size());
+ Assertions.assertEquals(100, bins.get(0).getTotalWeight());
+ Assertions.assertEquals(80, bins.get(1).getTotalWeight());
+ }
+
+ @Test
+ public void maxItemsPerBinCapHonored() {
+ List items =
+ IntStream.range(0, 5).mapToObj(i -> item("db.t" + i, 1)).collect(Collectors.toList());
+
+ List bins = packer(1000, 0, 2).pack(items);
+
+ Assertions.assertEquals(3, bins.size());
+ Assertions.assertEquals(2, bins.get(0).size());
+ Assertions.assertEquals(2, bins.get(1).size());
+ Assertions.assertEquals(1, bins.get(2).size());
+ }
+
+ @Test
+ public void maxSizeBytesCapHonored() {
+ List items =
+ Arrays.asList(
+ BinItem.builder()
+ .fqtn("db.a")
+ .operationId("op-a")
+ .tableUuid("uuid-a")
+ .databaseName("db")
+ .tableName("a")
+ .weight(1)
+ .sizeBytes(800L)
+ .build(),
+ BinItem.builder()
+ .fqtn("db.b")
+ .operationId("op-b")
+ .tableUuid("uuid-b")
+ .databaseName("db")
+ .tableName("b")
+ .weight(1)
+ .sizeBytes(800L)
+ .build());
+
+ List bins = packer(1000, 1000L, 50).pack(items);
+
+ Assertions.assertEquals(2, bins.size());
+ Assertions.assertEquals(800L, bins.get(0).getTotalSizeBytes());
+ Assertions.assertEquals(800L, bins.get(1).getTotalSizeBytes());
+ }
+
+ @Test
+ public void oversizedItemGetsItsOwnBinRatherThanBeingDropped() {
+ List items =
+ Arrays.asList(item("db.tiny1", 10), item("db.giant", 500), item("db.tiny2", 10));
+
+ List bins = packer(100, 0, 50).pack(items);
+
+ // Giant exceeds the cap on its own — must still appear in some bin.
+ long total = bins.stream().mapToLong(Bin::getTotalWeight).sum();
+ Assertions.assertEquals(520, total);
+ boolean giantPresent =
+ bins.stream()
+ .flatMap(b -> b.items().stream())
+ .anyMatch(i -> i.getFqtn().equals("db.giant"));
+ Assertions.assertTrue(giantPresent, "oversized item must not be dropped");
+ }
+
+ @Test
+ public void disabledCapsLetEverythingShareOneBin() {
+ List items =
+ IntStream.range(0, 20).mapToObj(i -> item("db.t" + i, 100)).collect(Collectors.toList());
+
+ List bins = packer(0, 0, 0).pack(items);
+
+ Assertions.assertEquals(1, bins.size());
+ Assertions.assertEquals(20, bins.get(0).size());
+ }
+
+ private static FirstFitDecreasingBinPacker packer(long maxWeight, long maxSize, int maxItems) {
+ return FirstFitDecreasingBinPacker.builder()
+ .maxWeightPerBin(maxWeight)
+ .maxSizeBytesPerBin(maxSize)
+ .maxItemsPerBin(maxItems)
+ .build();
+ }
+
+ private static BinItem item(String fqtn, long weight) {
+ String[] parts = fqtn.split("\\.", 2);
+ return BinItem.builder()
+ .fqtn(fqtn)
+ .operationId("op-" + parts[1])
+ .tableUuid("uuid-" + parts[1])
+ .databaseName(parts[0])
+ .tableName(parts[1])
+ .weight(weight)
+ .sizeBytes(0L)
+ .build();
+ }
+}