diff --git a/docker-compose.yaml b/docker-compose.yaml index b8656e0..3ba3263 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -47,6 +47,9 @@ services: env_file: - .env environment: + PG_URL: jdbc:postgresql://postgres:5432/${PG_DB} + PG_USER: ${PG_USER} + PG_PASSWORD: ${PG_PASSWORD} MINIO_ENDPOINT: http://minio:9000 RABBITMQ_HOST: rabbitmq SERVICE_MODE: processing diff --git a/docker_compose.swarm.yaml b/docker_compose.swarm.yaml index f2688bd..3f34bea 100644 --- a/docker_compose.swarm.yaml +++ b/docker_compose.swarm.yaml @@ -181,6 +181,9 @@ services: processing-service: image: video-streaming-app:latest environment: + PG_URL: ${PG_URL} + PG_USER: ${PG_USER} + PG_PASSWORD: ${PG_PASSWORD} MINIO_ENDPOINT: ${MINIO_ENDPOINT} MINIO_ACCESS_KEY: ${MINIO_ACCESS_KEY} MINIO_SECRET_KEY: ${MINIO_SECRET_KEY} diff --git a/frontend/app.js b/frontend/app.js index 39cdb62..f2c35f5 100644 --- a/frontend/app.js +++ b/frontend/app.js @@ -11,6 +11,16 @@ const processingBar = document.getElementById("processingBar"); const processingPercent = document.getElementById("processingPercent"); const processingTrack = document.getElementById("processingTrack"); const processingBlock = document.getElementById("processingBlock"); +const transcodeBlock = document.getElementById("transcodeBlock"); +const transcodeLowBar = document.getElementById("transcodeLowBar"); +const transcodeLowPercent = document.getElementById("transcodeLowPercent"); +const transcodeLowTrack = document.getElementById("transcodeLowTrack"); +const transcodeMediumBar = document.getElementById("transcodeMediumBar"); +const transcodeMediumPercent = document.getElementById("transcodeMediumPercent"); +const transcodeMediumTrack = document.getElementById("transcodeMediumTrack"); +const transcodeHighBar = document.getElementById("transcodeHighBar"); +const transcodeHighPercent = document.getElementById("transcodeHighPercent"); +const transcodeHighTrack = document.getElementById("transcodeHighTrack"); const doneMessage = document.getElementById("doneMessage"); const reconnectBtn = document.getElementById("reconnectBtn"); const player = document.getElementById("player"); @@ -23,6 +33,7 @@ let ws = null; let currentVideoId = null; let totalSegments = null; let completedSegments = 0; +let sourceSegmentsComplete = false; let currentWsUrl = null; let processingComplete = false; let uploadInFlight = false; @@ -37,6 +48,11 @@ const RETRY_INTERVAL_MS = 1000; let retrySecondsLeft = 0; let hlsInstance = null; let selectedVideoId = null; +const transcodeProfiles = { + low: { done: 0, transcoding: 0, uploading: 0, failed: 0, segments: new Map() }, + medium: { done: 0, transcoding: 0, uploading: 0, failed: 0, segments: new Map() }, + high: { done: 0, transcoding: 0, uploading: 0, failed: 0, segments: new Map() } +}; function clearRetryTimers() { if (retryTimerId) { @@ -96,14 +112,83 @@ function scheduleRetry(reason) { function resetStateForNextUpload() { totalSegments = null; completedSegments = 0; + sourceSegmentsComplete = false; processingComplete = false; failureTerminal = false; retryInFlight = false; retrySecondsLeft = 0; + ["low", "medium", "high"].forEach((profile) => { + const state = transcodeProfiles[profile]; + state.done = 0; + state.transcoding = 0; + state.uploading = 0; + state.failed = 0; + state.segments = new Map(); + }); clearRetryTimers(); teardownPlayer(); } +function getTranscodeDom(profile) { + if (profile === "low") { + return { bar: transcodeLowBar, percent: transcodeLowPercent, track: transcodeLowTrack }; + } + if (profile === "medium") { + return { bar: transcodeMediumBar, percent: transcodeMediumPercent, track: transcodeMediumTrack }; + } + return { bar: transcodeHighBar, percent: transcodeHighPercent, track: transcodeHighTrack }; +} + +function recalcTranscodeCounters(profile) { + const state = transcodeProfiles[profile]; + let transcoding = 0; + let uploading = 0; + let failed = 0; + state.segments.forEach((status) => { + if (status === "TRANSCODING") transcoding += 1; + if (status === "UPLOADING") uploading += 1; + if (status === "FAILED") failed += 1; + }); + state.transcoding = transcoding; + state.uploading = uploading; + state.failed = failed; +} + +function updateTranscodeProfileUi(profile) { + const state = transcodeProfiles[profile]; + const dom = getTranscodeDom(profile); + if (!dom.bar || !dom.percent || !dom.track) { + return; + } + if (totalSegments) { + const percent = Math.min(100, Math.round((state.done / totalSegments) * 100)); + dom.bar.style.width = `${percent}%`; + dom.percent.textContent = `${percent}% (${state.done}/${totalSegments})`; + dom.track.classList.remove("indeterminate"); + } else { + dom.percent.textContent = `${state.done} done`; + } +} + +function allProfilesDone() { + if (!totalSegments) { + return false; + } + return ["low", "medium", "high"].every((profile) => transcodeProfiles[profile].done >= totalSegments); +} + +function tryFinalizeSuccess() { + if (processingComplete || !sourceSegmentsComplete || !allProfilesDone()) { + return; + } + processingComplete = true; + setDoneMessage("Upload and transcoding complete.", { success: true }); + uploadBtn.disabled = false; + uploadInFlight = false; + refreshReadyList(); + setPlayerStatus("Ready to play. Select a video and press Play.", { success: true }); +} + function appendLog(message, tone = "") { if (!logEl) { return; @@ -284,6 +369,21 @@ function resetProgress({ preserveRetry } = {}) { if (processingBlock) { processingBlock.classList.add("hidden"); } + if (transcodeBlock) { + transcodeBlock.classList.add("hidden"); + } + ["low", "medium", "high"].forEach((profile) => { + const dom = getTranscodeDom(profile); + if (dom.bar) { + dom.bar.style.width = "0%"; + } + if (dom.percent) { + dom.percent.textContent = ""; + } + if (dom.track) { + dom.track.classList.add("indeterminate"); + } + }); if (doneMessage && !preserveRetry) { setDoneMessage("Upload complete.", { success: true, hidden: true }); } @@ -368,6 +468,7 @@ function connectWebSocket(wsUrl, videoId) { if (processingTrack) { processingTrack.classList.remove("indeterminate"); } + ["low", "medium", "high"].forEach((profile) => updateTranscodeProfileUi(profile)); return; } if (payload && payload.type === "progress" && typeof payload.completedSegments === "number") { @@ -377,19 +478,35 @@ function connectWebSocket(wsUrl, videoId) { processingBar.style.width = `${percent}%`; processingPercent.textContent = `${percent}% (${completedSegments}/${totalSegments})`; processingTrack.classList.remove("indeterminate"); - if (completedSegments >= totalSegments && doneMessage) { - processingComplete = true; - setDoneMessage("Upload complete.", { success: true }); - uploadBtn.disabled = false; - uploadInFlight = false; - refreshReadyList(); - setPlayerStatus("Ready to play. Select a video and press Play.", { success: true }); + if (completedSegments >= totalSegments) { + sourceSegmentsComplete = true; + tryFinalizeSuccess(); } } else { processingPercent.textContent = `${completedSegments} events`; } return; } + if (payload && payload.type === "transcode_progress" && payload.profile) { + const profile = `${payload.profile}`.toLowerCase(); + const state = transcodeProfiles[profile]; + if (!state) { + return; + } + if (typeof payload.doneSegments === "number") { + state.done = Math.max(state.done, payload.doneSegments); + } + if (typeof payload.totalSegments === "number" && payload.totalSegments > 0) { + totalSegments = payload.totalSegments; + } + if (typeof payload.segmentNumber === "number" && payload.segmentNumber >= 0 && payload.state) { + state.segments.set(payload.segmentNumber, payload.state); + recalcTranscodeCounters(profile); + } + updateTranscodeProfileUi(profile); + tryFinalizeSuccess(); + return; + } if (payload && payload.type === "failed") { const reason = `${payload.reason || ""}`.trim(); const normalizedReason = reason.toLowerCase().replace(/\s+/g, "_"); @@ -424,13 +541,9 @@ function connectWebSocket(wsUrl, videoId) { processingBar.style.width = `${percent}%`; processingPercent.textContent = `${percent}% (${completedSegments}/${totalSegments})`; processingTrack.classList.remove("indeterminate"); - if (completedSegments >= totalSegments && doneMessage) { - processingComplete = true; - setDoneMessage("Upload complete.", { success: true }); - uploadBtn.disabled = false; - uploadInFlight = false; - refreshReadyList(); - setPlayerStatus("Ready to play. Select a video and press Play.", { success: true }); + if (completedSegments >= totalSegments) { + sourceSegmentsComplete = true; + tryFinalizeSuccess(); } } else { processingPercent.textContent = `${completedSegments} events`; @@ -612,9 +725,13 @@ function uploadFile({ preserveLog, isRetry } = {}) { if (processingBlock) { processingBlock.classList.remove("hidden"); } + if (transcodeBlock) { + transcodeBlock.classList.remove("hidden"); + } if (processingPercent) { processingPercent.textContent = "0%"; } + ["low", "medium", "high"].forEach((profile) => updateTranscodeProfileUi(profile)); await fetchUploadInfo(baseUrl, currentVideoId, payload.uploadStatusUrl); setPlayerStatus("Ready list updates when processing completes.", { success: true }); diff --git a/frontend/index.html b/frontend/index.html index e568d1a..0c3eabe 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -49,6 +49,30 @@

Upload Console

+ +
diff --git a/processing-service/pom.xml b/processing-service/pom.xml index c913aaf..cf166e1 100644 --- a/processing-service/pom.xml +++ b/processing-service/pom.xml @@ -26,6 +26,11 @@ jackson-databind 2.16.1 + + org.postgresql + postgresql + 42.7.3 + org.mockito mockito-core @@ -40,4 +45,4 @@ - \ No newline at end of file + diff --git a/processing-service/src/main/java/com/distributed26/videostreaming/processing/ProcessingServiceApplication.java b/processing-service/src/main/java/com/distributed26/videostreaming/processing/ProcessingServiceApplication.java index f12199c..3a63e29 100644 --- a/processing-service/src/main/java/com/distributed26/videostreaming/processing/ProcessingServiceApplication.java +++ b/processing-service/src/main/java/com/distributed26/videostreaming/processing/ProcessingServiceApplication.java @@ -1,10 +1,13 @@ package com.distributed26.videostreaming.processing; +import com.distributed26.videostreaming.processing.db.TranscodedSegmentStatusRepository; import com.distributed26.videostreaming.shared.config.StorageConfig; import com.distributed26.videostreaming.shared.storage.ObjectStorageClient; import com.distributed26.videostreaming.shared.storage.S3StorageClient; import com.distributed26.videostreaming.shared.upload.RabbitMQJobTaskBus; import com.distributed26.videostreaming.shared.upload.events.JobTaskEvent; +import com.distributed26.videostreaming.shared.upload.events.TranscodeProgressEvent; +import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState; import com.distributed26.videostreaming.shared.upload.events.UploadMetaEvent; import io.github.cdimascio.dotenv.Dotenv; import io.javalin.Javalin; @@ -42,6 +45,11 @@ public class ProcessingServiceApplication { private static final Logger LOGGER = LogManager.getLogger(ProcessingServiceApplication.class); private static final String MANIFEST_PROCESSOR_EXECUTOR_NAME = "AbrManifestGenerator"; private static final Set MANIFESTS_IN_FLIGHT = ConcurrentHashMap.newKeySet(); + private static final Map TOTAL_SEGMENTS_BY_VIDEO = new ConcurrentHashMap<>(); + private static volatile TranscodedSegmentStatusRepository transcodeStatusRepository; + private static volatile RabbitMQJobTaskBus statusBus; + private static volatile AbrManifestService manifestServiceRef; + private static volatile ExecutorService manifestExecutorRef; static final TranscodingProfile[] PROFILES = { TranscodingProfile.LOW, @@ -67,31 +75,35 @@ public static void main(String[] args) throws Exception { storageClient.ensureBucketExists(); LOGGER.info("Storage ready — bucket={}", storageConfig.getDefaultBucketName()); - // Worker pool - int poolSize = Integer.parseInt(getEnvOrDotenv(dotenv, "WORKER_POOL_SIZE", "4")); - BlockingQueue taskQueue = new LinkedBlockingQueue<>(); - List workers = new ArrayList<>(poolSize); - for (int i = 0; i < poolSize; i++) { - TranscodingWorker w = new TranscodingWorker("worker-" + i, storageClient, taskQueue); - w.start(); - workers.add(w); - } - LOGGER.info("Started {} transcoding worker(s)", poolSize); - // RabbitMQ bus — fromEnv() reads SERVICE_MODE; any value other than // "upload" enables the AMQP consumer. Set SERVICE_MODE=processing and // use a dedicated RABBITMQ_STATUS_QUEUE so the processing service gets // its own copy of every upload.status.* message (TOPIC fan-out). RabbitMQJobTaskBus bus = RabbitMQJobTaskBus.fromEnv(); + statusBus = bus; AbrManifestService manifestService = new AbrManifestService( storageClient, Integer.parseInt(getEnvOrDotenv(dotenv, "ABR_MANIFEST_WAIT_SECONDS", "120")) ); + manifestServiceRef = manifestService; ExecutorService manifestExecutor = Executors.newSingleThreadExecutor( r -> new Thread(r, MANIFEST_PROCESSOR_EXECUTOR_NAME) ); + manifestExecutorRef = manifestExecutor; LOGGER.info("RabbitMQJobTaskBus connected"); + // Worker pool + int poolSize = Integer.parseInt(getEnvOrDotenv(dotenv, "WORKER_POOL_SIZE", "4")); + BlockingQueue taskQueue = new LinkedBlockingQueue<>(); + transcodeStatusRepository = createTranscodeStatusRepository(); + List workers = new ArrayList<>(poolSize); + for (int i = 0; i < poolSize; i++) { + TranscodingWorker w = new TranscodingWorker("worker-" + i, storageClient, taskQueue, bus, transcodeStatusRepository); + w.start(); + workers.add(w); + } + LOGGER.info("Started {} transcoding worker(s)", poolSize); + // subscribeAll ensures every incoming event reaches onEvent() regardless // of whether we've seen that videoId before — fixing the first-event drop. bus.subscribeAll(ev -> onEvent(ev, taskQueue, manifestService, manifestExecutor)); @@ -105,6 +117,8 @@ public static void main(String[] args) throws Exception { LOGGER.info("Shutdown: stopping workers..."); workers.forEach(TranscodingWorker::stop); manifestExecutor.shutdownNow(); + manifestExecutorRef = null; + manifestServiceRef = null; try { bus.close(); } catch (Exception e) { LOGGER.warn("Error closing bus", e); } })); @@ -179,25 +193,8 @@ static void onEvent(JobTaskEvent event, BlockingQueue taskQueue // Build variant + master manifests after source segmentation is done. LOGGER.info("UploadMetaEvent: videoId={} totalSegments={} (tasks already in flight)", videoId, meta.getTotalSegments()); - - if (MANIFESTS_IN_FLIGHT.add(videoId)) { - try { - manifestExecutor.execute(() -> { - try { - manifestService.generateIfNeeded(videoId, meta.getTotalSegments()); - } catch (Exception e) { - LOGGER.error("Manifest generation failed for videoId={}", videoId, e); - } finally { - MANIFESTS_IN_FLIGHT.remove(videoId); - } - }); - } catch (RuntimeException e) { - MANIFESTS_IN_FLIGHT.remove(videoId); - LOGGER.error("Failed to submit manifest generation task for videoId={}", videoId, e); - } - } else { - LOGGER.debug("Manifest generation already running/skipped for videoId={}", videoId); - } + TOTAL_SEGMENTS_BY_VIDEO.put(videoId, meta.getTotalSegments()); + scheduleManifestGeneration(videoId, meta.getTotalSegments(), manifestService, manifestExecutor); return; } @@ -211,8 +208,10 @@ static void onEvent(JobTaskEvent event, BlockingQueue taskQueue } LOGGER.info("Chunk received: videoId={} key={} — queuing {} tasks", videoId, chunkKey, PROFILES.length); + int segmentNumber = parseSegmentNumber(chunkKey); for (TranscodingProfile profile : PROFILES) { taskQueue.offer(new TranscodingTask(UUID.randomUUID().toString(), videoId, chunkKey, profile)); + publishTranscodeState(videoId, profile.getName(), segmentNumber, TranscodeSegmentState.QUEUED); } } @@ -229,5 +228,97 @@ private static String getEnvOrDotenv(Dotenv dotenv, String key, String defaultVa static void resetState() { CHUNKS_BY_VIDEO.clear(); MANIFESTS_IN_FLIGHT.clear(); + TOTAL_SEGMENTS_BY_VIDEO.clear(); + } + + static void publishTranscodeState(String videoId, String profile, int segmentNumber, TranscodeSegmentState state) { + if (transcodeStatusRepository == null || statusBus == null || segmentNumber < 0) { + return; + } + try { + transcodeStatusRepository.upsertState(videoId, profile, segmentNumber, state); + int done = transcodeStatusRepository.countByState(videoId, profile, TranscodeSegmentState.DONE); + int total = TOTAL_SEGMENTS_BY_VIDEO.getOrDefault(videoId, 0); + int total = TOTAL_SEGMENTS_BY_VIDEO.getOrDefault(videoId, 0); + int done = 0; + if (state == TranscodeSegmentState.DONE) { + done = transcodeStatusRepository.countByState(videoId, profile, TranscodeSegmentState.DONE); + } + statusBus.publish(new TranscodeProgressEvent(videoId, profile, segmentNumber, state, done, total)); + if (state == TranscodeSegmentState.DONE + && total > 0 + && !MANIFESTS_IN_FLIGHT.contains(videoId) + && areAllProfilesDone(videoId, total)) { + scheduleManifestGeneration(videoId, total, manifestServiceRef, manifestExecutorRef); + } + } catch (Exception e) { + LOGGER.warn("Failed to persist/publish transcode progress videoId={} profile={} segment={} state={}", + videoId, profile, segmentNumber, state, e); + } + } + + private static boolean areAllProfilesDone(String videoId, int totalSegments) { + if (transcodeStatusRepository == null || totalSegments <= 0) { + return false; + } + for (TranscodingProfile profile : PROFILES) { + int done = transcodeStatusRepository.countByState(videoId, profile.getName(), TranscodeSegmentState.DONE); + if (done < totalSegments) { + return false; + } + } + return true; + } + + private static void scheduleManifestGeneration( + String videoId, + int totalSegments, + AbrManifestService manifestService, + ExecutorService manifestExecutor + ) { + if (manifestService == null || manifestExecutor == null) { + LOGGER.warn("Cannot schedule manifest generation for videoId={} because manifest generator is not configured", + videoId); + return; + } + if (!MANIFESTS_IN_FLIGHT.add(videoId)) { + LOGGER.debug("Manifest generation already running/skipped for videoId={}", videoId); + return; + } + try { + manifestExecutor.execute(() -> { + try { + manifestService.generateIfNeeded(videoId, totalSegments); + } catch (Exception e) { + LOGGER.error("Manifest generation failed for videoId={}", videoId, e); + } finally { + MANIFESTS_IN_FLIGHT.remove(videoId); + } + }); + } catch (RuntimeException e) { + MANIFESTS_IN_FLIGHT.remove(videoId); + LOGGER.error("Failed to submit manifest generation task for videoId={}", videoId, e); + } + } + + private static int parseSegmentNumber(String chunkKey) { + if (chunkKey == null || chunkKey.isBlank()) { + return -1; + } + java.util.regex.Matcher matcher = java.util.regex.Pattern.compile("(\\d+)").matcher(chunkKey); + int last = -1; + while (matcher.find()) { + last = Integer.parseInt(matcher.group(1)); + } + return last; + } + + private static TranscodedSegmentStatusRepository createTranscodeStatusRepository() { + try { + return TranscodedSegmentStatusRepository.fromEnv(); + } catch (IllegalStateException e) { + LOGGER.warn("Postgres not configured; transcoding progress disabled: {}", e.getMessage()); + return null; + } } } diff --git a/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingTask.java b/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingTask.java index a0e00cb..4f5f89a 100644 --- a/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingTask.java +++ b/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingTask.java @@ -79,6 +79,14 @@ private static final class FfmpegHolder { * Download source, transcode, upload result. Idempotent — skips if outputKey already exists. */ public void execute(ObjectStorageClient storageClient) throws IOException { + execute(storageClient, null); + } + + /** + * Download source, transcode, and upload output. Optional callback runs + * after transcoding completes and right before upload begins. + */ + public void execute(ObjectStorageClient storageClient, Runnable beforeUpload) throws IOException { if (storageClient.fileExists(outputKey)) { LOGGER.info("Output already exists, skipping: {}", outputKey); return; @@ -110,12 +118,17 @@ public void execute(ObjectStorageClient storageClient) throws IOException { .addExtraArgs("-maxrate", maxrate) .addExtraArgs("-bufsize", bufsize) .addExtraArgs("-threads", String.valueOf(FFMPEG_THREADS)) - .addExtraArgs("-an") + .addExtraArgs("-c:a", "aac") + .addExtraArgs("-b:a", "128k") + .addExtraArgs("-ac", "2") .done(); new FFmpegExecutor(FfmpegHolder.FFMPEG, FfmpegHolder.FFPROBE).createJob(builder).run(); long size = Files.size(outputTemp); + if (beforeUpload != null) { + beforeUpload.run(); + } LOGGER.info("Uploading transcoded output: {} ({} bytes)", outputKey, size); try (InputStream is = new FileInputStream(outputTemp.toFile())) { storageClient.uploadFile(outputKey, is, size); diff --git a/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingWorker.java b/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingWorker.java index 2b672f1..37c8ef0 100644 --- a/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingWorker.java +++ b/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingWorker.java @@ -1,9 +1,12 @@ package com.distributed26.videostreaming.processing; +import com.distributed26.videostreaming.processing.db.TranscodedSegmentStatusRepository; import com.distributed26.videostreaming.shared.jobs.Status; import com.distributed26.videostreaming.shared.jobs.Worker; import com.distributed26.videostreaming.shared.jobs.WorkerStatus; import com.distributed26.videostreaming.shared.storage.ObjectStorageClient; +import com.distributed26.videostreaming.shared.upload.RabbitMQJobTaskBus; +import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState; import java.time.Instant; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -19,14 +22,20 @@ public class TranscodingWorker { private final Worker worker; private final ObjectStorageClient storageClient; private final BlockingQueue queue; + private final RabbitMQJobTaskBus bus; + private final TranscodedSegmentStatusRepository transcodeStatusRepository; private volatile boolean running = false; private Thread thread; public TranscodingWorker(String id, ObjectStorageClient storageClient, - BlockingQueue queue) { + BlockingQueue queue, + RabbitMQJobTaskBus bus, + TranscodedSegmentStatusRepository transcodeStatusRepository) { this.worker = new Worker(id, Instant.now()); this.storageClient = storageClient; this.queue = queue; + this.bus = bus; + this.transcodeStatusRepository = transcodeStatusRepository; } public String getId() { return worker.getId(); } @@ -62,13 +71,16 @@ private void runLoop() { task.setStatus(Status.RUNNING); LOGGER.info("Worker {} picked up task {} (chunk={} profile={})", worker.getId(), task.getId(), task.getChunkKey(), task.getProfile().getName()); + emitState(task, TranscodeSegmentState.TRANSCODING); try { - task.execute(storageClient); + task.execute(storageClient, () -> emitState(task, TranscodeSegmentState.UPLOADING)); task.setStatus(Status.SUCCEEDED); + emitState(task, TranscodeSegmentState.DONE); LOGGER.info("Task {} succeeded", task.getId()); } catch (Exception e) { task.setStatus(Status.FAILED); + emitState(task, TranscodeSegmentState.FAILED); LOGGER.error("Task {} failed: {}", task.getId(), e.getMessage(), e); } @@ -80,4 +92,32 @@ private void runLoop() { } } } + + private void emitState(TranscodingTask task, TranscodeSegmentState state) { + if (bus == null || transcodeStatusRepository == null) { + return; + } + int segmentNumber = extractSegmentNumber(task.getChunkKey()); + if (segmentNumber < 0) { + return; + } + ProcessingServiceApplication.publishTranscodeState( + task.getJobId(), + task.getProfile().getName(), + segmentNumber, + state + ); + } + + private static int extractSegmentNumber(String chunkKey) { + if (chunkKey == null || chunkKey.isBlank()) { + return -1; + } + java.util.regex.Matcher matcher = java.util.regex.Pattern.compile("(\\d+)").matcher(chunkKey); + int last = -1; + while (matcher.find()) { + last = Integer.parseInt(matcher.group(1)); + } + return last; + } } diff --git a/processing-service/src/main/java/com/distributed26/videostreaming/processing/db/TranscodedSegmentStatusRepository.java b/processing-service/src/main/java/com/distributed26/videostreaming/processing/db/TranscodedSegmentStatusRepository.java new file mode 100644 index 0000000..cd05645 --- /dev/null +++ b/processing-service/src/main/java/com/distributed26/videostreaming/processing/db/TranscodedSegmentStatusRepository.java @@ -0,0 +1,80 @@ +package com.distributed26.videostreaming.processing.db; + +import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.UUID; + +public class TranscodedSegmentStatusRepository { + private final String jdbcUrl; + private final String username; + private final String password; + + public TranscodedSegmentStatusRepository(String jdbcUrl, String username, String password) { + this.jdbcUrl = jdbcUrl; + this.username = username; + this.password = password; + } + + public static TranscodedSegmentStatusRepository fromEnv() { + String url = System.getenv("PG_URL"); + String user = System.getenv("PG_USER"); + String pass = System.getenv("PG_PASSWORD"); + + if (url == null || url.isBlank()) { + throw new IllegalStateException("PG_URL is not set"); + } + if (user == null || user.isBlank()) { + throw new IllegalStateException("PG_USER is not set"); + } + return new TranscodedSegmentStatusRepository(url, user, pass); + } + + public void upsertState(String videoId, String profile, int segmentNumber, TranscodeSegmentState state) { + String sql = """ + INSERT INTO transcoded_segment_status (video_id, profile, segment_number, state) + VALUES (?, ?, ?, ?) + ON CONFLICT (video_id, profile, segment_number) DO UPDATE + SET state = CASE + WHEN transcoded_segment_status.state = 'DONE' AND EXCLUDED.state <> 'DONE' + THEN transcoded_segment_status.state + ELSE EXCLUDED.state + END, + updated_at = NOW() + """; + try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password); + PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setObject(1, UUID.fromString(videoId)); + ps.setString(2, profile); + ps.setInt(3, segmentNumber); + ps.setString(4, state.name()); + ps.executeUpdate(); + } catch (SQLException e) { + throw new RuntimeException("Failed to upsert transcoded_segment_status", e); + } + } + + public int countByState(String videoId, String profile, TranscodeSegmentState state) { + String sql = """ + SELECT COUNT(*) FROM transcoded_segment_status + WHERE video_id = ? AND profile = ? AND state = ? + """; + try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password); + PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setObject(1, UUID.fromString(videoId)); + ps.setString(2, profile); + ps.setString(3, state.name()); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return rs.getInt(1); + } + return 0; + } + } catch (SQLException e) { + throw new RuntimeException("Failed to count transcoded_segment_status", e); + } + } +} diff --git a/shared/src/main/java/com/distributed26/videostreaming/shared/upload/RabbitMQJobTaskBus.java b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/RabbitMQJobTaskBus.java index 12e7b4d..d261ae8 100644 --- a/shared/src/main/java/com/distributed26/videostreaming/shared/upload/RabbitMQJobTaskBus.java +++ b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/RabbitMQJobTaskBus.java @@ -1,6 +1,8 @@ package com.distributed26.videostreaming.shared.upload; import com.distributed26.videostreaming.shared.upload.events.JobTaskEvent; +import com.distributed26.videostreaming.shared.upload.events.TranscodeProgressEvent; +import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState; import com.distributed26.videostreaming.shared.upload.events.UploadFailedEvent; import com.distributed26.videostreaming.shared.upload.events.UploadMetaEvent; import com.fasterxml.jackson.databind.JsonNode; @@ -184,6 +186,20 @@ private JobTaskEvent toEvent(JsonNode node) { if ("meta".equals(type) && node.has("totalSegments")) { return new UploadMetaEvent(jobId, node.path("totalSegments").asInt()); } + if ("transcode_progress".equals(type)) { + String profile = node.path("profile").asText(""); + int segmentNumber = node.path("segmentNumber").asInt(-1); + int doneSegments = node.path("doneSegments").asInt(0); + int totalSegments = node.path("totalSegments").asInt(0); + String stateRaw = node.path("state").asText("FAILED"); + TranscodeSegmentState state; + try { + state = TranscodeSegmentState.valueOf(stateRaw.toUpperCase()); + } catch (IllegalArgumentException e) { + state = TranscodeSegmentState.FAILED; + } + return new TranscodeProgressEvent(jobId, profile, segmentNumber, state, doneSegments, totalSegments); + } String taskId = node.path("taskId").asText("task"); return new JobTaskEvent(jobId, taskId); } @@ -195,6 +211,9 @@ private String describeEventType(JobTaskEvent event) { if (event instanceof UploadMetaEvent) { return "meta"; } + if (event instanceof TranscodeProgressEvent) { + return "transcode_progress"; + } return "task"; } diff --git a/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeProgressEvent.java b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeProgressEvent.java new file mode 100644 index 0000000..14c1659 --- /dev/null +++ b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeProgressEvent.java @@ -0,0 +1,52 @@ +package com.distributed26.videostreaming.shared.upload.events; + +import java.util.Objects; + +public class TranscodeProgressEvent extends JobTaskEvent { + private final String type = "transcode_progress"; + private final String profile; + private final int segmentNumber; + private final TranscodeSegmentState state; + private final int doneSegments; + private final int totalSegments; + + public TranscodeProgressEvent( + String jobId, + String profile, + int segmentNumber, + TranscodeSegmentState state, + int doneSegments, + int totalSegments + ) { + super(jobId, "transcode:" + Objects.requireNonNull(profile, "profile is null") + ":" + segmentNumber); + this.profile = profile; + this.segmentNumber = segmentNumber; + this.state = Objects.requireNonNull(state, "state is null"); + this.doneSegments = doneSegments; + this.totalSegments = totalSegments; + } + + public String getType() { + return type; + } + + public String getProfile() { + return profile; + } + + public int getSegmentNumber() { + return segmentNumber; + } + + public TranscodeSegmentState getState() { + return state; + } + + public int getDoneSegments() { + return doneSegments; + } + + public int getTotalSegments() { + return totalSegments; + } +} diff --git a/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeSegmentState.java b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeSegmentState.java new file mode 100644 index 0000000..aeed4ab --- /dev/null +++ b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeSegmentState.java @@ -0,0 +1,9 @@ +package com.distributed26.videostreaming.shared.upload.events; + +public enum TranscodeSegmentState { + QUEUED, + TRANSCODING, + UPLOADING, + DONE, + FAILED +} diff --git a/upload-service/docs/db/schema.sql b/upload-service/docs/db/schema.sql index 5c74da7..ede0add 100644 --- a/upload-service/docs/db/schema.sql +++ b/upload-service/docs/db/schema.sql @@ -23,3 +23,16 @@ CREATE TABLE IF NOT EXISTS segment_upload ( CREATE INDEX IF NOT EXISTS idx_segment_upload_video_id ON segment_upload(video_id); + +CREATE TABLE IF NOT EXISTS transcoded_segment_status ( + id SERIAL PRIMARY KEY, + video_id UUID NOT NULL REFERENCES video_upload(video_id) ON DELETE CASCADE, + profile VARCHAR(32) NOT NULL, + segment_number INTEGER NOT NULL, + state VARCHAR(32) NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + UNIQUE (video_id, profile, segment_number) +); + +CREATE INDEX IF NOT EXISTS idx_transcoded_segment_status_video_profile_state + ON transcoded_segment_status(video_id, profile, state); diff --git a/upload-service/src/main/java/com/distributed26/videostreaming/upload/db/TranscodedSegmentStatusRepository.java b/upload-service/src/main/java/com/distributed26/videostreaming/upload/db/TranscodedSegmentStatusRepository.java new file mode 100644 index 0000000..2029c1c --- /dev/null +++ b/upload-service/src/main/java/com/distributed26/videostreaming/upload/db/TranscodedSegmentStatusRepository.java @@ -0,0 +1,65 @@ +package com.distributed26.videostreaming.upload.db; + +import io.github.cdimascio.dotenv.Dotenv; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.UUID; + +public class TranscodedSegmentStatusRepository { + private final String jdbcUrl; + private final String username; + private final String password; + + public TranscodedSegmentStatusRepository(String jdbcUrl, String username, String password) { + this.jdbcUrl = jdbcUrl; + this.username = username; + this.password = password; + } + + public static TranscodedSegmentStatusRepository fromEnv() { + Dotenv dotenv = Dotenv.configure().directory("./").ignoreIfMissing().load(); + String url = getenvOrDotenv(dotenv, "PG_URL"); + String user = getenvOrDotenv(dotenv, "PG_USER"); + String pass = getenvOrDotenv(dotenv, "PG_PASSWORD"); + + if (url == null || url.isBlank()) { + throw new IllegalStateException("PG_URL is not set"); + } + if (user == null || user.isBlank()) { + throw new IllegalStateException("PG_USER is not set"); + } + return new TranscodedSegmentStatusRepository(url, user, pass); + } + + private static String getenvOrDotenv(Dotenv dotenv, String key) { + String envVal = System.getenv(key); + if (envVal != null && !envVal.isBlank()) { + return envVal; + } + return dotenv.get(key); + } + + public int countByState(String videoId, String profile, String state) { + String sql = """ + SELECT COUNT(*) FROM transcoded_segment_status + WHERE video_id = ? AND profile = ? AND state = ? + """; + try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password); + PreparedStatement ps = conn.prepareStatement(sql)) { + ps.setObject(1, UUID.fromString(videoId)); + ps.setString(2, profile); + ps.setString(3, state); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return rs.getInt(1); + } + return 0; + } + } catch (SQLException e) { + throw new RuntimeException("Failed to count transcoded_segment_status", e); + } + } +} diff --git a/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadServiceApplication.java b/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadServiceApplication.java index edc8ccd..8fd6b17 100644 --- a/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadServiceApplication.java +++ b/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadServiceApplication.java @@ -5,6 +5,7 @@ import com.distributed26.videostreaming.shared.storage.S3StorageClient; import com.distributed26.videostreaming.shared.upload.JobTaskBus; import com.distributed26.videostreaming.shared.upload.RabbitMQJobTaskBus; +import com.distributed26.videostreaming.upload.db.TranscodedSegmentStatusRepository; import com.distributed26.videostreaming.upload.db.VideoUploadRepository; import io.javalin.Javalin; import io.javalin.config.SizeUnit; @@ -82,7 +83,9 @@ static Javalin createStatusApp(JobTaskBus jobTaskBus) { ensureLogsDirectory(); VideoUploadRepository videoUploadRepository = createVideoUploadRepository(); com.distributed26.videostreaming.upload.db.SegmentUploadRepository segmentUploadRepository = createSegmentUploadRepository(); - UploadStatusWebSocket uploadStatusWebSocket = new UploadStatusWebSocket(jobTaskBus, segmentUploadRepository); + TranscodedSegmentStatusRepository transcodedSegmentStatusRepository = createTranscodedSegmentStatusRepository(); + UploadStatusWebSocket uploadStatusWebSocket = + new UploadStatusWebSocket(jobTaskBus, segmentUploadRepository, transcodedSegmentStatusRepository); UploadInfoHandler uploadInfoHandler = new UploadInfoHandler(videoUploadRepository); Javalin app = Javalin.create(); @@ -142,6 +145,15 @@ private static com.distributed26.videostreaming.upload.db.SegmentUploadRepositor } } + private static TranscodedSegmentStatusRepository createTranscodedSegmentStatusRepository() { + try { + return TranscodedSegmentStatusRepository.fromEnv(); + } catch (IllegalStateException e) { + logger.warn("Postgres not configured; transcoded segment status disabled: {}", e.getMessage()); + return null; + } + } + private static void ensureLogsDirectory() { try { java.nio.file.Files.createDirectories(java.nio.file.Path.of("logs")); diff --git a/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadStatusWebSocket.java b/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadStatusWebSocket.java index 66296a7..8d45d1a 100644 --- a/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadStatusWebSocket.java +++ b/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadStatusWebSocket.java @@ -2,8 +2,11 @@ import com.distributed26.videostreaming.shared.upload.JobTaskBus; import com.distributed26.videostreaming.shared.upload.JobTaskListener; +import com.distributed26.videostreaming.shared.upload.events.TranscodeProgressEvent; +import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState; import com.distributed26.videostreaming.shared.upload.events.UploadProgressEvent; import com.distributed26.videostreaming.upload.db.SegmentUploadRepository; +import com.distributed26.videostreaming.upload.db.TranscodedSegmentStatusRepository; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -27,12 +30,18 @@ public class UploadStatusWebSocket { .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); private final JobTaskBus jobTaskBus; private final SegmentUploadRepository segmentUploadRepository; + private final TranscodedSegmentStatusRepository transcodedSegmentStatusRepository; private final Map jobListenersByContext = new ConcurrentHashMap<>(); private final Map jobIdByContext = new ConcurrentHashMap<>(); - public UploadStatusWebSocket(JobTaskBus jobTaskBus, SegmentUploadRepository segmentUploadRepository) { + public UploadStatusWebSocket( + JobTaskBus jobTaskBus, + SegmentUploadRepository segmentUploadRepository, + TranscodedSegmentStatusRepository transcodedSegmentStatusRepository + ) { this.jobTaskBus = Objects.requireNonNull(jobTaskBus, "jobTaskBus is null"); this.segmentUploadRepository = segmentUploadRepository; + this.transcodedSegmentStatusRepository = transcodedSegmentStatusRepository; } public void configure(WsConfig ws) { @@ -95,6 +104,16 @@ private void bindJob(WsContext ctx, String jobId) { } else { logger.warn("SegmentUploadRepository is null; progress snapshot disabled"); } + if (transcodedSegmentStatusRepository != null) { + try { + sendTranscodeSnapshot(ctx, jobId); + } catch (Exception e) { + logger.warn("Failed to fetch transcode progress for jobId={}", jobId, e); + ctx.send("{\"error\":\"transcode_progress_lookup_failed\"}"); + } + } else { + logger.warn("TranscodedSegmentStatusRepository is null; transcode snapshot disabled"); + } } private void cleanup(WsCloseContext ctx) { @@ -120,6 +139,16 @@ private void sendProgressSnapshot(WsContext ctx, String jobId) throws JsonProces ctx.send(objectMapper.writeValueAsString(new UploadProgressEvent(jobId, completedSegments))); } + private void sendTranscodeSnapshot(WsContext ctx, String jobId) throws JsonProcessingException { + String[] profiles = {"low", "medium", "high"}; + for (String profile : profiles) { + int done = transcodedSegmentStatusRepository.countByState(jobId, profile, TranscodeSegmentState.DONE.name()); + ctx.send(objectMapper.writeValueAsString( + new TranscodeProgressEvent(jobId, profile, -1, TranscodeSegmentState.DONE, done, 0) + )); + } + } + private String describeEventType(com.distributed26.videostreaming.shared.upload.events.JobTaskEvent event) { if (event instanceof com.distributed26.videostreaming.shared.upload.events.UploadFailedEvent failed) { return failed.getType(); @@ -127,6 +156,9 @@ private String describeEventType(com.distributed26.videostreaming.shared.upload. if (event instanceof com.distributed26.videostreaming.shared.upload.events.UploadMetaEvent) { return "meta"; } + if (event instanceof TranscodeProgressEvent) { + return "transcode_progress"; + } return "task"; } }