diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java new file mode 100644 index 000000000..2ee40802f --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsController.java @@ -0,0 +1,125 @@ +package com.linkedin.openhouse.optimizer.api.controller; + +import com.linkedin.openhouse.optimizer.api.spec.OperationStatus; +import com.linkedin.openhouse.optimizer.api.spec.OperationType; +import com.linkedin.openhouse.optimizer.api.spec.TableOperations; +import com.linkedin.openhouse.optimizer.api.spec.TableOperationsHistory; +import com.linkedin.openhouse.optimizer.api.spec.UpdateOperationRequest; +import com.linkedin.openhouse.optimizer.service.OptimizerDataService; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ResponseStatusException; + +/** REST controller for {@code table_operations}. */ +@RestController +@RequestMapping("/v1/optimizer/operations") +@RequiredArgsConstructor +public class TableOperationsController { + + private final OptimizerDataService service; + + /** + * Report an update to an operation. {@code id} comes from the URL; the body's {@code operationId} + * must match (the controller rejects mismatched requests with 400). The backend looks up the + * operation row, writes a history entry with the operation's table metadata, and returns 201 + * Created with the history row, or 404 if the operation does not exist. + */ + @ApiResponses( + value = { + @ApiResponse(responseCode = "201", description = "Operation UPDATE: CREATED"), + @ApiResponse(responseCode = "400", description = "Operation UPDATE: BAD_REQUEST"), + @ApiResponse(responseCode = "404", description = "Operation UPDATE: NOT_FOUND") + }) + @PostMapping("/{id}/update") + public ResponseEntity updateOperation( + @PathVariable String id, @RequestBody UpdateOperationRequest request) { + if (!StringUtils.hasText(request.getOperationId())) { + throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "operationId is required"); + } + if (!Objects.equals(id, request.getOperationId())) { + throw new ResponseStatusException( + HttpStatus.BAD_REQUEST, + String.format( + "operationId in body (%s) does not match path id (%s)", + request.getOperationId(), id)); + } + if (request.getStatus() == null) { + throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "status is required"); + } + return service + .updateOperation(id, request.getStatus().toModel()) + .map( + history -> + ResponseEntity.status(HttpStatus.CREATED) + .body(TableOperationsHistory.fromModel(history))) + .orElseThrow( + () -> + new ResponseStatusException( + HttpStatus.NOT_FOUND, String.format("no operation with id %s", id))); + } + + /** Fetch a single operation row by its ID, regardless of status. Returns 404 if not found. */ + @ApiResponses( + value = { + @ApiResponse(responseCode = "200", description = "Operation GET: OK"), + @ApiResponse(responseCode = "404", description = "Operation GET: NOT_FOUND") + }) + @GetMapping("/{id}") + public ResponseEntity getTableOperation(@PathVariable String id) { + return service + .getTableOperation(id) + .map(TableOperations::fromModel) + .map(ResponseEntity::ok) + .orElseThrow( + () -> + new ResponseStatusException( + HttpStatus.NOT_FOUND, String.format("no operation with id %s", id))); + } + + /** + * List operations matching the given filters, capped at {@code limit} rows. Every filter is + * optional; {@code limit} is required so callers always state how much they want back. + */ + @ApiResponses( + value = { + @ApiResponse(responseCode = "200", description = "Operation SEARCH: OK"), + @ApiResponse(responseCode = "400", description = "Operation SEARCH: BAD_REQUEST") + }) + @GetMapping + public ResponseEntity> listTableOperations( + @RequestParam(required = false) OperationType operationType, + @RequestParam(required = false) OperationStatus status, + @RequestParam(required = false) String databaseName, + @RequestParam(required = false) String tableName, + @RequestParam(required = false) String tableUuid, + @RequestParam int limit) { + List result = + service + .listTableOperations( + Optional.ofNullable(operationType).map(OperationType::toModel), + Optional.ofNullable(status).map(OperationStatus::toModel), + Optional.ofNullable(databaseName), + Optional.ofNullable(tableName), + Optional.ofNullable(tableUuid), + limit) + .stream() + .map(TableOperations::fromModel) + .collect(Collectors.toList()); + return ResponseEntity.ok(result); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java new file mode 100644 index 000000000..873d51d2e --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableOperationsHistoryController.java @@ -0,0 +1,58 @@ +package com.linkedin.openhouse.optimizer.api.controller; + +import com.linkedin.openhouse.optimizer.api.spec.TableOperationsHistory; +import com.linkedin.openhouse.optimizer.service.OptimizerDataService; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import java.util.List; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** REST controller for {@code table_operations_history}. */ +@RestController +@RequestMapping("/v1/optimizer/operations-history") +@RequiredArgsConstructor +public class TableOperationsHistoryController { + + private final OptimizerDataService service; + + /** Append a completed-job result. Called by the SparkJob after each run (success or failure). */ + @ApiResponses( + value = { + @ApiResponse(responseCode = "201", description = "OperationsHistory CREATE: CREATED") + }) + @PostMapping + public ResponseEntity appendHistory( + @RequestBody TableOperationsHistory dto) { + return ResponseEntity.status(HttpStatus.CREATED) + .body(TableOperationsHistory.fromModel(service.appendHistory(dto.toModel()))); + } + + /** + * Return the most recent history for a table, newest first, capped at {@code limit} rows. {@code + * limit} is required. + */ + @ApiResponses( + value = { + @ApiResponse(responseCode = "200", description = "OperationsHistory GET: OK"), + @ApiResponse(responseCode = "400", description = "OperationsHistory GET: BAD_REQUEST") + }) + @GetMapping("/{tableUuid}") + public ResponseEntity> getHistory( + @PathVariable String tableUuid, @RequestParam int limit) { + List result = + service.getHistory(tableUuid, limit).stream() + .map(TableOperationsHistory::fromModel) + .collect(Collectors.toList()); + return ResponseEntity.ok(result); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java new file mode 100644 index 000000000..b119dd1c7 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/api/controller/TableStatsController.java @@ -0,0 +1,111 @@ +package com.linkedin.openhouse.optimizer.api.controller; + +import com.linkedin.openhouse.optimizer.api.spec.TableStats; +import com.linkedin.openhouse.optimizer.api.spec.TableStatsHistory; +import com.linkedin.openhouse.optimizer.api.spec.UpsertTableStatsRequest; +import com.linkedin.openhouse.optimizer.service.OptimizerDataService; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ResponseStatusException; + +/** REST controller for managing per-table stats in the optimizer DB. */ +@RestController +@RequestMapping("/v1/optimizer/stats") +@RequiredArgsConstructor +public class TableStatsController { + + private final OptimizerDataService service; + + /** + * Create or overwrite the stats row for {@code tableUuid}. Called by the Tables Service on every + * Iceberg commit. Idempotent. + */ + @ApiResponses(value = {@ApiResponse(responseCode = "200", description = "Stats PUT: OK")}) + @PutMapping("/{tableUuid}") + public ResponseEntity upsertTableStats( + @PathVariable String tableUuid, @RequestBody UpsertTableStatsRequest request) { + return ResponseEntity.ok( + TableStats.fromModel(service.upsertTableStats(request.toModel(tableUuid)))); + } + + /** Fetch the stats row for {@code tableUuid}. Returns 404 if no stats have been written yet. */ + @ApiResponses( + value = { + @ApiResponse(responseCode = "200", description = "Stats GET: OK"), + @ApiResponse(responseCode = "404", description = "Stats GET: NOT_FOUND") + }) + @GetMapping("/{tableUuid}") + public ResponseEntity getTableStats(@PathVariable String tableUuid) { + return service + .getTableStats(tableUuid) + .map(TableStats::fromModel) + .map(ResponseEntity::ok) + .orElseThrow( + () -> + new ResponseStatusException( + HttpStatus.NOT_FOUND, String.format("no stats for tableUuid %s", tableUuid))); + } + + /** + * List stats rows matching the given filters, capped at {@code limit} rows. Every filter is + * optional; {@code limit} is required so callers always state how much they want back. + */ + @ApiResponses( + value = { + @ApiResponse(responseCode = "200", description = "Stats SEARCH: OK"), + @ApiResponse(responseCode = "400", description = "Stats SEARCH: BAD_REQUEST") + }) + @GetMapping + public ResponseEntity> listTableStats( + @RequestParam(required = false) String databaseName, + @RequestParam(required = false) String tableName, + @RequestParam(required = false) String tableUuid, + @RequestParam int limit) { + List result = + service + .listTableStats( + Optional.ofNullable(databaseName), + Optional.ofNullable(tableName), + Optional.ofNullable(tableUuid), + limit) + .stream() + .map(TableStats::fromModel) + .collect(Collectors.toList()); + return ResponseEntity.ok(result); + } + + /** + * Return per-commit stats history for {@code tableUuid}, newest first, capped at {@code limit} + * rows. Optional {@code since} filter (inclusive). {@code limit} is required. + */ + @ApiResponses( + value = { + @ApiResponse(responseCode = "200", description = "StatsHistory GET: OK"), + @ApiResponse(responseCode = "400", description = "StatsHistory GET: BAD_REQUEST") + }) + @GetMapping("/{tableUuid}/history") + public ResponseEntity> getStatsHistory( + @PathVariable String tableUuid, + @RequestParam(required = false) Instant since, + @RequestParam int limit) { + List result = + service.getStatsHistory(tableUuid, Optional.ofNullable(since), limit).stream() + .map(TableStatsHistory::fromModel) + .collect(Collectors.toList()); + return ResponseEntity.ok(result); + } +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java new file mode 100644 index 000000000..c20ae7bf2 --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataService.java @@ -0,0 +1,94 @@ +package com.linkedin.openhouse.optimizer.service; + +import com.linkedin.openhouse.optimizer.model.HistoryStatusDto; +import com.linkedin.openhouse.optimizer.model.OperationStatusDto; +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableOperationsHistoryDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import com.linkedin.openhouse.optimizer.model.TableStatsHistoryDto; +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +/** + * Service interface for optimizer data operations. + * + *

The service is the boundary between the wire-API surface and the database. Inputs and outputs + * are internal-model types only — callers (controllers, future CLI, in-process consumers) + * convert at their own edge. No api/-package types appear here. + */ +public interface OptimizerDataService { + + // --- TableOperations --- + + /** + * List operations matching the given filters, capped at {@code limit} rows. Every filter + * parameter is optional — pass {@link Optional#empty()} to skip that filter. + */ + List listTableOperations( + Optional operationType, + Optional status, + Optional databaseName, + Optional tableName, + Optional tableUuid, + int limit); + + /** + * Update an operation by writing a history entry. Looks up the operation row by {@code + * operationId}, copies its table metadata into a new history row with the supplied terminal + * {@code status}, and saves it. Returns the history record, or empty if the operation does not + * exist. + */ + Optional updateOperation(String operationId, HistoryStatusDto status); + + /** + * Return the operation row for {@code id} regardless of status, or empty if it does not exist. + * Used to poll a specific operation (e.g. waiting for SUCCESS after a Spark job completes). + */ + Optional getTableOperation(String id); + + // --- TableStatsDto --- + + /** + * Create or update the stats row for {@code stats.getTableUuid()}. Fully idempotent: the same + * call overwrites the previous snapshot with the latest commit values. The service stamps {@link + * TableStatsDto#getUpdatedAt()} server-side and returns the resulting {@link TableStatsDto}. + */ + TableStatsDto upsertTableStats(TableStatsDto stats); + + /** Return the stats row for {@code tableUuid}, or empty if none exists. */ + Optional getTableStats(String tableUuid); + + /** + * List stats rows matching the given filters, capped at {@code limit} rows. Every filter + * parameter is optional — pass {@link Optional#empty()} to skip that filter. + */ + List listTableStats( + Optional databaseName, + Optional tableName, + Optional tableUuid, + int limit); + + /** + * Return per-commit stats history for {@code tableUuid}, newest first. + * + * @param tableUuid the stable table UUID + * @param since if present, only return rows recorded at or after this instant + * @param limit maximum number of rows to return + */ + List getStatsHistory(String tableUuid, Optional since, int limit); + + // --- TableOperationsHistoryDto --- + + /** Append a completed-job result record. */ + TableOperationsHistoryDto appendHistory(TableOperationsHistoryDto history); + + /** + * Return the most recent history rows for a table UUID, newest first. + * + * @param tableUuid the stable table UUID + * @param limit maximum number of rows to return + */ + List getHistory(String tableUuid, int limit); +} diff --git a/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java new file mode 100644 index 000000000..29fd0eeee --- /dev/null +++ b/services/optimizer/src/main/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImpl.java @@ -0,0 +1,175 @@ +package com.linkedin.openhouse.optimizer.service; + +import com.linkedin.openhouse.optimizer.db.TableStatsHistoryRow; +import com.linkedin.openhouse.optimizer.db.TableStatsRow; +import com.linkedin.openhouse.optimizer.model.HistoryStatusDto; +import com.linkedin.openhouse.optimizer.model.OperationStatusDto; +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import com.linkedin.openhouse.optimizer.model.TableOperationDto; +import com.linkedin.openhouse.optimizer.model.TableOperationsHistoryDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import com.linkedin.openhouse.optimizer.model.TableStatsHistoryDto; +import com.linkedin.openhouse.optimizer.repository.TableOperationsHistoryRepository; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsHistoryRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import org.springframework.data.domain.PageRequest; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +/** + * Implementation of {@link OptimizerDataService}. + * + *

Operates purely on model/ and db/ types. Conversion happens via the {@code toRow()} / {@code + * fromRow(...)} methods on the model types themselves — no injected mapper. No api/-package types + * appear in this class. + */ +@Service +@RequiredArgsConstructor +public class OptimizerDataServiceImpl implements OptimizerDataService { + + private final TableOperationsRepository operationsRepository; + private final TableOperationsHistoryRepository historyRepository; + private final TableStatsRepository statsRepository; + private final TableStatsHistoryRepository statsHistoryRepository; + + // --- TableOperations --- + + @Override + public List listTableOperations( + Optional operationType, + Optional status, + Optional databaseName, + Optional tableName, + Optional tableUuid, + int limit) { + return operationsRepository + .find( + operationType.map(OperationTypeDto::toDb), + status.map(OperationStatusDto::toDb), + tableUuid, + databaseName, + tableName, + Optional.empty(), + Optional.empty(), + PageRequest.of(0, limit)) + .stream() + .map(TableOperationDto::fromRow) + .collect(Collectors.toList()); + } + + @Override + @Transactional + public Optional updateOperation( + String operationId, HistoryStatusDto status) { + return operationsRepository + .findById(operationId) + .map( + row -> + TableOperationsHistoryDto.builder() + .id(row.getId()) + .tableUuid(row.getTableUuid()) + .databaseName(row.getDatabaseName()) + .tableName(row.getTableName()) + .operationType(OperationTypeDto.fromDb(row.getOperationType())) + .completedAt(Instant.now()) + .status(status) + .build()) + .map(history -> TableOperationsHistoryDto.fromRow(historyRepository.save(history.toRow()))); + } + + @Override + public Optional getTableOperation(String id) { + return operationsRepository.findById(id).map(TableOperationDto::fromRow); + } + + // --- TableStatsDto --- + + @Override + @Transactional + public TableStatsDto upsertTableStats(TableStatsDto stats) { + Instant now = Instant.now(); + String tableUuid = stats.getTableUuid(); + + TableStatsRow row = + statsRepository + .findById(tableUuid) + .map( + existing -> + existing + .toBuilder() + .databaseName(stats.getDatabaseName()) + .tableName(stats.getTableName()) + .snapshot(stats.toSnapshotRow()) + .tableProperties(stats.getTableProperties()) + .updatedAt(now) + .build()) + .orElse(stats.toBuilder().updatedAt(now).build().toRow()); + TableStatsRow saved = statsRepository.save(row); + + statsHistoryRepository.save( + TableStatsHistoryRow.builder() + .id(UUID.randomUUID().toString()) + .tableUuid(tableUuid) + .databaseName(stats.getDatabaseName()) + .tableName(stats.getTableName()) + .snapshot(stats.toSnapshotRow()) + .delta(stats.toDeltaRow()) + .recordedAt(now) + .build()); + + return TableStatsDto.fromRow(saved); + } + + @Override + public Optional getTableStats(String tableUuid) { + return statsRepository.findById(tableUuid).map(TableStatsDto::fromRow); + } + + @Override + public List listTableStats( + Optional databaseName, + Optional tableName, + Optional tableUuid, + int limit) { + return statsRepository.find(databaseName, tableName, tableUuid, PageRequest.of(0, limit)) + .stream() + .map(TableStatsDto::fromRow) + .collect(Collectors.toList()); + } + + @Override + public List getStatsHistory( + String tableUuid, Optional since, int limit) { + return statsHistoryRepository.find(tableUuid, since, PageRequest.of(0, limit)).stream() + .map(TableStatsHistoryDto::fromRow) + .collect(Collectors.toList()); + } + + // --- TableOperationsHistoryDto --- + + @Override + @Transactional + public TableOperationsHistoryDto appendHistory(TableOperationsHistoryDto history) { + TableOperationsHistoryDto toWrite = + history + .toBuilder() + .completedAt( + history.getCompletedAt() != null ? history.getCompletedAt() : Instant.now()) + .build(); + return TableOperationsHistoryDto.fromRow(historyRepository.save(toWrite.toRow())); + } + + @Override + public List getHistory(String tableUuid, int limit) { + return historyRepository.find(tableUuid, PageRequest.of(0, limit)).stream() + .map(TableOperationsHistoryDto::fromRow) + .collect(Collectors.toList()); + } +} diff --git a/services/optimizer/src/main/resources/application.properties b/services/optimizer/src/main/resources/application.properties new file mode 100644 index 000000000..e7f082b47 --- /dev/null +++ b/services/optimizer/src/main/resources/application.properties @@ -0,0 +1,25 @@ +spring.application.name=openhouse-optimizer-service +server.port=8080 + +spring.jpa.hibernate.ddl-auto=none +spring.sql.init.mode=always +spring.jpa.defer-datasource-initialization=true +spring.sql.init.schema-locations=classpath:db/optimizer-schema.sql + +spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect +spring.jpa.properties.hibernate.show_sql=false +spring.jpa.properties.hibernate.physical_naming_strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl + +spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver +spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:mysql://localhost:3306/oh_db} +spring.datasource.username=${OPTIMIZER_DB_USER:oh_user} +spring.datasource.password=${OPTIMIZER_DB_PASSWORD:oh_password} +spring.datasource.hikari.maximum-pool-size=20 + +management.endpoints.web.exposure.include=health,prometheus +management.endpoint.health.enabled=true + +# Include ResponseStatusException.reason in the default error response body. Without this, Spring +# Boot 2.7 omits the `message` field, and the human-readable detail from a thrown +# ResponseStatusException never reaches the caller. +server.error.include-message=always diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java new file mode 100644 index 000000000..b9c8dc3dc --- /dev/null +++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/api/controller/ControllerErrorHandlingTest.java @@ -0,0 +1,124 @@ +package com.linkedin.openhouse.optimizer.api.controller; + +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import com.linkedin.openhouse.optimizer.db.OperationType; +import com.linkedin.openhouse.optimizer.db.TableOperationsRow; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import java.time.Instant; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.transaction.annotation.Transactional; + +/** + * Exercises what the controllers own: server-side validation on {@code updateOperation} (path/body + * mismatch, missing fields) and 404s on missing rows. Assertions are status-code-only: MockMvc does + * not trigger Spring's error-dispatch to {@code BasicErrorController}, so the response body of a + * {@link org.springframework.web.server.ResponseStatusException} is empty in tests even though it + * is populated in production (with {@code server.error.include-message=always}). Framework-level + * 4xx (missing query param, malformed JSON, etc.) is left to Spring's defaults and not asserted. + */ +@SpringBootTest +@AutoConfigureMockMvc +@ActiveProfiles("test") +@Transactional +class ControllerErrorHandlingTest { + + @Autowired MockMvc mockMvc; + @Autowired TableOperationsRepository operationsRepository; + + @Test + void updateOperation_notFound_returns404() throws Exception { + String id = UUID.randomUUID().toString(); + String body = String.format("{\"operationId\":\"%s\",\"status\":\"SUCCESS\"}", id); + mockMvc + .perform( + post("/v1/optimizer/operations/" + id + "/update") + .contentType(MediaType.APPLICATION_JSON) + .content(body)) + .andExpect(status().isNotFound()); + } + + @Test + void updateOperation_pathBodyMismatch_returns400() throws Exception { + String pathId = UUID.randomUUID().toString(); + String bodyId = UUID.randomUUID().toString(); + String body = String.format("{\"operationId\":\"%s\",\"status\":\"SUCCESS\"}", bodyId); + mockMvc + .perform( + post("/v1/optimizer/operations/" + pathId + "/update") + .contentType(MediaType.APPLICATION_JSON) + .content(body)) + .andExpect(status().isBadRequest()); + } + + @Test + void updateOperation_missingOperationId_returns400() throws Exception { + String pathId = UUID.randomUUID().toString(); + String body = "{\"status\":\"SUCCESS\"}"; + mockMvc + .perform( + post("/v1/optimizer/operations/" + pathId + "/update") + .contentType(MediaType.APPLICATION_JSON) + .content(body)) + .andExpect(status().isBadRequest()); + } + + @Test + void updateOperation_missingStatus_returns400() throws Exception { + String id = UUID.randomUUID().toString(); + String body = String.format("{\"operationId\":\"%s\"}", id); + mockMvc + .perform( + post("/v1/optimizer/operations/" + id + "/update") + .contentType(MediaType.APPLICATION_JSON) + .content(body)) + .andExpect(status().isBadRequest()); + } + + @Test + void getTableOperation_notFound_returns404() throws Exception { + String id = UUID.randomUUID().toString(); + mockMvc.perform(get("/v1/optimizer/operations/" + id)).andExpect(status().isNotFound()); + } + + @Test + void getTableStats_notFound_returns404() throws Exception { + String uuid = UUID.randomUUID().toString(); + mockMvc.perform(get("/v1/optimizer/stats/" + uuid)).andExpect(status().isNotFound()); + } + + @Test + void updateOperation_happyPath_returns201() throws Exception { + String id = UUID.randomUUID().toString(); + operationsRepository.save( + TableOperationsRow.builder() + .id(id) + .tableUuid(UUID.randomUUID().toString()) + .databaseName("db1") + .tableName("tbl1") + .operationType(OperationType.ORPHAN_FILES_DELETION) + .status(com.linkedin.openhouse.optimizer.db.OperationStatus.SCHEDULED) + .createdAt(Instant.now()) + .scheduledAt(Instant.now()) + .jobId("job-x") + .build()); + String body = String.format("{\"operationId\":\"%s\",\"status\":\"SUCCESS\"}", id); + mockMvc + .perform( + post("/v1/optimizer/operations/" + id + "/update") + .contentType(MediaType.APPLICATION_JSON) + .content(body)) + .andExpect(status().isCreated()) + .andExpect(jsonPath("$.status").value("SUCCESS")); + } +} diff --git a/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java new file mode 100644 index 000000000..2a3c1e676 --- /dev/null +++ b/services/optimizer/src/test/java/com/linkedin/openhouse/optimizer/service/OptimizerDataServiceImplTest.java @@ -0,0 +1,173 @@ +package com.linkedin.openhouse.optimizer.service; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.optimizer.db.TableOperationsRow; +import com.linkedin.openhouse.optimizer.db.TableStatsHistoryRow; +import com.linkedin.openhouse.optimizer.model.HistoryStatusDto; +import com.linkedin.openhouse.optimizer.model.OperationStatusDto; +import com.linkedin.openhouse.optimizer.model.OperationTypeDto; +import com.linkedin.openhouse.optimizer.model.TableOperationsHistoryDto; +import com.linkedin.openhouse.optimizer.model.TableStatsDto; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsHistoryRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.data.domain.PageRequest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.transaction.annotation.Transactional; + +@SpringBootTest +@ActiveProfiles("test") +@Transactional +class OptimizerDataServiceImplTest { + + @Autowired OptimizerDataService service; + @Autowired TableOperationsRepository operationsRepository; + @Autowired TableStatsRepository statsRepository; + @Autowired TableStatsHistoryRepository statsHistoryRepository; + + // --- updateOperation --- + + @Test + void completeOperation_writesHistoryFromOperationRow() { + String operationId = UUID.randomUUID().toString(); + String tableUuid = UUID.randomUUID().toString(); + operationsRepository.save( + TableOperationsRow.builder() + .id(operationId) + .tableUuid(tableUuid) + .databaseName("db1") + .tableName("tbl1") + .operationType(com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION) + .status(com.linkedin.openhouse.optimizer.db.OperationStatus.SCHEDULED) + .createdAt(Instant.now()) + .scheduledAt(Instant.now()) + .jobId("spark-job-123") + .build()); + + Optional result = + service.updateOperation(operationId, HistoryStatusDto.SUCCESS); + + assertThat(result).isPresent(); + assertThat(result.get().getStatus()).isEqualTo(HistoryStatusDto.SUCCESS); + assertThat(result.get().getTableUuid()).isEqualTo(tableUuid); + assertThat(result.get().getOperationType()).isEqualTo(OperationTypeDto.ORPHAN_FILES_DELETION); + assertThat(result.get().getDatabaseName()).isEqualTo("db1"); + assertThat(result.get().getCompletedAt()).isNotNull(); + } + + @Test + void completeOperation_notFound_returnsEmpty() { + Optional result = + service.updateOperation(UUID.randomUUID().toString(), HistoryStatusDto.FAILED); + + assertThat(result).isEmpty(); + } + + // --- upsertTableStats --- + + @Test + void upsertTableStats_createsNewRow() { + String tableUuid = UUID.randomUUID().toString(); + TableStatsDto input = + TableStatsDto.builder() + .tableUuid(tableUuid) + .databaseName("db1") + .tableName("tbl1") + .tableProperties(Map.of("maintenance.optimizer.ofd.enabled", "true")) + .snapshot(TableStatsDto.SnapshotMetrics.builder().tableSizeBytes(1024L).build()) + .build(); + + TableStatsDto result = service.upsertTableStats(input); + + assertThat(result.getTableUuid()).isEqualTo(tableUuid); + assertThat(result.getDatabaseName()).isEqualTo("db1"); + assertThat(result.getSnapshot().getTableSizeBytes()).isEqualTo(1024L); + assertThat(result.getTableProperties()) + .containsEntry("maintenance.optimizer.ofd.enabled", "true"); + assertThat(result.getUpdatedAt()).isNotNull(); + assertThat(statsRepository.findById(tableUuid)).isPresent(); + } + + @Test + void upsertTableStats_updatesExistingRow_andAppendsHistory() { + String tableUuid = UUID.randomUUID().toString(); + TableStatsDto first = + TableStatsDto.builder() + .tableUuid(tableUuid) + .databaseName("db1") + .tableName("tbl1") + .snapshot(TableStatsDto.SnapshotMetrics.builder().tableSizeBytes(100L).build()) + .delta( + TableStatsDto.CommitDelta.builder().numFilesAdded(5L).numFilesDeleted(1L).build()) + .build(); + TableStatsDto second = + TableStatsDto.builder() + .tableUuid(tableUuid) + .databaseName("db1") + .tableName("tbl1") + .snapshot(TableStatsDto.SnapshotMetrics.builder().tableSizeBytes(200L).build()) + .delta( + TableStatsDto.CommitDelta.builder().numFilesAdded(3L).numFilesDeleted(0L).build()) + .build(); + + service.upsertTableStats(first); + TableStatsDto result = service.upsertTableStats(second); + + assertThat(result.getSnapshot().getTableSizeBytes()).isEqualTo(200L); + assertThat(statsRepository.findAll()).hasSize(1); + + List history = + statsHistoryRepository.find(tableUuid, Optional.empty(), PageRequest.of(0, 100)); + assertThat(history).hasSize(2); + assertThat(history.get(0).getDelta().getNumFilesAdded()).isEqualTo(3L); + assertThat(history.get(1).getDelta().getNumFilesAdded()).isEqualTo(5L); + } + + // --- list filters touch the operations enum mapping path --- + + @Test + void listTableOperations_filtersByOperationTypeAndStatus() { + String pendingId = UUID.randomUUID().toString(); + String scheduledId = UUID.randomUUID().toString(); + operationsRepository.save( + TableOperationsRow.builder() + .id(pendingId) + .tableUuid(UUID.randomUUID().toString()) + .databaseName("db1") + .tableName("tbl1") + .operationType(com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION) + .status(com.linkedin.openhouse.optimizer.db.OperationStatus.PENDING) + .createdAt(Instant.now()) + .build()); + operationsRepository.save( + TableOperationsRow.builder() + .id(scheduledId) + .tableUuid(UUID.randomUUID().toString()) + .databaseName("db1") + .tableName("tbl2") + .operationType(com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION) + .status(com.linkedin.openhouse.optimizer.db.OperationStatus.SCHEDULED) + .createdAt(Instant.now()) + .build()); + + assertThat( + service.listTableOperations( + Optional.of(OperationTypeDto.ORPHAN_FILES_DELETION), + Optional.of(OperationStatusDto.PENDING), + Optional.empty(), + Optional.empty(), + Optional.empty(), + 100)) + .extracting(op -> op.getId()) + .containsExactly(pendingId); + } +}