Optimizer: Batched orphan file deletion using bin packing#599
Optimizer: Batched orphan file deletion using bin packing#599abhisheknath2011 wants to merge 1 commit into
Conversation
| .connectTimeout(10, TimeUnit.SECONDS) | ||
| .readTimeout(30, TimeUnit.SECONDS) | ||
| .writeTimeout(30, TimeUnit.SECONDS) |
There was a problem hiding this comment.
Ideally these would be configs.
| private static String stripTrailingSlash(String url) { | ||
| if (url == null || url.isEmpty()) { | ||
| throw new IllegalArgumentException("Optimizer Service base URL must be non-empty"); | ||
| } | ||
| return url.endsWith("/") ? url.substring(0, url.length() - 1) : url; | ||
| } |
There was a problem hiding this comment.
(1) Seems heavyweight
(2) Can we just assume not null or attempt to use optional instead if null might be present? Under what conditions is the url valid to be null?
| private final List<BatchEntry> entries; | ||
| private final String resultsEndpoint; | ||
| private final int driverParallelism; | ||
| private final long ttlSeconds; | ||
| private final String backupDir; | ||
| private final int concurrentDeletes; | ||
| private final boolean streamResults; | ||
| private final int maxOrphanFileSampleSize; |
There was a problem hiding this comment.
Thoughts on a OFD parameters object that we use a builder to construct? The main thought was to encapsulate parameters rather than having them be manually maintained. I'm generally a fan of generating with annotations to avoid this boilerplate as lines 62-92 are all just defining parameters and a public funciton to supply them.
| int concurrentDeletes, | ||
| boolean streamResults, | ||
| int maxOrphanFileSampleSize) { | ||
| super(jobId, stateManager, otelEmitter); |
There was a problem hiding this comment.
There is a callback to complete job on HTS right? Do we need to adapt that or is it fine to leave as-is?
| futures.add(pool.submit(new TableWorker(ops, entry, client))); | ||
| } | ||
| int successes = 0; | ||
| for (int i = 0; i < futures.size(); i++) { |
There was a problem hiding this comment.
nit, can we use collection funcitonal apis over for loops? Mainly just a style preference I have.
futures.map(future -> {})
It avoids entries.get(i) from possibly having a problem.
| fqtn, | ||
| AttributeKey.stringKey(AppConstants.JOB_NAME), | ||
| BatchedOrphanFilesDeletionSparkApp.class.getSimpleName())); | ||
| throw e; |
There was a problem hiding this comment.
log too? We might want to standardize all errors get a log and a counter for later observability.
| try { | ||
| client.updateOperation(body); | ||
| } catch (IOException e) { | ||
| log.error( |
There was a problem hiding this comment.
counter? We can get signal on how many jobs are failing to emit metrics.
|
|
||
| private long resolveTtlSeconds(Table table) { | ||
| long resolved = ttlSeconds; | ||
| boolean oneDayTtlEnabled = |
There was a problem hiding this comment.
Nit. I don't think we need to cache this bool. The appconst is clear.
| private int countOrphans(DeleteOrphanFiles.Result result) { | ||
| int count = 0; | ||
| for (String unused : result.orphanFileLocations()) { | ||
| count++; | ||
| } | ||
| return count; | ||
| } | ||
| } |
There was a problem hiding this comment.
Can we do result.count()?
| if (tableNames == null | ||
| || operationIds == null | ||
| || tableUuids == null | ||
| || tableNames.isEmpty() |
There was a problem hiding this comment.
Is there a practical limit to the number of tables in the job based on the input string length limits?
Summary
Introduces
BatchedOrphanFilesDeletionSparkApp, the multi-table counterpart of the existing single-tableOrphanFilesDeletionSparkApp. One Spark job now processes a list of(table, operationId)pairs that the optimizer scheduler bin-packed into a single batch, reporting SUCCESS/FAILED per operation back to the Optimizer Service.Also lands a first-fit-decreasing bin packer (
jobs.util.binpack) that the scheduler (#534) will use to assemble those batches. The packer has no caller in this PR — it ships alongside the Spark app so the algorithm can be reviewed independently from the scheduler wiring. Keeping it inapps/sparkfor now since the scheduler module isn't merged yet; it can move along side the scheduler once PR #534 is merged.Key design choices:
POST /v1/optimizer/operations/updateitself fails, the row staysSCHEDULEDand the Analyzer's stale-timeout will re-queue it. No retry storms in the Spark driver.--driverParallelismis honoured verbatim; the app does not pick its own thread count.OperationUpdateRequestis mirrored inapps/sparkso this PR compiles independently of the optimizer-service module's merge order. Replace with the shared DTO once the optimizer module lives inapps/.Bin packer (
com.linkedin.openhouse.jobs.util.binpack):-
BinItem— fqtn, operationId, tableUuid, db/table, weight (numFiles), sizeBytesBin— mutable accumulator with three-dimensional fit checkFirstFitDecreasingBinPacker— FFD by weight with secondary caps on bytes and item count; oversized items get a dedicated binOptimizer Service client (
com.linkedin.openhouse.jobs.spark.optimizer):OperationUpdateRequest— wire-compatible body forPOST /v1/optimizer/operations/updateOptimizerServiceClient— thin OkHttp client with sensible timeoutsBatched Spark app (
com.linkedin.openhouse.jobs.spark):BatchedOrphanFilesDeletionSparkApp— extendsBaseSparkApp; iterates entries via a fixed thread pool, reusesOperations.deleteOrphanFiles(...)per table, posts per-operation status, runs the existingTableStateValidatorper tableCLI:
--tableNames db.t1,db.t2,db.t3
--operationIds op-uuid-1,op-uuid-2,op-uuid-3
--tableUuids tab-uuid-1,tab-uuid-2,tab-uuid-3
--resultsEndpoint http://optimizer.svc:8080
--driverParallelism 4
plus existing OFD knobs (
--ttl,--backupDir,--concurrentDeletes,--streamResults,--maxOrphanFileSampleSize).Issue] Briefly discuss the summary of the changes made in this
pull request in 2-3 lines.
Changes
For all the boxes checked, please include additional details of the changes made in this pull request.
Testing Done
For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.
Additional Information
Breaking Changes
Deprecations
Large PR broken into smaller PRs, and PR plan linked in the description.
Open items for reviewers:
OperationUpdateRequestis a local mirror of feat(optimizer): [2/N] Optimizer REST Service and Controller #531'sUpdateOperationRequest. Should we introduce anapps/optimizershared module here (mirroring the analyzer's pattern in feat(optimizer): [3/N] Analyzer #533) and depend on it instead?apps/spark/util/binpackor move to the scheduler module in feat(optimizer): [4/N] Scheduler app #534 alongside its only caller?POST /v1/optimizer/operations/updateper feat(optimizer): [2/N] Optimizer REST Service and Controller #531. If that endpoint changes name (e.g. toPOST /{id}/complete),OptimizerServiceClient.UPDATE_PATHis the only place that needs to change.For all the boxes checked, include additional details of the changes made in this pull request.