feat(optimizer): [4/N] Scheduler app #534

Open
mkuchenbecker wants to merge 62 commits into mkuchenb/optimizer-3 from mkuchenb/optimizer-4

@mkuchenbecker mkuchenbecker commented Apr 7, 2026

Optimizer Stack

| PR | Content |
|---|---|
| #527 | Data Model |
| #530 | Database Repos |
| #531 | REST service |
| #533 | Analyzer app |
| #534 (this) | Scheduler app |
| #tbd | Spark BatchedOFD app |
| #tbd | Infra, docker-compose, smoke test |

Summary

PR 4 of N in the optimizer stack.

Introduces apps/optimizer-scheduler, a Spring Boot CommandLineRunner that claims PENDING operations and submits batched Spark jobs via the Jobs Service.

State machine:
The analyzer creates all operations as PENDING.

  1. The scheduler marks PENDING operations as SCHEDULING to reserve them for a bin before submission, which reduces duplicate job submissions.
  2. After claiming, the bin is submitted as a single job. If the batch is submitted successfully, its operations are updated to SCHEDULED and the associated job ID is persisted.
  3. If the submission fails, the operations are reset to PENDING so another scheduler run can pick them up.
  4. Duplicate PENDING operations are cancelled.
  5. An operation is not expected to stay in SCHEDULING for long (minutes at most). Any operation stuck in SCHEDULING for longer than a period T should move to CANCELLED so that the next analyzer iteration creates a new PENDING operation. This can happen if the scheduler crashes suddenly or its pod is rotated, and the crash may occur before or after the job was submitted. It is therefore safest to transition to CANCELLED, in case the job was submitted successfully but the operation was never marked SCHEDULED. Contrast this with a failed submission, where we know the job will never complete.
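
The transitions listed above can be written down as an explicit table. This is a minimal sketch, not the PR's code: `OpStatus` and `OpTransitions` are hypothetical names, and transitions out of SCHEDULED (for example job completion) are left out because the description does not cover them.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical status enum; the values mirror the states named in the description.
enum OpStatus { PENDING, SCHEDULING, SCHEDULED, CANCELLED }

final class OpTransitions {
  private static final Map<OpStatus, Set<OpStatus>> ALLOWED = new EnumMap<>(OpStatus.class);

  static {
    // PENDING -> SCHEDULING (claimed for a bin), PENDING -> CANCELLED (duplicate).
    ALLOWED.put(OpStatus.PENDING, EnumSet.of(OpStatus.SCHEDULING, OpStatus.CANCELLED));
    // SCHEDULING -> SCHEDULED (job submitted), -> PENDING (submission failed),
    // -> CANCELLED (stuck longer than T).
    ALLOWED.put(
        OpStatus.SCHEDULING,
        EnumSet.of(OpStatus.SCHEDULED, OpStatus.PENDING, OpStatus.CANCELLED));
    // Assumption: nothing in this sketch leaves SCHEDULED or CANCELLED.
    ALLOWED.put(OpStatus.SCHEDULED, EnumSet.noneOf(OpStatus.class));
    ALLOWED.put(OpStatus.CANCELLED, EnumSet.noneOf(OpStatus.class));
  }

  /** Returns true if the description above allows moving from {@code from} to {@code to}. */
  static boolean canTransition(OpStatus from, OpStatus to) {
    return ALLOWED.get(from).contains(to);
  }
}
```

For example, `OpTransitions.canTransition(OpStatus.PENDING, OpStatus.SCHEDULED)` is false, because an operation must be claimed (SCHEDULING) before it can be marked SCHEDULED.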

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

Scheduler runner: Loads PENDING ops, bin-packs by file count, claims via two-step CAS (PENDING → SCHEDULING → SCHEDULED), submits one Spark job per bin.

Bin packer: Greedy first-fit descending algorithm. Oversized tables get their own bin (never dropped). Tables with no stats default to cost 0.
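
As an illustration of the packing rule described above, here is a minimal first-fit descending sketch. It is not the PR's implementation: `PendingOp` and `FirstFitDescending` are hypothetical names, and the capacity check (`load + cost <= maxFilesPerBin`) is an assumption about how the limit is applied.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pending operation; fileCount is null when no stats exist.
record PendingOp(String tableUuid, Long fileCount) {
  long cost() {
    return fileCount == null ? 0L : fileCount; // no stats: cost 0
  }
}

final class FirstFitDescending {
  static List<List<PendingOp>> pack(List<PendingOp> pending, long maxFilesPerBin) {
    // Sort by cost, largest first (the "descending" part). List.sort is stable.
    List<PendingOp> sorted = new ArrayList<>(pending);
    sorted.sort((a, b) -> Long.compare(b.cost(), a.cost()));

    List<List<PendingOp>> bins = new ArrayList<>();
    List<Long> loads = new ArrayList<>();
    for (PendingOp op : sorted) {
      // "First fit": take the first existing bin with enough remaining capacity.
      int target = -1;
      for (int i = 0; i < bins.size(); i++) {
        if (loads.get(i) + op.cost() <= maxFilesPerBin) {
          target = i;
          break;
        }
      }
      if (target < 0) {
        // Nothing fits, so open a new bin. An oversized table ends up alone here
        // because its bin is already over the limit and accepts nothing else.
        bins.add(new ArrayList<>());
        loads.add(0L);
        target = bins.size() - 1;
      }
      bins.get(target).add(op);
      loads.set(target, loads.get(target) + op.cost());
    }
    return bins;
  }
}
```

With a limit of 100 and tables costing 60, 150, unknown, and 40 files, the 150-file table gets a bin to itself and the other three share a second bin.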

Jobs client: WebClient-based REST client submitting POST /jobs to the Jobs Service with table names, operation IDs, and results endpoint.

Repository additions: Three @Modifying CAS methods on TableOperationsRepository (cancelDuplicatePending, markScheduling, markScheduled), required for safe concurrent scheduling.
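
To make the compare-and-set idea concrete, here is a minimal in-memory sketch of a status-guarded claim. It is an assumption-laden stand-in, not the repository code: `InMemoryOps` is hypothetical, and a `ConcurrentHashMap` plays the role of an `UPDATE ... WHERE id = ? AND status = 'PENDING' AND version = ?` that returns the number of rows changed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class InMemoryOps {
  enum Status { PENDING, SCHEDULING, SCHEDULED }

  // Hypothetical row shape: only the fields the guard needs.
  record Row(Status status, long version) {}

  private final Map<String, Row> rows = new ConcurrentHashMap<>();

  void insertPending(String id) {
    rows.put(id, new Row(Status.PENDING, 0L));
  }

  /**
   * Moves the row PENDING -> SCHEDULING only if it is still PENDING at the expected
   * version. Returns 1 on success and 0 otherwise, like an update count.
   */
  int markScheduling(String id, long expectedVersion) {
    Row expected = new Row(Status.PENDING, expectedVersion);
    Row next = new Row(Status.SCHEDULING, expectedVersion + 1);
    // replace(key, old, new) is atomic and compares with equals(), which records provide.
    return rows.replace(id, expected, next) ? 1 : 0;
  }
}
```

Two schedulers racing on the same operation both call `markScheduling("op-1", 0L)`; exactly one of them sees 1 and proceeds, the other sees 0 and skips the row.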

Testing Done

  • Manually Tested on local docker setup. Please include commands ran, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

13 unit tests:

  • BinPackerTest (7 tests) — empty input, single table, under/over limit, oversized table, no stats, descending sort
  • SchedulerRunnerTest (6 tests) — no pending ops, two-step claim + schedule, launch failure, already-claimed skip, duplicate cancellation, multi-row bin claim
./gradlew :apps:optimizer-scheduler:test
# BUILD SUCCESSFUL — 13 tests pass

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

…park jobs

Introduces apps/optimizer-scheduler, a Spring Boot CommandLineRunner that:
1. Loads PENDING operations from the optimizer DB
2. Bin-packs them by file count (first-fit descending)
3. Claims rows via CAS (PENDING → SCHEDULING → SCHEDULED)
4. Submits one batched Spark job per bin via the Jobs Service

Adds three @Modifying CAS methods to TableOperationsRepository:
cancelDuplicatePending, markScheduling, markScheduled — required for
safe concurrent scheduling without double-submitting.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@mkuchenbecker mkuchenbecker changed the title feat(optimizer): [4/N] Scheduler app [BDP-102028] feat(optimizer): [4/N] Scheduler app May 13, 2026
@@ -0,0 +1,74 @@
package com.linkedin.openhouse.scheduler;

import com.linkedin.openhouse.optimizer.entity.TableOperationRow;

Every operation needs its own bin packer, because the constraints differ per operation. Rather than generic bin packing, we should have an OFD scheduler that bin-packs pending OFD jobs. It should follow the same pattern as the analyzer: running the scheduler requires an operation type, while database and table name are optional.

So we should have a Binpacker interface and a mapping from operation to binpacker.

Done. BinPacker is now an interface with a single pack(List<TableOperation>) method. SchedulerConfig defines a Map<OperationType, BinPacker> bean — ORPHAN_FILES_DELETION is wired to a FileCountBinPacker instance. SchedulerApplication's CommandLineRunner loops over the map's keys and calls runner.schedule(opType) per entry, mirroring AnalyzerApplication. SchedulerRunner.schedule(OperationType, Optional<String> db, Optional<String> tableName) is the new signature.
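
The shape described in this reply (a single-method packer interface, a per-operation-type map, and a limit injected at construction) could look roughly like the sketch below. All names here are hypothetical stand-ins rather than the PR's classes, and the strategy packs by operation count only to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// UNREGISTERED_EXAMPLE is a made-up value used only to show the no-op path.
enum OpType { ORPHAN_FILES_DELETION, UNREGISTERED_EXAMPLE }

record Op(String id, OpType type) {}

// Single-method strategy interface: each operation type can bind its own packer.
interface Packer {
  List<List<Op>> pack(List<Op> pending);
}

// Illustrative strategy: the limit is a constructor argument, not a pack() parameter.
final class MaxOpsPerBinPacker implements Packer {
  private final int maxOps;

  MaxOpsPerBinPacker(int maxOps) {
    if (maxOps <= 0) throw new IllegalArgumentException("maxOps must be positive");
    this.maxOps = maxOps;
  }

  @Override
  public List<List<Op>> pack(List<Op> pending) {
    List<List<Op>> bins = new ArrayList<>();
    for (int i = 0; i < pending.size(); i += maxOps) {
      bins.add(new ArrayList<>(pending.subList(i, Math.min(i + maxOps, pending.size()))));
    }
    return bins;
  }
}

final class PackerRegistry {
  private final Map<OpType, Packer> packers;

  PackerRegistry(Map<OpType, Packer> packers) {
    this.packers = Map.copyOf(packers);
  }

  /** Packs only the operations of the given type; returns no bins if no packer is registered. */
  List<List<Op>> schedule(OpType type, List<Op> pending) {
    Packer packer = packers.get(type);
    if (packer == null) return List.of();
    return packer.pack(pending.stream().filter(op -> op.type() == type).toList());
  }
}
```

Adding a new operation type then means registering one more map entry, without touching the dispatch loop.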

* @return list of bins, each bin being a non-empty list of rows
*/
public static List<List<TableOperationRow>> pack(
List<TableOperationRow> pending, Map<String, Long> fileCountByUuid, long maxFilesPerBin) {

This interface is bad and inflexible. TableOperationRow should contain all the necessary information, so fileCount should be a field. maxFilesPerBin is specific to OFD and should not be a parameter; it should be set when constructing the bin packer instance (or via a conf).

Done. fileCount is now a nullable field on the shared TableOperation domain object — populated by the scheduler at read time from table_stats. FileCountBinPacker.pack(...) reads it off each TableOperation directly; no more Map<uuid, fileCount> parameter. maxFilesPerBin is constructor-injected on FileCountBinPacker via the scheduler.ofd.max-files-per-bin config; not on pack().

private String resultsEndpoint;

@Transactional
public void schedule() {

operation type required, database and table names optional.

Done. New signature: schedule(OperationType op) and schedule(OperationType op, Optional<String> databaseName, Optional<String> tableName) (SchedulerRunner.java:43,50). Operation type is required; database and table name are Optional<String>. Mirrors AnalyzerRunner.analyze.

return count != null ? count : 0L;
}));

List<List<TableOperationRow>> bins = BinPacker.pack(pending, fileCountByUuid, maxFiles);

We should not be handling rows; that is the database model. Use TableOperation from the analyzer PR.

Done. TableOperation (and its companion enums) moved from the analyzer-internal package to apps/optimizer/.../model/ so both the analyzer and the scheduler consume the same domain type. SchedulerRunner now loads rows, converts them via TableOperation.from(row), enriches each with fileCount from table_stats, and hands List<TableOperation> to the BinPacker. Per-bin logic and the BinPacker interface work on TableOperation, not the JPA row.

List<TableOperationRow> claimed =
bin.stream()
.filter(
r -> operationsRepo.markScheduling(r.getId(), r.getVersion(), Instant.now()) == 1)

Can we batch the writes to limit round trips?

Done. Added cancelDuplicatePendingBatch, markSchedulingBatch, markScheduledBatch, and markPendingBatch @Modifying @Query methods on TableOperationsRepository. Each issues one UPDATE/DELETE for a list of IDs with the status-guarded WHERE clause preserved. SchedulerRunner.submitBin now uses these instead of the per-row variants.
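
A rough sketch of the per-bin flow these batched methods support: claim, submit, then either mark scheduled or revert. This is an in-memory illustration under assumptions, not `SchedulerRunner.submitBin` itself; `BinSubmitSketch` and its methods are hypothetical, a `HashMap` stands in for the table, and `claim` models one batched UPDATE plus a read-back of which ids were actually claimed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class BinSubmitSketch {
  enum Status { PENDING, SCHEDULING, SCHEDULED }

  private final Map<String, Status> rows = new HashMap<>();
  private final Map<String, String> jobIds = new HashMap<>();

  void addPending(String id) { rows.put(id, Status.PENDING); }
  Status statusOf(String id) { return rows.get(id); }
  String jobIdOf(String id) { return jobIds.get(id); }

  // Stand-in for one status-guarded statement over a list of ids; returns the update count.
  int updateBatch(List<String> ids, Status from, Status to) {
    int updated = 0;
    for (String id : ids) {
      if (rows.replace(id, from, to)) updated++;
    }
    return updated;
  }

  // Stand-in for the batched claim: returns only the ids that moved PENDING -> SCHEDULING.
  List<String> claim(List<String> ids) {
    List<String> claimed = new ArrayList<>();
    for (String id : ids) {
      if (rows.replace(id, Status.PENDING, Status.SCHEDULING)) claimed.add(id);
    }
    return claimed;
  }

  /** The launcher is a placeholder for the job submission; it returns a job id or throws. */
  boolean submitBin(List<String> ids, Function<List<String>, String> launcher) {
    List<String> claimed = claim(ids);
    if (claimed.isEmpty()) return false; // everything was already claimed elsewhere
    try {
      String jobId = launcher.apply(claimed);
      updateBatch(claimed, Status.SCHEDULING, Status.SCHEDULED);
      claimed.forEach(id -> jobIds.put(id, jobId));
      return true;
    } catch (RuntimeException e) {
      // Submission failed: release the claim so a later pass can retry.
      updateBatch(claimed, Status.SCHEDULING, Status.PENDING);
      return false;
    }
  }
}
```

Because every write keeps its `from` status guard, a row that another scheduler already moved on is simply not counted, whichever path runs.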

mkuchenbecker and others added 3 commits May 13, 2026 17:04
…tion domain, batched writes, retry

Addresses unresolved review threads on #534:

- BinPacker becomes an interface with a single pack(...) method. Strategy
  is decoupled from operation type; binding to OFD lives in
  SchedulerConfig as a Map<OperationType, BinPacker> bean.
- FileCountBinPacker is the first strategy — named after the dimension it
  packs on, not the op it serves. maxFilesPerBin is constructor-injected
  via scheduler.ofd.max-files-per-bin; pack(...) reads fileCount off
  TableOperation rather than taking a side map.
- SchedulerRunner.schedule now takes required OperationType plus optional
  databaseName/tableName (mirror of AnalyzerRunner.analyze). It loads
  PENDING rows, converts row → TableOperation, fetches stats for
  fileCount enrichment, and dispatches to the registered BinPacker. No
  raw TableOperationRow flows through the per-bin logic.
- Writes are batched: cancelDuplicatePendingBatch, markSchedulingBatch,
  markScheduledBatch, markPendingBatch on TableOperationsRepository each
  issue one UPDATE/DELETE.
- On job-launch failure the scheduler reverts claimed rows from SCHEDULING
  back to PENDING via markPendingBatch so the next pass retries (replaces
  the previous log-and-leak that relied on the analyzer's removed
  scheduledTimeout).
- SchedulerApplication's CommandLineRunner loops over the configured
  Map<OperationType, BinPacker> and calls schedule(opType) per entry.

Tests: FileCountBinPackerTest covers the packing strategy;
SchedulerRunnerTest covers per-op dispatch, batched claim/mark-scheduled
on success, batch-revert on submit failure, duplicate-PENDING dedup, and
the unknown-operation-type no-op path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
mkuchenbecker and others added 3 commits May 18, 2026 15:12
…avadoc

Removed the "suitable for ... e.g. snapshot expiration" framing; the
javadoc now states only what the class does.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@mkuchenbecker mkuchenbecker marked this pull request as ready for review May 18, 2026 23:55
mkuchenbecker and others added 8 commits May 19, 2026 14:27
SchedulerApplication scanned com.linkedin.openhouse.optimizer.entity for
JPA-managed entities, but the entities live in
com.linkedin.openhouse.optimizer.db. The app crashed at startup against
a real database (MySQL backend in docker) with "Not a managed type:
class ...db.TableStatsRow". Unit tests didn't catch it because they
configure JPA differently.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Adds Dto suffix to model type refs in the scheduler app:
TableOperationDto, TableStatsDto, OperationTypeDto, etc.

Also fixes one perl-overreach: SchedulerRunner.PENDING was the
db.OperationStatus enum value (not model), restored to OperationStatus.

Mechanical follow-up to the opt-0 rename (b31decf).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Relocated from opt-5's commit 1ec3acf to their proper owner (opt-4
introduced these files):

  1. FileCountBinPacker.cost() NPE on stats with null snapshot.
     Analyzer can schedule against a fresh table with no snapshot data;
     treat that as cost 0 instead of crashing.

  2. SchedulerRunner.schedule(OperationType) was not @Transactional;
     the 3-arg overload was annotated, but self-invocation bypasses the
     CGLIB proxy, so @Modifying repo calls hit TransactionRequiredException.
     Annotate the single-argument variant too.

  3. scheduler.results-endpoint default URL: stale /v1/table-operations
     → /v1/optimizer/operations (where the controller actually mounts).
     Both application.properties and application-test.properties.

  4. SchedulerRunnerTest's RESULTS_ENDPOINT constant: same URL fix.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
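
The self-invocation problem in item 2 of the commit above can be reproduced without Spring. The sketch below is a stand-in under stated assumptions: it uses a JDK dynamic proxy rather than Spring's CGLIB proxy, and `SchedulerApi`, `PlainRunner`, and `ProxyDemo` are hypothetical names. The interception mechanism differs, but the effect is the same: a call made through `this` never reaches the proxy.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

interface SchedulerApi {
  void schedule(String opType);

  void schedule(String opType, String databaseName);
}

final class PlainRunner implements SchedulerApi {
  @Override
  public void schedule(String opType) {
    // Self-invocation: this call goes straight to the target object, not through the proxy.
    schedule(opType, null);
  }

  @Override
  public void schedule(String opType, String databaseName) {
    // In the real app, this is where the repository writes would happen.
  }
}

final class ProxyDemo {
  /** Returns the parameter counts of the calls the proxy actually intercepted. */
  static List<Integer> interceptedArities(SchedulerApi target) {
    List<Integer> seen = new ArrayList<>();
    InvocationHandler handler =
        (proxy, method, args) -> {
          seen.add(method.getParameterCount()); // a transaction would begin here
          return method.invoke(target, args);
        };
    SchedulerApi proxied =
        (SchedulerApi)
            Proxy.newProxyInstance(
                SchedulerApi.class.getClassLoader(),
                new Class<?>[] {SchedulerApi.class},
                handler);
    proxied.schedule("ORPHAN_FILES_DELETION");
    return seen;
  }
}
```

Only the one-argument call is intercepted; the nested two-argument call is invisible to the proxy. If the transactional advice is attached only to the two-argument overload, no transaction is ever opened, which matches the failure described in the commit.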
mkuchenbecker added a commit that referenced this pull request May 20, 2026
…527)

## Optimizer Stack

| PR | Content |
|---|---|
| #527 **(this)** | API and internal models |
| #530 | Database Repos |
| #531 | REST service |
| #533 | Analyzer app |
| #534 | Scheduler app |
| #tbd | Spark BatchedOFD app |
| #tbd | Infra, docker-compose, smoke test |

## Summary

PR 0 of N in the optimizer stack. 
[Overall
Project](https://docs.google.com/document/d/1oGQWkmlVw0HG-D4Nx37q0oUEQd53Ni4q5Q7h1voaZIQ/edit?tab=t.0#heading=h.vtl818e4m9f7)
[Service Design
doc](https://docs.google.com/document/d/1xYOD7iuPjZO05UWfT1Dkmf4lYNw4iOjqY10UT2X1FAg/edit?tab=t.0).

Introduces the optimizer service API and internal model.

<img width="631" height="410" alt="image"
src="https://github.com/user-attachments/assets/8833471d-069a-49a2-9a10-5eb7f3b96a72"
/>

## Changes

- [ ] Client-facing API Changes
- [x] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [ ] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [x] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

This PR contains only the data model (entities, DTOs, converters).
Repository tests follow in PR 1. Verified:
- `./gradlew :services:optimizer:compileJava` passes
- `./gradlew compileJava` (full project) passes with no regressions
- Spotless formatting passes

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [x] Large PR broken into smaller PRs, and PR plan linked in the
description.

---------

Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
mkuchenbecker and others added 3 commits May 20, 2026 17:25
SchedulerRunner now uses the unified find(...) / updateBatch(...) /
cancel(...) shape. The CAS calls become updateBatch(ids, fromStatus,
toStatus, Optional<Instant>, Optional<String>) for each transition.
findClaimedIds is replaced by a find(...) call keyed on (SCHEDULING,
scheduledAt watermark, claimed-ids) that returns rows; caller extracts
ids.

Dedup move:
  cancelDuplicatePendingBatch was per-bin AND blind (NOT IN keepIds);
  two bins in the same cycle nuked each other's rows. The new shape is
  per-cycle, computed by the runner: group pendingRows by tableUuid,
  keep the oldest per group (lex tiebreak on id), and call
  repo.cancel(duplicateIds) once before bin packing.

optimizer.repo.default-limit threads through via @Value; the field is
initialised inline so pure-Mockito tests see a sane value.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
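
The per-cycle dedup described in the commit above (group by tableUuid, keep the oldest, break ties lexicographically on id) can be sketched as follows. This is an illustration, not the runner's code: `PendingRow` and `PendingDedup` are hypothetical, and using a `createdAt` timestamp to decide which row is oldest is an assumption.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical row shape: only the fields the dedup needs.
record PendingRow(String id, String tableUuid, Instant createdAt) {}

final class PendingDedup {
  /** Returns the ids to cancel: every PENDING row except the one kept per table. */
  static List<String> duplicateIds(List<PendingRow> pendingRows) {
    // Oldest first; rows with equal timestamps are ordered by id.
    Comparator<PendingRow> oldestFirst =
        Comparator.comparing(PendingRow::createdAt).thenComparing(PendingRow::id);

    // One survivor per tableUuid.
    Map<String, PendingRow> keep = new LinkedHashMap<>();
    for (PendingRow row : pendingRows) {
      keep.merge(row.tableUuid(), row, (a, b) -> oldestFirst.compare(a, b) <= 0 ? a : b);
    }

    // Everything that is not the survivor for its table is a duplicate.
    List<String> duplicates = new ArrayList<>();
    for (PendingRow row : pendingRows) {
      if (!keep.get(row.tableUuid()).id().equals(row.id())) {
        duplicates.add(row.id());
      }
    }
    return duplicates;
  }
}
```

Computing the duplicates once per cycle, before bin packing, means the result does not depend on how rows are later split across bins.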
mkuchenbecker added a commit that referenced this pull request May 21, 2026
## Optimizer Stack

| PR | Content |
|---|---|
| #527 | Data Model |
| #530 **(this)** | Database Repos |
| #531 | REST service |
| #533 | Analyzer app |
| #534 | Scheduler app |
| #tbd | Spark BatchedOFD app |
| #tbd | Infra, docker-compose, smoke test |

## Summary

PR 1 of N in the optimizer stack. 
[Overall
Project](https://docs.google.com/document/d/1oGQWkmlVw0HG-D4Nx37q0oUEQd53Ni4q5Q7h1voaZIQ/edit?tab=t.0#heading=h.vtl818e4m9f7)
[Service Design
doc](https://docs.google.com/document/d/1xYOD7iuPjZO05UWfT1Dkmf4lYNw4iOjqY10UT2X1FAg/edit?tab=t.0).


Spring Data JPA repositories for all four optimizer tables with filtered
query support, plus tests exercising save/find, filtered queries, upsert
semantics, and append-only history.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

**Repositories**: `TableOperationsRepository`,
`TableOperationsHistoryRepository`, `TableStatsRepository`,
`TableStatsHistoryRepository` — each with JPQL filtered query methods.

**Tests**: Repository tests for all four tables plus
`OptimizerServiceContextTest` verifying the Spring context loads.

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

`./gradlew :services:optimizer:test` — all tests pass (H2 in MySQL
mode).

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [x] Large PR broken into smaller PRs, and PR plan linked in the
description.

---------

Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
@mkuchenbecker mkuchenbecker changed the title [BDP-102028] feat(optimizer): [4/N] Scheduler app feat(optimizer): [4/N] Scheduler app May 22, 2026
mkuchenbecker added a commit that referenced this pull request May 22, 2026
## Optimizer Stack

| PR | Content |
|---|---|
| #527 | Data Model |
| #530 | Database Repos |
| #531 **(this)** | REST service |
| #533 | Analyzer app |
| #534 | Scheduler app |
| #tbd | Spark BatchedOFD app |
| #tbd | Infra, docker-compose, smoke test |

## Summary

PR 2 of N in the optimizer stack. 

Service layer and REST controllers for the optimizer service, plus the
`apps/optimizer` shared module providing lightweight entity/repo copies
for the analyzer and scheduler apps.

## Changes

- [ ] Client-facing API Changes
- [x] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

**Service layer**: `OptimizerDataService` interface and
`OptimizerDataServiceImpl` — CRUD operations, complete-operation
lifecycle, stats upsert with history double-write, filtered queries.

**Controllers**: `TableOperationsController`,
`TableOperationsHistoryController`, `TableStatsController` — REST
endpoints per the design doc API spec.

**Shared module** (`apps/optimizer`): Lightweight entity and repository
copies used by the analyzer and scheduler apps to read optimizer state
directly from MySQL.

## Testing Done

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

H2 integration tests in `OptimizerDataServiceImplTest` (5 tests):
- `completeOperation_writesHistoryFromOperationRow` — saves SCHEDULED
row, completes it, asserts history DTO fields
- `completeOperation_notFound_returnsEmpty` — completes nonexistent ID,
asserts empty
- `upsertTableStats_createsNewRow` — upserts new table, asserts DTO and
repo row
- `upsertTableStats_updatesExistingRow` — upserts twice, asserts
overwrite with single row
- `upsertTableStats_appendsHistoryOnEveryCall` — upserts twice, asserts
2 history rows

```
./gradlew :services:optimizer:test
# BUILD SUCCESSFUL — all 25 tests pass (repo tests from PR 1 + 5 new service tests)
```

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [x] Large PR broken into smaller PRs, and PR plan linked in the
description.

---------

Co-authored-by: mkuchenbecker <mkuchenbecker@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>