Skip to content

Optimizer: Batched orphan file deletion using bin packing#599

Draft
abhisheknath2011 wants to merge 1 commit into
linkedin:mainfrom
abhisheknath2011:batched-ofd
Draft

Optimizer: Batched orphan file deletion using bin packing#599
abhisheknath2011 wants to merge 1 commit into
linkedin:mainfrom
abhisheknath2011:batched-ofd

Conversation

@abhisheknath2011
Copy link
Copy Markdown
Member

@abhisheknath2011 abhisheknath2011 commented May 22, 2026

Summary

Introduces BatchedOrphanFilesDeletionSparkApp, the multi-table counterpart of the existing single-table OrphanFilesDeletionSparkApp. One Spark job now processes a list of (table, operationId) pairs that the optimizer scheduler bin-packed into a single batch, reporting SUCCESS/FAILED per operation back to the Optimizer Service.

Also lands a first-fit-decreasing bin packer (jobs.util.binpack) that the scheduler (#534) will use to assemble those batches. The packer has no caller in this PR — it ships alongside the Spark app so the algorithm can be reviewed independently from the scheduler wiring. Keeping it in apps/spark for now since the scheduler module isn't merged yet; it can move along side the scheduler once PR #534 is merged.

Key design choices:

  • Per-table failure isolation — exceptions in one table are caught, FAILED is posted for that operationId, and remaining tables continue. The job exits 0 if at least one table succeeds.
  • Recoverable result reporting — if POST /v1/optimizer/operations/update itself fails, the row stays SCHEDULED and the Analyzer's stale-timeout will re-queue it. No retry storms in the Spark driver.
  • Scheduler decides parallelism, not the app--driverParallelism is honoured verbatim; the app does not pick its own thread count.
  • Bin packer never drops oversized tables — an item exceeding any single cap is placed in a dedicated bin rather than silently skipped.
  • Self-contained wire DTOOperationUpdateRequest is mirrored in apps/spark so this PR compiles independently of the optimizer-service module's merge order. Replace with the shared DTO once the optimizer module lives in apps/.
    Bin packer (com.linkedin.openhouse.jobs.util.binpack):
    -BinItem — fqtn, operationId, tableUuid, db/table, weight (numFiles), sizeBytes
  • Bin — mutable accumulator with three-dimensional fit check
  • FirstFitDecreasingBinPacker — FFD by weight with secondary caps on bytes and item count; oversized items get a dedicated bin

Optimizer Service client (com.linkedin.openhouse.jobs.spark.optimizer):

  • OperationUpdateRequest — wire-compatible body for POST /v1/optimizer/operations/update
  • OptimizerServiceClient — thin OkHttp client with sensible timeouts

Batched Spark app (com.linkedin.openhouse.jobs.spark):

  • BatchedOrphanFilesDeletionSparkApp — extends BaseSparkApp; iterates entries via a fixed thread pool, reuses Operations.deleteOrphanFiles(...) per table, posts per-operation status, runs the existing TableStateValidator per table

CLI:
--tableNames db.t1,db.t2,db.t3
--operationIds op-uuid-1,op-uuid-2,op-uuid-3
--tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3
--resultsEndpoint http://optimizer.svc:8080
--driverParallelism 4
plus existing OFD knobs (--ttl, --backupDir, --concurrentDeletes, --streamResults, --maxOrphanFileSampleSize).

Issue] Briefly discuss the summary of the changes made in this
pull request in 2-3 lines.

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

Additional Information

For all the boxes checked, include additional details of the changes made in this pull request.

Comment on lines +69 to +71
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally these would be configs.

Comment on lines +75 to +80
private static String stripTrailingSlash(String url) {
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("Optimizer Service base URL must be non-empty");
}
return url.endsWith("/") ? url.substring(0, url.length() - 1) : url;
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1) Seems heavyweight
(2) Can we just assume not null or attempt to use optional instead if null might be present? Under what conditions is the url valid to be null?

Comment on lines +62 to +69
private final List<BatchEntry> entries;
private final String resultsEndpoint;
private final int driverParallelism;
private final long ttlSeconds;
private final String backupDir;
private final int concurrentDeletes;
private final boolean streamResults;
private final int maxOrphanFileSampleSize;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on a OFD parameters object that we use a builder to construct? The main thought was to encapsulate parameters rather than having them be manually maintained. I'm generally a fan of generating with annotations to avoid this boilerplate as lines 62-92 are all just defining parameters and a public funciton to supply them.

int concurrentDeletes,
boolean streamResults,
int maxOrphanFileSampleSize) {
super(jobId, stateManager, otelEmitter);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a callback to complete job on HTS right? Do we need to adapt that or is it fine to leave as-is?

futures.add(pool.submit(new TableWorker(ops, entry, client)));
}
int successes = 0;
for (int i = 0; i < futures.size(); i++) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, can we use collection funcitonal apis over for loops? Mainly just a style preference I have.

futures.map(future -> {})

It avoids entries.get(i) from possibly having a problem.

fqtn,
AttributeKey.stringKey(AppConstants.JOB_NAME),
BatchedOrphanFilesDeletionSparkApp.class.getSimpleName()));
throw e;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log too? We might want to standardize all errors get a log and a counter for later observability.

try {
client.updateOperation(body);
} catch (IOException e) {
log.error(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

counter? We can get signal on how many jobs are failing to emit metrics.


private long resolveTtlSeconds(Table table) {
long resolved = ttlSeconds;
boolean oneDayTtlEnabled =
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. I don't think we need to cache this bool. The appconst is clear.

Comment on lines +278 to +285
private int countOrphans(DeleteOrphanFiles.Result result) {
int count = 0;
for (String unused : result.orphanFileLocations()) {
count++;
}
return count;
}
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do result.count()?

if (tableNames == null
|| operationIds == null
|| tableUuids == null
|| tableNames.isEmpty()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a practical limit to the number of tables in the job based on the input string length limits?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants