feat(optimizer): [4/N] Scheduler app#534
Conversation
…park jobs Introduces apps/optimizer-scheduler, a Spring Boot CommandLineRunner that: 1. Loads PENDING operations from the optimizer DB 2. Bin-packs them by file count (first-fit descending) 3. Claims rows via CAS (PENDING → SCHEDULING → SCHEDULED) 4. Submits one batched Spark job per bin via the Jobs Service Adds three @Modifying CAS methods to TableOperationsRepository: cancelDuplicatePending, markScheduling, markScheduled — required for safe concurrent scheduling without double-submitting. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| @@ -0,0 +1,74 @@ | |||
| package com.linkedin.openhouse.scheduler; | |||
|
|
|||
| import com.linkedin.openhouse.optimizer.entity.TableOperationRow; | |||
There was a problem hiding this comment.
Every operation needs it own binpacker. Its different based on each operation as constraints are different. Rather than generic binpacking, we should have an OFD scheduler that is attempting to binpack pending ofd jobs. it should be a similar pattern to the analyzer where we run the scheduler requires an operation type but database and table name are optional.
So we should have a Binpacker interface and a mapping from operation to binpacker.
There was a problem hiding this comment.
Every operation needs it own binpacker. Its different based on each operation as constraints are different. Rather than generic binpacking, we should have an OFD scheduler that is attempting to binpack pending ofd jobs. it should be a similar pattern to the analyzer where we run the scheduler requires an operation type but database and table name are optional.
So we should have a Binpacker interface and a mapping from operation to binpacker.
Done. BinPacker is now an interface with a single pack(List<TableOperation>) method. SchedulerConfig defines a Map<OperationType, BinPacker> bean — ORPHAN_FILES_DELETION is wired to a FileCountBinPacker instance. SchedulerApplication's CommandLineRunner loops over the map's keys and calls runner.schedule(opType) per entry, mirroring AnalyzerApplication. SchedulerRunner.schedule(OperationType, Optional<String> db, Optional<String> tableName) is the new signature.
| * @return list of bins, each bin being a non-empty list of rows | ||
| */ | ||
| public static List<List<TableOperationRow>> pack( | ||
| List<TableOperationRow> pending, Map<String, Long> fileCountByUuid, long maxFilesPerBin) { |
There was a problem hiding this comment.
This interface is bad and inflexible. TableOperationRow should contain all the nessasary information. FileCount should be a field. maxFilesPerBin is specific to OFD and it should not be a aparameter but rather something done when constructing the bin packer instance (or a conf)
There was a problem hiding this comment.
This interface is bad and inflexible. TableOperationRow should contain all the nessasary information. FileCount should be a field. maxFilesPerBin is specific to OFD and it should not be a aparameter but rather something done when constructing the bin packer instance (or a conf)
Done. fileCount is now a nullable field on the shared TableOperation domain object — populated by the scheduler at read time from table_stats. FileCountBinPacker.pack(...) reads it off each TableOperation directly; no more Map<uuid, fileCount> parameter. maxFilesPerBin is constructor-injected on FileCountBinPacker via the scheduler.ofd.max-files-per-bin config; not on pack().
| private String resultsEndpoint; | ||
|
|
||
| @Transactional | ||
| public void schedule() { |
There was a problem hiding this comment.
operation type required, database and table names optional.
There was a problem hiding this comment.
operation type required, database and table names optional.
Done. New signature: schedule(OperationType op) and schedule(OperationType op, Optional<String> databaseName, Optional<String> tableName) (SchedulerRunner.java:43,50). Operation type is required; database and table name are Optional<String>. Mirrors AnalyzerRunner.analyze.
| return count != null ? count : 0L; | ||
| })); | ||
|
|
||
| List<List<TableOperationRow>> bins = BinPacker.pack(pending, fileCountByUuid, maxFiles); |
There was a problem hiding this comment.
We should not be handling rows, that is the database model. Use TableOperation from the analyzer pr.
There was a problem hiding this comment.
We should not be handling rows, that is the database model. Use TableOperation from the analyzer pr.
Done. TableOperation (and its companion enums) moved from the analyzer-internal package to apps/optimizer/.../model/ so both the analyzer and the scheduler consume the same domain type. SchedulerRunner now loads rows, converts them via TableOperation.from(row), enriches each with fileCount from table_stats, and hands List<TableOperation> to the BinPacker. Per-bin logic and the BinPacker interface work on TableOperation, not the JPA row.
| List<TableOperationRow> claimed = | ||
| bin.stream() | ||
| .filter( | ||
| r -> operationsRepo.markScheduling(r.getId(), r.getVersion(), Instant.now()) == 1) |
There was a problem hiding this comment.
Can we batch the writes to limit round trips?
There was a problem hiding this comment.
Can we batch the writes to limit round trips?
Done. Added cancelDuplicatePendingBatch, markSchedulingBatch, markScheduledBatch, and markPendingBatch @Modifying @Query methods on TableOperationsRepository. Each issues one UPDATE/DELETE for a list of IDs with the status-guarded WHERE clause preserved. SchedulerRunner.submitBin now uses these instead of the per-row variants.
…tion domain, batched writes, retry Addresses unresolved review threads on #534: - BinPacker becomes an interface with a single pack(...) method. Strategy is decoupled from operation type; binding to OFD lives in SchedulerConfig as a Map<OperationType, BinPacker> bean. - FileCountBinPacker is the first strategy — named after the dimension it packs on, not the op it serves. maxFilesPerBin is constructor-injected via scheduler.ofd.max-files-per-bin; pack(...) reads fileCount off TableOperation rather than taking a side map. - SchedulerRunner.schedule now takes required OperationType plus optional databaseName/tableName (mirror of AnalyzerRunner.analyze). It loads PENDING rows, converts row → TableOperation, fetches stats for fileCount enrichment, and dispatches to the registered BinPacker. No raw TableOperationRow flows through the per-bin logic. - Writes are batched: cancelDuplicatePendingBatch, markSchedulingBatch, markScheduledBatch, markPendingBatch on TableOperationsRepository each issue one UPDATE/DELETE. - On job-launch failure the scheduler reverts claimed rows from SCHEDULING back to PENDING via markPendingBatch so the next pass retries (replaces the previous log-and-leak that relied on the analyzer's removed scheduledTimeout). - SchedulerApplication's CommandLineRunner loops over the configured Map<OperationType, BinPacker> and calls schedule(opType) per entry. Tests: FileCountBinPackerTest covers the packing strategy; SchedulerRunnerTest covers per-op dispatch, batched claim/mark-scheduled on success, batch-revert on submit failure, duplicate-PENDING dedup, and the unknown-operation-type no-op path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SchedulerApplication scanned com.linkedin.openhouse.optimizer.entity for JPA-managed entities, but the entities live in com.linkedin.openhouse.optimizer.db. The app crashed at startup against a real database (MySQL backend in docker) with "Not a managed type: class ...db.TableStatsRow". Unit tests didn't catch it because they configure JPA differently. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Adds Dto suffix to model type refs in the scheduler app: TableOperationDto, TableStatsDto, OperationTypeDto, etc. Also fixes one perl-overreach: SchedulerRunner.PENDING was the db.OperationStatus enum value (not model), restored to OperationStatus. Mechanical follow-up to the opt-0 rename (b31decf). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Relocated from opt-5's commit 1ec3acf to their proper owner (opt-4 introduced these files): 1. FileCountBinPacker.cost() NPE on stats with null snapshot. Analyzer can schedule against a fresh table with no snapshot data; treat that as cost 0 instead of crashing. 2. SchedulerRunner.schedule(OperationType) was non-@transactional; the 3-arg overload was annotated but self-invocation bypasses the CGLIB proxy, so @Modifying repo calls hit TransactionRequiredException. Annotate the no-arg variant too. 3. scheduler.results-endpoint default URL: stale /v1/table-operations → /v1/optimizer/operations (where the controller actually mounts). Both application.properties and application-test.properties. 4. SchedulerRunnerTest's RESULTS_ENDPOINT constant: same URL fix. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
…527) ## Optimizer Stack | PR | Content | |---|---| | #527 **(this)** | API and internal models | | #530 | Database Repos | | #531 | REST service | | #533 | Analyzer app | | #534 | Scheduler app | | #tbd | Spark BatchedOFD app | | #tbd | Infra, docker-compose, smoke test | ## Summary PR 0 of N in the optimizer stack. [Overall Project](https://docs.google.com/document/d/1oGQWkmlVw0HG-D4Nx37q0oUEQd53Ni4q5Q7h1voaZIQ/edit?tab=t.0#heading=h.vtl818e4m9f7) [Service Design doc](https://docs.google.com/document/d/1xYOD7iuPjZO05UWfT1Dkmf4lYNw4iOjqY10UT2X1FAg/edit?tab=t.0). Introduces the optimizer service API and internal model <img width="631" height="410" alt="image" src="https://github.com/user-attachments/assets/8833471d-069a-49a2-9a10-5eb7f3b96a72" /> ## Changes - [ ] Client-facing API Changes - [x] Internal API Changes - [ ] Bug Fixes - [x] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [ ] Tests ## Testing Done - [ ] Manually Tested on local docker setup. Please include commands ran, and their output. - [ ] Added new tests for the changes made. - [ ] Updated existing tests to reflect the changes made. - [x] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. Please explain. This PR contains only the data model (entities, DTOs, converters). Repository tests follow in PR 1. Verified: - `./gradlew :services:optimizer:compileJava` passes - `./gradlew compileJava` (full project) passes with no regressions - Spotless formatting passes # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [x] Large PR broken into smaller PRs, and PR plan linked in the description. --------- Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
SchedulerRunner now uses the unified find(...) / updateBatch(...) / cancel(...) shape. The CAS calls become updateBatch(ids, fromStatus, toStatus, Optional<Instant>, Optional<String>) for each transition. findClaimedIds is replaced by a find(...) call keyed on (SCHEDULING, scheduledAt watermark, claimed-ids) that returns rows; caller extracts ids. Dedup move: cancelDuplicatePendingBatch was per-bin AND blind (NOT IN keepIds); two bins in the same cycle nuked each other's rows. The new shape is per-cycle, computed by the runner: group pendingRows by tableUuid, keep the oldest per group (lex tiebreak on id), and call repo.cancel(duplicateIds) once before bin packing. optimizer.repo.default-limit threads through via @value; field is initialised inline so pure-Mockito tests see a sane value. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
## Optimizer Stack | PR | Content | |---|---| | #527 | Data Model | | #530 **(this)** | Database Repos | | #531 | REST service | | #533 | Analyzer app | | #534 | Scheduler app | | #tbd | Spark BatchedOFD app | | #tbd | Infra, docker-compose, smoke test | ## Summary PR 1 of N in the optimizer stack. [Overall Project](https://docs.google.com/document/d/1oGQWkmlVw0HG-D4Nx37q0oUEQd53Ni4q5Q7h1voaZIQ/edit?tab=t.0#heading=h.vtl818e4m9f7) [Service Design doc](https://docs.google.com/document/d/1xYOD7iuPjZO05UWfT1Dkmf4lYNw4iOjqY10UT2X1FAg/edit?tab=t.0). Spring Data JPA repositories for all four optimizer tables with filtered query support, plus tests exercising save/find, filtered queries, upsert semantics, and append-only history. ## Changes - [ ] Client-facing API Changes - [ ] Internal API Changes - [ ] Bug Fixes - [x] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [x] Tests **Repositories**: `TableOperationsRepository`, `TableOperationsHistoryRepository`, `TableStatsRepository`, `TableStatsHistoryRepository` — each with JPQL filtered query methods. **Tests**: Repository tests for all four tables plus `OptimizerServiceContextTest` verifying the Spring context loads. ## Testing Done - [ ] Manually Tested on local docker setup. Please include commands ran, and their output. - [x] Added new tests for the changes made. - [ ] Updated existing tests to reflect the changes made. - [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. Please explain. `./gradlew :services:optimizer:test` — all tests pass (H2 in MySQL mode). # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [x] Large PR broken into smaller PRs, and PR plan linked in the description. --------- Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
## Optimizer Stack | PR | Content | |---|---| | #527 | Data Model | | #530 | Database Repos | | #531 **(this)** | REST service | | #533 | Analyzer app | | #534 | Scheduler app | | #tbd | Spark BatchedOFD app | | #tbd | Infra, docker-compose, smoke test | ## Summary PR 2 of N in the optimizer stack. Service layer and REST controllers for the optimizer service, plus the `apps/optimizer` shared module providing lightweight entity/repo copies for the analyzer and scheduler apps. ## Changes - [ ] Client-facing API Changes - [x] Internal API Changes - [ ] Bug Fixes - [x] New Features - [ ] Performance Improvements - [ ] Code Style - [ ] Refactoring - [ ] Documentation - [x] Tests **Service layer**: `OptimizerDataService` interface and `OptimizerDataServiceImpl` — CRUD operations, complete-operation lifecycle, stats upsert with history double-write, filtered queries. **Controllers**: `TableOperationsController`, `TableOperationsHistoryController`, `TableStatsController` — REST endpoints per the design doc API spec. **Shared module** (`apps/optimizer`): Lightweight entity and repository copies used by the analyzer and scheduler apps to read optimizer state directly from MySQL. ## Testing Done - [ ] Manually Tested on local docker setup. Please include commands ran, and their output. - [x] Added new tests for the changes made. - [ ] Updated existing tests to reflect the changes made. - [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help. - [ ] Some other form of testing like staging or soak time in production. Please explain. H2 integration tests in `OptimizerDataServiceImplTest` (5 tests): - `completeOperation_writesHistoryFromOperationRow` — saves SCHEDULED row, completes it, asserts history DTO fields - `completeOperation_notFound_returnsEmpty` — completes nonexistent ID, asserts empty - `upsertTableStats_createsNewRow` — upserts new table, asserts DTO and repo row - `upsertTableStats_updatesExistingRow` — upserts twice, asserts overwrite with single row - `upsertTableStats_appendsHistoryOnEveryCall` — upserts twice, asserts 2 history rows ``` ./gradlew :services:optimizer:test # BUILD SUCCESSFUL — all 25 tests pass (repo tests from PR 1 + 5 new service tests) ``` # Additional Information - [ ] Breaking Changes - [ ] Deprecations - [x] Large PR broken into smaller PRs, and PR plan linked in the description. --------- Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Optimizer Stack
Summary
PR 4 of N in the optimizer stack.
Introduces
apps/optimizer-scheduler, a Spring Boot CommandLineRunner that claims PENDING operations and submits batched Spark jobs via the Jobs Service.State machine:
Analyzer creates all Operations as PENDING
Changes
Scheduler runner: Loads PENDING ops, bin-packs by file count, claims via two-step CAS (PENDING → SCHEDULING → SCHEDULED), submits one Spark job per bin.
Bin packer: Greedy first-fit descending algorithm. Oversized tables get their own bin (never dropped). Tables with no stats default to cost 0.
Jobs client: WebClient-based REST client submitting
POST /jobsto the Jobs Service with table names, operation IDs, and results endpoint.Repository additions: Three
@ModifyingCAS methods onTableOperationsRepository—cancelDuplicatePending,markScheduling,markScheduled— required for safe concurrent scheduling.Testing Done
13 unit tests:
BinPackerTest(7 tests) — empty input, single table, under/over limit, oversized table, no stats, descending sortSchedulerRunnerTest(6 tests) — no pending ops, two-step claim + schedule, launch failure, already-claimed skip, duplicate cancellation, multi-row bin claimAdditional Information