diff --git a/build.gradle b/build.gradle index f9cd741f2..f7cdd71a8 100644 --- a/build.gradle +++ b/build.gradle @@ -177,6 +177,7 @@ tasks.register('CopyGitHooksTask', Copy) { // tables-service.Dockerfile -> :services:tables:bootJar // housetables-service.Dockerfile -> :services:housetables:bootJar // jobs-service.Dockerfile -> :services:jobs:bootJar +// optimizer-service.Dockerfile -> :services:optimizer:bootJar // jobs-scheduler.Dockerfile -> :apps:openhouse-spark-apps_2.12:shadowJar (uber JAR) // spark-base-hadoop2.8.dockerfile -> // :integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar (uber JAR) @@ -196,6 +197,7 @@ tasks.register('dockerPrereqs') { dependsOn ':services:tables:bootJar' dependsOn ':services:housetables:bootJar' dependsOn ':services:jobs:bootJar' + dependsOn ':services:optimizer:bootJar' // Spark runtime uber JARs (shadowJar) dependsOn ':integrations:spark:spark-3.1:openhouse-spark-runtime_2.12:shadowJar' @@ -219,6 +221,7 @@ tasks.register('dockerPrereqs') { println ' build/tables/libs/tables.jar' println ' build/housetables/libs/housetables.jar' println ' build/jobs/libs/jobs.jar' + println ' build/optimizer/libs/optimizer.jar' println ' build/openhouse-spark-runtime_2.12/libs/openhouse-spark-runtime_2.12-uber.jar' println ' build/openhouse-spark-3.5-runtime_2.12/libs/openhouse-spark-3.5-runtime_2.12-uber.jar' println ' build/openhouse-spark-apps_2.12/libs/openhouse-spark-apps_2.12-uber.jar' diff --git a/services/optimizer/build.gradle b/services/optimizer/build.gradle new file mode 100644 index 000000000..2de8fd5c7 --- /dev/null +++ b/services/optimizer/build.gradle @@ -0,0 +1,13 @@ +plugins { + id 'openhouse.springboot-ext-conventions' + id 'org.springframework.boot' version '2.7.8' +} + +dependencies { + implementation 'org.springframework.boot:spring-boot-starter-web:2.7.8' + testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' +} + +test { + useJUnitPlatform() +} diff --git 
a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java new file mode 100644 index 000000000..38eb363a8 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/OptimizerServiceApplication.java @@ -0,0 +1,13 @@ +package com.linkedin.openhouse.optimizer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** Spring Boot entry point for the Optimizer Service. */ +@SpringBootApplication +public class OptimizerServiceApplication { + + public static void main(String[] args) { + SpringApplication.run(OptimizerServiceApplication.class, args); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/HistoryStatus.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/HistoryStatus.java new file mode 100644 index 000000000..1d799818f --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/HistoryStatus.java @@ -0,0 +1,21 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +/** Terminal states for a completed Spark maintenance job. */ +public enum HistoryStatus { + + /** The Spark job for this operation completed successfully. */ + SUCCESS, + + /** The Spark job for this operation failed. */ + FAILED; + + /** Convert to the internal-model counterpart. */ + public com.linkedin.openhouse.optimizer.model.HistoryStatusDto toModel() { + return com.linkedin.openhouse.optimizer.model.HistoryStatusDto.valueOf(name()); + } + + /** Build the api-layer enum from the internal-model counterpart. */ + public static HistoryStatus fromModel(com.linkedin.openhouse.optimizer.model.HistoryStatusDto v) { + return v == null ? 
null : HistoryStatus.valueOf(v.name()); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/OperationStatus.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/OperationStatus.java new file mode 100644 index 000000000..b1cbe42b0 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/OperationStatus.java @@ -0,0 +1,32 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +/** Lifecycle states for a table operation recommendation. */ +public enum OperationStatus { + + /** Recommended by the Analyzer but not yet claimed by the Scheduler. */ + PENDING, + + /** Claimed by the Scheduler; waiting for the Jobs Service to return a job ID. */ + SCHEDULING, + + /** Job submitted to the Jobs Service; the row now carries a {@code jobId}. */ + SCHEDULED, + + /** + * Marked by the Scheduler when it detects duplicate PENDING rows for the same {@code (table_uuid, + * operation_type)}. Only the most-recent PENDING row is claimed; older duplicates are CANCELED + * before the claim step. + */ + CANCELED; + + /** Convert to the internal-model counterpart. */ + public com.linkedin.openhouse.optimizer.model.OperationStatusDto toModel() { + return com.linkedin.openhouse.optimizer.model.OperationStatusDto.valueOf(name()); + } + + /** Build the api-layer enum from the internal-model counterpart. */ + public static OperationStatus fromModel( + com.linkedin.openhouse.optimizer.model.OperationStatusDto v) { + return v == null ? 
null : OperationStatus.valueOf(v.name()); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/OperationType.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/OperationType.java new file mode 100644 index 000000000..ea6d2797c --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/OperationType.java @@ -0,0 +1,17 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +/** Maintenance operation types supported by the continuous optimizer. */ +public enum OperationType { + /** Removes orphaned data files no longer referenced by table metadata. */ + ORPHAN_FILES_DELETION; + + /** Convert to the internal-model counterpart. */ + public com.linkedin.openhouse.optimizer.model.OperationTypeDto toModel() { + return com.linkedin.openhouse.optimizer.model.OperationTypeDto.valueOf(name()); + } + + /** Build the api-layer enum from the internal-model counterpart. */ + public static OperationType fromModel(com.linkedin.openhouse.optimizer.model.OperationTypeDto v) { + return v == null ? null : OperationType.valueOf(v.name()); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperations.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperations.java new file mode 100644 index 000000000..0bca95734 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperations.java @@ -0,0 +1,76 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import java.time.Instant; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** DTO for {@code table_operations} — Analyzer recommendations read by the Scheduler. 
*/ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableOperations { + + /** Client-generated UUID identifying this specific operation recommendation. */ + private String id; + + /** Stable table identity from the Tables Service. */ + private String tableUuid; + + /** Denormalized database name for display; not part of the primary key. */ + private String databaseName; + + /** Denormalized table name for display; not part of the primary key. */ + private String tableName; + + /** The type of maintenance operation (e.g. ORPHAN_FILES_DELETION). */ + private OperationType operationType; + + /** {@code PENDING} or {@code SCHEDULED}. Defaults to {@code PENDING} on creation. */ + private OperationStatus status; + + /** Server-set when the row is first created by the Analyzer. */ + private Instant createdAt; + + /** Set by the Scheduler when claiming; {@code null} while PENDING. */ + private Instant scheduledAt; + + /** Job ID returned by the Jobs Service after successful submission. */ + private String jobId; + + /** Convert to the internal-model counterpart. */ + public TableOperationDto toModel() { + return TableOperationDto.builder() + .id(id) + .tableUuid(tableUuid) + .databaseName(databaseName) + .tableName(tableName) + .operationType(operationType == null ? null : operationType.toModel()) + .status(status == null ? null : status.toModel()) + .createdAt(createdAt) + .scheduledAt(scheduledAt) + .jobId(jobId) + .build(); + } + + /** Build a wire DTO from the internal-model counterpart. 
*/ + public static TableOperations fromModel(TableOperationDto op) { + if (op == null) { + return null; + } + return TableOperations.builder() + .id(op.getId()) + .tableUuid(op.getTableUuid()) + .databaseName(op.getDatabaseName()) + .tableName(op.getTableName()) + .operationType(OperationType.fromModel(op.getOperationType())) + .status(OperationStatus.fromModel(op.getStatus())) + .createdAt(op.getCreatedAt()) + .scheduledAt(op.getScheduledAt()) + .jobId(op.getJobId()) + .build(); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java new file mode 100644 index 000000000..7a000f840 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableOperationsHistory.java @@ -0,0 +1,66 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +import com.linkedin.openhouse.optimizer.model.TableOperationsHistoryDto; +import java.time.Instant; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** DTO for {@code table_operations_history} — append-only operation results. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableOperationsHistory { + + /** Same UUID as the originating {@code table_operations.id}; supplied by the caller. */ + private String id; + + /** Stable table identity from the Tables Service. */ + private String tableUuid; + + /** Denormalized database name for display. */ + private String databaseName; + + /** Denormalized table name for display. */ + private String tableName; + + /** The type of maintenance operation this history row records. */ + private OperationType operationType; + + /** When the operation completed, as recorded by the complete endpoint. */ + private Instant completedAt; + + /** {@code SUCCESS} or {@code FAILED}. 
*/ + private HistoryStatus status; + + /** Convert to the internal-model counterpart. */ + public TableOperationsHistoryDto toModel() { + return TableOperationsHistoryDto.builder() + .id(id) + .tableUuid(tableUuid) + .databaseName(databaseName) + .tableName(tableName) + .operationType(operationType == null ? null : operationType.toModel()) + .completedAt(completedAt) + .status(status == null ? null : status.toModel()) + .build(); + } + + /** Build a wire DTO from the internal-model counterpart. */ + public static TableOperationsHistory fromModel(TableOperationsHistoryDto h) { + if (h == null) { + return null; + } + return TableOperationsHistory.builder() + .id(h.getId()) + .tableUuid(h.getTableUuid()) + .databaseName(h.getDatabaseName()) + .tableName(h.getTableName()) + .operationType(OperationType.fromModel(h.getOperationType())) + .completedAt(h.getCompletedAt()) + .status(HistoryStatus.fromModel(h.getStatus())) + .build(); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableStats.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableStats.java new file mode 100644 index 000000000..41f44f763 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableStats.java @@ -0,0 +1,70 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +import java.time.Instant; +import java.util.Collections; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** DTO for {@code table_stats} — used for response payloads. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableStats { + + /** Stable Iceberg table UUID. Primary key of the stats row. */ + private String tableUuid; + + /** Denormalized database name for display. */ + private String databaseName; + + /** Denormalized table name for display. 
*/ + private String tableName; + + /** Combined snapshot + delta stats payload, stored as JSON. */ + private TableStatsPayload stats; + + /** Current table properties snapshot (e.g. maintenance opt-in flags). */ + private Map tableProperties; + + /** When this row was last written. Used for staleness monitoring. */ + private Instant updatedAt; + + /** Convert to the internal-model counterpart. */ + public com.linkedin.openhouse.optimizer.model.TableStatsDto toModel() { + com.linkedin.openhouse.optimizer.model.TableStatsDto payload = + stats == null + ? new com.linkedin.openhouse.optimizer.model.TableStatsDto() + : stats.toModel(); + return payload + .toBuilder() + .tableUuid(tableUuid) + .databaseName(databaseName) + .tableName(tableName) + .tableProperties(tableProperties != null ? tableProperties : Collections.emptyMap()) + .updatedAt(updatedAt) + .build(); + } + + /** Build a wire DTO from the internal-model counterpart. */ + public static TableStats fromModel(com.linkedin.openhouse.optimizer.model.TableStatsDto m) { + if (m == null) { + return null; + } + return TableStats.builder() + .tableUuid(m.getTableUuid()) + .databaseName(m.getDatabaseName()) + .tableName(m.getTableName()) + .stats( + TableStatsPayload.builder() + .snapshot(TableStatsPayload.SnapshotMetricsDto.fromModel(m.getSnapshot())) + .delta(TableStatsPayload.CommitDeltaDto.fromModel(m.getDelta())) + .build()) + .tableProperties(m.getTableProperties()) + .updatedAt(m.getUpdatedAt()) + .build(); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableStatsHistory.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableStatsHistory.java new file mode 100644 index 000000000..5508aca27 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableStatsHistory.java @@ -0,0 +1,61 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +import 
com.linkedin.openhouse.optimizer.model.TableStatsHistoryDto; +import java.time.Instant; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** DTO for {@code table_stats_history} — used for response payloads. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableStatsHistory { + + /** UUID primary key set by the caller. */ + private String id; + + /** Stable Iceberg table UUID. */ + private String tableUuid; + + /** Denormalized database name for display. */ + private String databaseName; + + /** Denormalized table name for display. */ + private String tableName; + + /** Snapshot + delta stats from this commit event. */ + private TableStatsPayload stats; + + /** When this history row was recorded. */ + private Instant recordedAt; + + /** Convert to the internal-model counterpart. */ + public TableStatsHistoryDto toModel() { + return TableStatsHistoryDto.builder() + .id(id) + .tableUuid(tableUuid) + .databaseName(databaseName) + .tableName(tableName) + .stats(stats == null ? null : stats.toModel()) + .recordedAt(recordedAt) + .build(); + } + + /** Build a wire DTO from the internal-model counterpart. 
*/ + public static TableStatsHistory fromModel(TableStatsHistoryDto h) { + if (h == null) { + return null; + } + return TableStatsHistory.builder() + .id(h.getId()) + .tableUuid(h.getTableUuid()) + .databaseName(h.getDatabaseName()) + .tableName(h.getTableName()) + .stats(TableStatsPayload.fromModel(h.getStats())) + .recordedAt(h.getRecordedAt()) + .build(); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableStatsPayload.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableStatsPayload.java new file mode 100644 index 000000000..c347bf385 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/TableStatsPayload.java @@ -0,0 +1,137 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Combined stats payload exposed on the optimizer wire API. + * + *

API-layer copy of the stats payload — self-contained, evolved only when the wire contract + * changes. + */ +@Data +@Builder(toBuilder = true) +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class TableStatsPayload { + + /** Snapshot fields — overwritten on every upsert. */ + private SnapshotMetricsDto snapshot; + + /** Delta fields — accumulated across commit events. */ + private CommitDeltaDto delta; + + /** Convert to the internal-model counterpart. */ + public com.linkedin.openhouse.optimizer.model.TableStatsDto toModel() { + return com.linkedin.openhouse.optimizer.model.TableStatsDto.builder() + .snapshot(snapshot == null ? null : snapshot.toModel()) + .delta(delta == null ? null : delta.toModel()) + .build(); + } + + /** Build the api-layer payload from the internal-model counterpart. */ + public static TableStatsPayload fromModel( + com.linkedin.openhouse.optimizer.model.TableStatsDto m) { + if (m == null) { + return null; + } + return TableStatsPayload.builder() + .snapshot(SnapshotMetricsDto.fromModel(m.getSnapshot())) + .delta(CommitDeltaDto.fromModel(m.getDelta())) + .build(); + } + + /** Point-in-time metadata read from Iceberg at scan time. */ + @Data + @Builder(toBuilder = true) + @NoArgsConstructor + @AllArgsConstructor + @JsonIgnoreProperties(ignoreUnknown = true) + public static class SnapshotMetricsDto { + + /** Iceberg metadata version pointer for this snapshot. */ + private String tableVersion; + + /** Filesystem path (or URI) of the table's storage root. */ + private String tableLocation; + + /** Total on-disk size of the table at this snapshot, in bytes. */ + private Long tableSizeBytes; + + /** Total number of data files as of the latest snapshot — used for bin-packing. */ + private Long numCurrentFiles; + + /** Convert to the internal-model counterpart. 
*/ + public com.linkedin.openhouse.optimizer.model.TableStatsDto.SnapshotMetrics toModel() { + return com.linkedin.openhouse.optimizer.model.TableStatsDto.SnapshotMetrics.builder() + .tableVersion(tableVersion) + .tableLocation(tableLocation) + .tableSizeBytes(tableSizeBytes) + .numCurrentFiles(numCurrentFiles) + .build(); + } + + /** Build the api-layer inner object from the internal-model counterpart. */ + public static SnapshotMetricsDto fromModel( + com.linkedin.openhouse.optimizer.model.TableStatsDto.SnapshotMetrics m) { + if (m == null) { + return null; + } + return SnapshotMetricsDto.builder() + .tableVersion(m.getTableVersion()) + .tableLocation(m.getTableLocation()) + .tableSizeBytes(m.getTableSizeBytes()) + .numCurrentFiles(m.getNumCurrentFiles()) + .build(); + } + } + + /** Per-commit incremental counters; accumulated across all recorded commit events. */ + @Data + @Builder(toBuilder = true) + @NoArgsConstructor + @AllArgsConstructor + @JsonIgnoreProperties(ignoreUnknown = true) + public static class CommitDeltaDto { + + /** Number of data files this commit added to the table. */ + private Long numFilesAdded; + + /** Number of data files this commit removed from the table. */ + private Long numFilesDeleted; + + /** Total bytes added by this commit. */ + private Long addedSizeBytes; + + /** Total bytes removed by this commit. */ + private Long deletedSizeBytes; + + /** Convert to the internal-model counterpart. */ + public com.linkedin.openhouse.optimizer.model.TableStatsDto.CommitDelta toModel() { + return com.linkedin.openhouse.optimizer.model.TableStatsDto.CommitDelta.builder() + .numFilesAdded(numFilesAdded) + .numFilesDeleted(numFilesDeleted) + .addedSizeBytes(addedSizeBytes) + .deletedSizeBytes(deletedSizeBytes) + .build(); + } + + /** Build the api-layer inner object from the internal-model counterpart. 
*/ + public static CommitDeltaDto fromModel( + com.linkedin.openhouse.optimizer.model.TableStatsDto.CommitDelta m) { + if (m == null) { + return null; + } + return CommitDeltaDto.builder() + .numFilesAdded(m.getNumFilesAdded()) + .numFilesDeleted(m.getNumFilesDeleted()) + .addedSizeBytes(m.getAddedSizeBytes()) + .deletedSizeBytes(m.getDeletedSizeBytes()) + .build(); + } + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java new file mode 100644 index 000000000..a216e9db3 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpdateOperationRequest.java @@ -0,0 +1,47 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Request body for {@code POST /v1/table-operations/update}. + * + *

Reports the outcome of a single operation update. The service looks up the operation row by + * {@link #operationId} and writes a history entry for it. + * + *

A single Spark job typically processes N tables and yields N independent (status) outcomes — + * one per operation. Callers issue one update request per operation; the service does not + * bulk-update by job. + * + *

The remaining fields ({@link #tableUuid}, {@link #databaseName}, {@link #tableName}, {@link + * #operationType}) are debug-only echo information. The server does not key off them; they are + * preserved on log lines and traces so an operator looking at a failing update call can see which + * (db, table, operation) the caller believed it was updating without joining back to the operation + * row. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class UpdateOperationRequest { + + /** Operation row's UUID — the primary lookup key. */ + private String operationId; + + /** Terminal outcome for this single operation. */ + private HistoryStatus status; + + /** Debug echo: stable table identity the caller believed it was completing. */ + private String tableUuid; + + /** Debug echo: database name. */ + private String databaseName; + + /** Debug echo: table name. */ + private String tableName; + + /** Debug echo: operation type. */ + private OperationType operationType; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpsertTableStatsRequest.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpsertTableStatsRequest.java new file mode 100644 index 000000000..d1b4a5fe2 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/spec/UpsertTableStatsRequest.java @@ -0,0 +1,52 @@ +package com.linkedin.openhouse.optimizer.api.spec; + +import java.util.Collections; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Request body for {@code PUT /v1/table-stats/{tableUuid}}. + * + *

{@code tableUuid} comes from the path variable. {@code databaseName} and {@code tableName} are + * denormalized display columns carried in the body. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class UpsertTableStatsRequest { + + /** Denormalized database name for display. */ + private String databaseName; + + /** Denormalized table name for display. */ + private String tableName; + + /** Combined snapshot + delta stats payload from this commit. */ + private TableStatsPayload stats; + + /** Current table properties snapshot (e.g. maintenance opt-in flags). */ + private Map tableProperties; + + /** + * Build the internal-model {@link com.linkedin.openhouse.optimizer.model.TableStatsDto} described + * by this request. {@code tableUuid} comes from the URL path, not the body. {@code updatedAt} is + * left {@code null}; the service stamps it server-side at write time. + */ + public com.linkedin.openhouse.optimizer.model.TableStatsDto toModel(String tableUuid) { + com.linkedin.openhouse.optimizer.model.TableStatsDto payload = + stats == null + ? new com.linkedin.openhouse.optimizer.model.TableStatsDto() + : stats.toModel(); + return payload + .toBuilder() + .tableUuid(tableUuid) + .databaseName(databaseName) + .tableName(tableName) + .tableProperties(tableProperties != null ? tableProperties : Collections.emptyMap()) + .build(); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/HistoryStatusDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/HistoryStatusDto.java new file mode 100644 index 000000000..463c62605 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/HistoryStatusDto.java @@ -0,0 +1,17 @@ +package com.linkedin.openhouse.optimizer.model; + +/** + * Internal lifecycle outcomes for a completed operation. 
/**
 * Internal-model outcome of a completed operation.
 *
 * <p>The constants mirror the values written to {@code table_operations_history.status}. Parsing
 * into this enum at the boundary lets callers switch on a typed value instead of comparing
 * strings.
 *
 * <p>Kept deliberately separate from the wire-API and DB representations.
 */
public enum HistoryStatusDto {

  /** The operation finished successfully. */
  SUCCESS,

  /** The operation failed. */
  FAILED
}

/**
 * Internal-model lifecycle states of an operation.
 *
 * <p>The analyzer writes {@link #PENDING}. The scheduler moves a row through {@link #SCHEDULING}
 * and {@link #SCHEDULED}. {@link #CANCELED} marks deduplicated PENDING rows.
 *
 * <p>Kept deliberately separate from the wire-API and DB representations.
 */
public enum OperationStatusDto {

  /** The analyzer has written the row; the scheduler has not claimed it yet. */
  PENDING,

  /** The scheduler has claimed the row and is launching a job; no jobId recorded yet. */
  SCHEDULING,

  /** The job was submitted to the Jobs Service; the row carries a {@code jobId}. */
  SCHEDULED,

  /** The scheduler marked the row as a duplicate of another PENDING row; not claimable. */
  CANCELED
}
/**
 * Internal-model enum of the operation types the analyzer and scheduler know about.
 *
 * <p>Kept deliberately separate from the wire-API and DB representations, so the set of supported
 * operations can change internally without churning either boundary.
 */
public enum OperationTypeDto {

  /** Deletes orphaned data files that table metadata no longer references. */
  ORPHAN_FILES_DELETION
}

Pure internal-model type — no references to wire-API or DB types. Construct via {@link + * com.linkedin.openhouse.optimizer.model.mapper.ModelDbMapper#toTable} at the DB boundary. + */ +@Data +@Builder(toBuilder = true) +@NoArgsConstructor +@AllArgsConstructor +public class TableDto { + + /** Stable table identity from the Tables Service. Survives renames; rotates on drop+recreate. */ + private String tableUuid; + + /** Database the table lives in. */ + private String databaseName; + + /** Iceberg table identifier (table name, not UUID). */ + private String tableId; + + /** Current table-property map (e.g. maintenance opt-in flags). Never null. */ + @Builder.Default private Map tableProperties = Collections.emptyMap(); + + /** Latest snapshot stats for this table. Delta is null when read from the current-state row. */ + private TableStatsDto stats; + + /** When the current snapshot was last written. Stamped server-side on every upsert. */ + private Instant updatedAt; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationDto.java new file mode 100644 index 000000000..4cac14187 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationDto.java @@ -0,0 +1,71 @@ +package com.linkedin.openhouse.optimizer.model; + +import java.time.Instant; +import java.util.Comparator; +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * An operation the analyzer has decided to schedule for a table, and that the scheduler later picks + * up and submits. + * + *

Pure internal-model type — no references to wire-API or DB types. Cross-layer construction + * happens via {@link com.linkedin.openhouse.optimizer.model.mapper.ModelDbMapper} (DB boundary) or + * {@link com.linkedin.openhouse.optimizer.model.mapper.ApiModelMapper} (API boundary). + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableOperationDto { + + /** Unique operation ID (UUID). */ + private String id; + + /** The table this operation targets. */ + private String tableUuid; + + /** Database name. */ + private String databaseName; + + /** Table name. */ + private String tableName; + + /** Operation type. */ + private OperationTypeDto operationType; + + /** Current lifecycle status. */ + private OperationStatusDto status; + + /** When this operation record was created. */ + private Instant createdAt; + + /** When the scheduler last submitted a job for this operation. */ + private Instant scheduledAt; + + /** Job ID returned by the Jobs Service after the scheduler submitted; null until SCHEDULED. */ + private String jobId; + + /** Create a new PENDING operation for the given table and operation type. */ + public static TableOperationDto pending(TableDto table, OperationTypeDto operationType) { + return TableOperationDto.builder() + .id(UUID.randomUUID().toString()) + .tableUuid(table.getTableUuid()) + .databaseName(table.getDatabaseName()) + .tableName(table.getTableId()) + .operationType(operationType) + .status(OperationStatusDto.PENDING) + .createdAt(Instant.now()) + .build(); + } + + /** Return the more recently created of two operations. */ + public static TableOperationDto mostRecent(TableOperationDto a, TableOperationDto b) { + Comparator byCreatedAt = + Comparator.comparing(r -> r.getCreatedAt() != null ? r.getCreatedAt() : Instant.EPOCH); + return byCreatedAt.compare(a, b) >= 0 ? 
a : b; + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java new file mode 100644 index 000000000..e05bb641e --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableOperationsHistoryDto.java @@ -0,0 +1,41 @@ +package com.linkedin.openhouse.optimizer.model; + +import java.time.Instant; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Internal-model view of a completed operation history record. + * + *

Mirrors the field set of the underlying history row but in internal types only. Used by + * components that need to reason about completed operations (e.g., scheduling-cadence analyzers). + */ +@Data +@Builder(toBuilder = true) +@NoArgsConstructor +@AllArgsConstructor +public class TableOperationsHistoryDto { + + /** Same UUID as the originating live-operations row. */ + private String id; + + /** Stable table identity from the Tables Service. */ + private String tableUuid; + + /** Denormalized database name. */ + private String databaseName; + + /** Denormalized table name. */ + private String tableName; + + /** Operation type for this completed run. */ + private OperationTypeDto operationType; + + /** When the operation completed, as recorded by the complete endpoint. */ + private Instant completedAt; + + /** Terminal outcome: {@link HistoryStatusDto#SUCCESS} or {@link HistoryStatusDto#FAILED}. */ + private HistoryStatusDto status; +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsDto.java new file mode 100644 index 000000000..d142dcc8b --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsDto.java @@ -0,0 +1,91 @@ +package com.linkedin.openhouse.optimizer.model; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.time.Instant; +import java.util.Collections; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Self-describing per-table stats record. Carries the table's identity and metadata alongside the + * snapshot + delta payload so consumers don't need an outer wrapper to know which table the stats + * belong to. + * + *

Identity ({@link #tableUuid}, {@link #databaseName}, {@link #tableName}) and metadata ({@link + * #tableProperties}, {@link #updatedAt}) are populated when read from a current-state row. When + * this record is built from a per-commit history row, {@link #delta} is populated and {@link + * #tableProperties} / {@link #updatedAt} are typically {@code null}. + */ +@Data +@Builder(toBuilder = true) +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class TableStatsDto { + + /** Stable table identity from the Tables Service. Survives renames; rotates on drop+recreate. */ + private String tableUuid; + + /** Database the table lives in. */ + private String databaseName; + + /** Iceberg table name (the human-readable identifier, not the UUID). */ + private String tableName; + + /** Current table-property map (e.g. maintenance opt-in flags). Empty by default; may be null. */ + @Builder.Default private Map tableProperties = Collections.emptyMap(); + + /** Snapshot fields — overwritten on every upsert. */ + private SnapshotMetrics snapshot; + + /** Delta fields — accumulated across commit events. Null when read from a current-state row. */ + private CommitDelta delta; + + /** When the current snapshot was last written. Stamped server-side on every upsert. */ + private Instant updatedAt; + + /** Point-in-time metadata read from Iceberg at scan time. */ + @Data + @Builder(toBuilder = true) + @NoArgsConstructor + @AllArgsConstructor + @JsonIgnoreProperties(ignoreUnknown = true) + public static class SnapshotMetrics { + + /** Iceberg metadata version pointer for this snapshot. */ + private String tableVersion; + + /** Filesystem path (or URI) of the table's storage root. */ + private String tableLocation; + + /** Total on-disk size of the table at this snapshot, in bytes. */ + private Long tableSizeBytes; + + /** Total number of data files as of the latest snapshot — used for bin-packing. 
*/ + private Long numCurrentFiles; + } + + /** Per-commit incremental counters; accumulated across all recorded commit events. */ + @Data + @Builder(toBuilder = true) + @NoArgsConstructor + @AllArgsConstructor + @JsonIgnoreProperties(ignoreUnknown = true) + public static class CommitDelta { + + /** Number of data files this commit added to the table. */ + private Long numFilesAdded; + + /** Number of data files this commit removed from the table. */ + private Long numFilesDeleted; + + /** Total bytes added by this commit. */ + private Long addedSizeBytes; + + /** Total bytes removed by this commit. */ + private Long deletedSizeBytes; + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsHistoryDto.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsHistoryDto.java new file mode 100644 index 000000000..5579c95ed --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/model/TableStatsHistoryDto.java @@ -0,0 +1,40 @@ +package com.linkedin.openhouse.optimizer.model; + +import java.time.Instant; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Internal-model view of an append-only per-commit stats history record. + * + *

One per Iceberg commit. {@link #stats} carries both the snapshot at commit time and the commit + * delta — consumers can reconstruct change rates over arbitrary time windows. + * + *

Pure internal-model type — no references to wire-API or DB types. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableStatsHistoryDto { + + /** UUID primary key — set by the caller, not generated server-side. */ + private String id; + + /** Stable table identity from the Tables Service. */ + private String tableUuid; + + /** Denormalized database name for display. */ + private String databaseName; + + /** Denormalized table name for display. */ + private String tableName; + + /** Snapshot + delta for this commit event. */ + private TableStatsDto stats; + + /** When this history row was recorded. */ + private Instant recordedAt; +} diff --git a/settings.gradle b/settings.gradle index 035e54349..cad06785e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -49,6 +49,7 @@ include ':libs:datalayout' include ':services:common' include ':services:housetables' include ':services:jobs' +include ':services:optimizer' include ':services:tables' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'