diff --git a/docker-compose.yaml b/docker-compose.yaml
index b8656e0..3ba3263 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -47,6 +47,9 @@ services:
env_file:
- .env
environment:
+ PG_URL: jdbc:postgresql://postgres:5432/${PG_DB}
+ PG_USER: ${PG_USER}
+ PG_PASSWORD: ${PG_PASSWORD}
MINIO_ENDPOINT: http://minio:9000
RABBITMQ_HOST: rabbitmq
SERVICE_MODE: processing
diff --git a/docker_compose.swarm.yaml b/docker_compose.swarm.yaml
index f2688bd..3f34bea 100644
--- a/docker_compose.swarm.yaml
+++ b/docker_compose.swarm.yaml
@@ -181,6 +181,9 @@ services:
processing-service:
image: video-streaming-app:latest
environment:
+ PG_URL: ${PG_URL}
+ PG_USER: ${PG_USER}
+ PG_PASSWORD: ${PG_PASSWORD}
MINIO_ENDPOINT: ${MINIO_ENDPOINT}
MINIO_ACCESS_KEY: ${MINIO_ACCESS_KEY}
MINIO_SECRET_KEY: ${MINIO_SECRET_KEY}
diff --git a/frontend/app.js b/frontend/app.js
index 39cdb62..f2c35f5 100644
--- a/frontend/app.js
+++ b/frontend/app.js
@@ -11,6 +11,16 @@ const processingBar = document.getElementById("processingBar");
const processingPercent = document.getElementById("processingPercent");
const processingTrack = document.getElementById("processingTrack");
const processingBlock = document.getElementById("processingBlock");
+const transcodeBlock = document.getElementById("transcodeBlock");
+const transcodeLowBar = document.getElementById("transcodeLowBar");
+const transcodeLowPercent = document.getElementById("transcodeLowPercent");
+const transcodeLowTrack = document.getElementById("transcodeLowTrack");
+const transcodeMediumBar = document.getElementById("transcodeMediumBar");
+const transcodeMediumPercent = document.getElementById("transcodeMediumPercent");
+const transcodeMediumTrack = document.getElementById("transcodeMediumTrack");
+const transcodeHighBar = document.getElementById("transcodeHighBar");
+const transcodeHighPercent = document.getElementById("transcodeHighPercent");
+const transcodeHighTrack = document.getElementById("transcodeHighTrack");
const doneMessage = document.getElementById("doneMessage");
const reconnectBtn = document.getElementById("reconnectBtn");
const player = document.getElementById("player");
@@ -23,6 +33,7 @@ let ws = null;
let currentVideoId = null;
let totalSegments = null;
let completedSegments = 0;
+let sourceSegmentsComplete = false;
let currentWsUrl = null;
let processingComplete = false;
let uploadInFlight = false;
@@ -37,6 +48,11 @@ const RETRY_INTERVAL_MS = 1000;
let retrySecondsLeft = 0;
let hlsInstance = null;
let selectedVideoId = null;
+const transcodeProfiles = {
+ low: { done: 0, transcoding: 0, uploading: 0, failed: 0, segments: new Map() },
+ medium: { done: 0, transcoding: 0, uploading: 0, failed: 0, segments: new Map() },
+ high: { done: 0, transcoding: 0, uploading: 0, failed: 0, segments: new Map() }
+};
function clearRetryTimers() {
if (retryTimerId) {
@@ -96,14 +112,83 @@ function scheduleRetry(reason) {
function resetStateForNextUpload() {
totalSegments = null;
completedSegments = 0;
+ sourceSegmentsComplete = false;
processingComplete = false;
failureTerminal = false;
retryInFlight = false;
retrySecondsLeft = 0;
+ ["low", "medium", "high"].forEach((profile) => {
+ const state = transcodeProfiles[profile];
+ state.done = 0;
+ state.transcoding = 0;
+ state.uploading = 0;
+ state.failed = 0;
+ state.segments = new Map();
+ });
clearRetryTimers();
teardownPlayer();
}
+function getTranscodeDom(profile) {
+ if (profile === "low") {
+ return { bar: transcodeLowBar, percent: transcodeLowPercent, track: transcodeLowTrack };
+ }
+ if (profile === "medium") {
+ return { bar: transcodeMediumBar, percent: transcodeMediumPercent, track: transcodeMediumTrack };
+ }
+ return { bar: transcodeHighBar, percent: transcodeHighPercent, track: transcodeHighTrack };
+}
+
+function recalcTranscodeCounters(profile) {
+ const state = transcodeProfiles[profile];
+ let transcoding = 0;
+ let uploading = 0;
+ let failed = 0;
+ state.segments.forEach((status) => {
+ if (status === "TRANSCODING") transcoding += 1;
+ if (status === "UPLOADING") uploading += 1;
+ if (status === "FAILED") failed += 1;
+ });
+ state.transcoding = transcoding;
+ state.uploading = uploading;
+ state.failed = failed;
+}
+
+function updateTranscodeProfileUi(profile) {
+ const state = transcodeProfiles[profile];
+ const dom = getTranscodeDom(profile);
+ if (!dom.bar || !dom.percent || !dom.track) {
+ return;
+ }
+ if (totalSegments) {
+ const percent = Math.min(100, Math.round((state.done / totalSegments) * 100));
+ dom.bar.style.width = `${percent}%`;
+ dom.percent.textContent = `${percent}% (${state.done}/${totalSegments})`;
+ dom.track.classList.remove("indeterminate");
+ } else {
+ dom.percent.textContent = `${state.done} done`;
+ }
+}
+
+function allProfilesDone() {
+ if (!totalSegments) {
+ return false;
+ }
+ return ["low", "medium", "high"].every((profile) => transcodeProfiles[profile].done >= totalSegments);
+}
+
+function tryFinalizeSuccess() {
+ if (processingComplete || !sourceSegmentsComplete || !allProfilesDone()) {
+ return;
+ }
+ processingComplete = true;
+ setDoneMessage("Upload and transcoding complete.", { success: true });
+ uploadBtn.disabled = false;
+ uploadInFlight = false;
+ refreshReadyList();
+ setPlayerStatus("Ready to play. Select a video and press Play.", { success: true });
+}
+
function appendLog(message, tone = "") {
if (!logEl) {
return;
@@ -284,6 +369,21 @@ function resetProgress({ preserveRetry } = {}) {
if (processingBlock) {
processingBlock.classList.add("hidden");
}
+ if (transcodeBlock) {
+ transcodeBlock.classList.add("hidden");
+ }
+ ["low", "medium", "high"].forEach((profile) => {
+ const dom = getTranscodeDom(profile);
+ if (dom.bar) {
+ dom.bar.style.width = "0%";
+ }
+ if (dom.percent) {
+ dom.percent.textContent = "";
+ }
+ if (dom.track) {
+ dom.track.classList.add("indeterminate");
+ }
+ });
if (doneMessage && !preserveRetry) {
setDoneMessage("Upload complete.", { success: true, hidden: true });
}
@@ -368,6 +468,7 @@ function connectWebSocket(wsUrl, videoId) {
if (processingTrack) {
processingTrack.classList.remove("indeterminate");
}
+ ["low", "medium", "high"].forEach((profile) => updateTranscodeProfileUi(profile));
return;
}
if (payload && payload.type === "progress" && typeof payload.completedSegments === "number") {
@@ -377,19 +478,35 @@ function connectWebSocket(wsUrl, videoId) {
processingBar.style.width = `${percent}%`;
processingPercent.textContent = `${percent}% (${completedSegments}/${totalSegments})`;
processingTrack.classList.remove("indeterminate");
- if (completedSegments >= totalSegments && doneMessage) {
- processingComplete = true;
- setDoneMessage("Upload complete.", { success: true });
- uploadBtn.disabled = false;
- uploadInFlight = false;
- refreshReadyList();
- setPlayerStatus("Ready to play. Select a video and press Play.", { success: true });
+ if (completedSegments >= totalSegments) {
+ sourceSegmentsComplete = true;
+ tryFinalizeSuccess();
}
} else {
processingPercent.textContent = `${completedSegments} events`;
}
return;
}
+ if (payload && payload.type === "transcode_progress" && payload.profile) {
+ const profile = `${payload.profile}`.toLowerCase();
+ const state = transcodeProfiles[profile];
+ if (!state) {
+ return;
+ }
+ if (typeof payload.doneSegments === "number") {
+ state.done = Math.max(state.done, payload.doneSegments);
+ }
+ if (typeof payload.totalSegments === "number" && payload.totalSegments > 0) {
+ totalSegments = payload.totalSegments;
+ }
+ if (typeof payload.segmentNumber === "number" && payload.segmentNumber >= 0 && payload.state) {
+ state.segments.set(payload.segmentNumber, payload.state);
+ recalcTranscodeCounters(profile);
+ }
+ updateTranscodeProfileUi(profile);
+ tryFinalizeSuccess();
+ return;
+ }
if (payload && payload.type === "failed") {
const reason = `${payload.reason || ""}`.trim();
const normalizedReason = reason.toLowerCase().replace(/\s+/g, "_");
@@ -424,13 +541,9 @@ function connectWebSocket(wsUrl, videoId) {
processingBar.style.width = `${percent}%`;
processingPercent.textContent = `${percent}% (${completedSegments}/${totalSegments})`;
processingTrack.classList.remove("indeterminate");
- if (completedSegments >= totalSegments && doneMessage) {
- processingComplete = true;
- setDoneMessage("Upload complete.", { success: true });
- uploadBtn.disabled = false;
- uploadInFlight = false;
- refreshReadyList();
- setPlayerStatus("Ready to play. Select a video and press Play.", { success: true });
+ if (completedSegments >= totalSegments) {
+ sourceSegmentsComplete = true;
+ tryFinalizeSuccess();
}
} else {
processingPercent.textContent = `${completedSegments} events`;
@@ -612,9 +725,13 @@ function uploadFile({ preserveLog, isRetry } = {}) {
if (processingBlock) {
processingBlock.classList.remove("hidden");
}
+ if (transcodeBlock) {
+ transcodeBlock.classList.remove("hidden");
+ }
if (processingPercent) {
processingPercent.textContent = "0%";
}
+ ["low", "medium", "high"].forEach((profile) => updateTranscodeProfileUi(profile));
await fetchUploadInfo(baseUrl, currentVideoId, payload.uploadStatusUrl);
setPlayerStatus("Ready list updates when processing completes.", { success: true });
diff --git a/frontend/index.html b/frontend/index.html
index e568d1a..0c3eabe 100644
--- a/frontend/index.html
+++ b/frontend/index.html
@@ -49,6 +49,30 @@
Upload Console
+
+
+
+ Transcode (Low)
+
+
+
+
+ Transcode (Medium)
+
+
+
+
+ Transcode (High)
+
+
+
+
diff --git a/processing-service/pom.xml b/processing-service/pom.xml
index c913aaf..cf166e1 100644
--- a/processing-service/pom.xml
+++ b/processing-service/pom.xml
@@ -26,6 +26,11 @@
jackson-databind
2.16.1
+
+ org.postgresql
+ postgresql
+ 42.7.3
+
org.mockito
mockito-core
@@ -40,4 +45,4 @@
-
\ No newline at end of file
+
diff --git a/processing-service/src/main/java/com/distributed26/videostreaming/processing/ProcessingServiceApplication.java b/processing-service/src/main/java/com/distributed26/videostreaming/processing/ProcessingServiceApplication.java
index f12199c..3a63e29 100644
--- a/processing-service/src/main/java/com/distributed26/videostreaming/processing/ProcessingServiceApplication.java
+++ b/processing-service/src/main/java/com/distributed26/videostreaming/processing/ProcessingServiceApplication.java
@@ -1,10 +1,13 @@
package com.distributed26.videostreaming.processing;
+import com.distributed26.videostreaming.processing.db.TranscodedSegmentStatusRepository;
import com.distributed26.videostreaming.shared.config.StorageConfig;
import com.distributed26.videostreaming.shared.storage.ObjectStorageClient;
import com.distributed26.videostreaming.shared.storage.S3StorageClient;
import com.distributed26.videostreaming.shared.upload.RabbitMQJobTaskBus;
import com.distributed26.videostreaming.shared.upload.events.JobTaskEvent;
+import com.distributed26.videostreaming.shared.upload.events.TranscodeProgressEvent;
+import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState;
import com.distributed26.videostreaming.shared.upload.events.UploadMetaEvent;
import io.github.cdimascio.dotenv.Dotenv;
import io.javalin.Javalin;
@@ -42,6 +45,11 @@ public class ProcessingServiceApplication {
private static final Logger LOGGER = LogManager.getLogger(ProcessingServiceApplication.class);
private static final String MANIFEST_PROCESSOR_EXECUTOR_NAME = "AbrManifestGenerator";
private static final Set MANIFESTS_IN_FLIGHT = ConcurrentHashMap.newKeySet();
+ private static final Map TOTAL_SEGMENTS_BY_VIDEO = new ConcurrentHashMap<>();
+ private static volatile TranscodedSegmentStatusRepository transcodeStatusRepository;
+ private static volatile RabbitMQJobTaskBus statusBus;
+ private static volatile AbrManifestService manifestServiceRef;
+ private static volatile ExecutorService manifestExecutorRef;
static final TranscodingProfile[] PROFILES = {
TranscodingProfile.LOW,
@@ -67,31 +75,35 @@ public static void main(String[] args) throws Exception {
storageClient.ensureBucketExists();
LOGGER.info("Storage ready — bucket={}", storageConfig.getDefaultBucketName());
- // Worker pool
- int poolSize = Integer.parseInt(getEnvOrDotenv(dotenv, "WORKER_POOL_SIZE", "4"));
- BlockingQueue taskQueue = new LinkedBlockingQueue<>();
- List workers = new ArrayList<>(poolSize);
- for (int i = 0; i < poolSize; i++) {
- TranscodingWorker w = new TranscodingWorker("worker-" + i, storageClient, taskQueue);
- w.start();
- workers.add(w);
- }
- LOGGER.info("Started {} transcoding worker(s)", poolSize);
-
// RabbitMQ bus — fromEnv() reads SERVICE_MODE; any value other than
// "upload" enables the AMQP consumer. Set SERVICE_MODE=processing and
// use a dedicated RABBITMQ_STATUS_QUEUE so the processing service gets
// its own copy of every upload.status.* message (TOPIC fan-out).
RabbitMQJobTaskBus bus = RabbitMQJobTaskBus.fromEnv();
+ statusBus = bus;
AbrManifestService manifestService = new AbrManifestService(
storageClient,
Integer.parseInt(getEnvOrDotenv(dotenv, "ABR_MANIFEST_WAIT_SECONDS", "120"))
);
+ manifestServiceRef = manifestService;
ExecutorService manifestExecutor = Executors.newSingleThreadExecutor(
r -> new Thread(r, MANIFEST_PROCESSOR_EXECUTOR_NAME)
);
+ manifestExecutorRef = manifestExecutor;
LOGGER.info("RabbitMQJobTaskBus connected");
+ // Worker pool
+ int poolSize = Integer.parseInt(getEnvOrDotenv(dotenv, "WORKER_POOL_SIZE", "4"));
+ BlockingQueue taskQueue = new LinkedBlockingQueue<>();
+ transcodeStatusRepository = createTranscodeStatusRepository();
+ List workers = new ArrayList<>(poolSize);
+ for (int i = 0; i < poolSize; i++) {
+ TranscodingWorker w = new TranscodingWorker("worker-" + i, storageClient, taskQueue, bus, transcodeStatusRepository);
+ w.start();
+ workers.add(w);
+ }
+ LOGGER.info("Started {} transcoding worker(s)", poolSize);
+
// subscribeAll ensures every incoming event reaches onEvent() regardless
// of whether we've seen that videoId before — fixing the first-event drop.
bus.subscribeAll(ev -> onEvent(ev, taskQueue, manifestService, manifestExecutor));
@@ -105,6 +117,8 @@ public static void main(String[] args) throws Exception {
LOGGER.info("Shutdown: stopping workers...");
workers.forEach(TranscodingWorker::stop);
manifestExecutor.shutdownNow();
+ manifestExecutorRef = null;
+ manifestServiceRef = null;
try { bus.close(); } catch (Exception e) { LOGGER.warn("Error closing bus", e); }
}));
@@ -179,25 +193,8 @@ static void onEvent(JobTaskEvent event, BlockingQueue taskQueue
// Build variant + master manifests after source segmentation is done.
LOGGER.info("UploadMetaEvent: videoId={} totalSegments={} (tasks already in flight)",
videoId, meta.getTotalSegments());
-
- if (MANIFESTS_IN_FLIGHT.add(videoId)) {
- try {
- manifestExecutor.execute(() -> {
- try {
- manifestService.generateIfNeeded(videoId, meta.getTotalSegments());
- } catch (Exception e) {
- LOGGER.error("Manifest generation failed for videoId={}", videoId, e);
- } finally {
- MANIFESTS_IN_FLIGHT.remove(videoId);
- }
- });
- } catch (RuntimeException e) {
- MANIFESTS_IN_FLIGHT.remove(videoId);
- LOGGER.error("Failed to submit manifest generation task for videoId={}", videoId, e);
- }
- } else {
- LOGGER.debug("Manifest generation already running/skipped for videoId={}", videoId);
- }
+ TOTAL_SEGMENTS_BY_VIDEO.put(videoId, meta.getTotalSegments());
+ scheduleManifestGeneration(videoId, meta.getTotalSegments(), manifestService, manifestExecutor);
return;
}
@@ -211,8 +208,10 @@ static void onEvent(JobTaskEvent event, BlockingQueue taskQueue
}
LOGGER.info("Chunk received: videoId={} key={} — queuing {} tasks",
videoId, chunkKey, PROFILES.length);
+ int segmentNumber = parseSegmentNumber(chunkKey);
for (TranscodingProfile profile : PROFILES) {
taskQueue.offer(new TranscodingTask(UUID.randomUUID().toString(), videoId, chunkKey, profile));
+ publishTranscodeState(videoId, profile.getName(), segmentNumber, TranscodeSegmentState.QUEUED);
}
}
@@ -229,5 +228,97 @@ private static String getEnvOrDotenv(Dotenv dotenv, String key, String defaultVa
static void resetState() {
CHUNKS_BY_VIDEO.clear();
MANIFESTS_IN_FLIGHT.clear();
+ TOTAL_SEGMENTS_BY_VIDEO.clear();
+ }
+
+ static void publishTranscodeState(String videoId, String profile, int segmentNumber, TranscodeSegmentState state) {
+ if (transcodeStatusRepository == null || statusBus == null || segmentNumber < 0) {
+ return;
+ }
+ try {
+ transcodeStatusRepository.upsertState(videoId, profile, segmentNumber, state);
+ int done = transcodeStatusRepository.countByState(videoId, profile, TranscodeSegmentState.DONE);
+ int total = TOTAL_SEGMENTS_BY_VIDEO.getOrDefault(videoId, 0);
+ int total = TOTAL_SEGMENTS_BY_VIDEO.getOrDefault(videoId, 0);
+ int done = 0;
+ if (state == TranscodeSegmentState.DONE) {
+ done = transcodeStatusRepository.countByState(videoId, profile, TranscodeSegmentState.DONE);
+ }
+ statusBus.publish(new TranscodeProgressEvent(videoId, profile, segmentNumber, state, done, total));
+ if (state == TranscodeSegmentState.DONE
+ && total > 0
+ && !MANIFESTS_IN_FLIGHT.contains(videoId)
+ && areAllProfilesDone(videoId, total)) {
+ scheduleManifestGeneration(videoId, total, manifestServiceRef, manifestExecutorRef);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Failed to persist/publish transcode progress videoId={} profile={} segment={} state={}",
+ videoId, profile, segmentNumber, state, e);
+ }
+ }
+
+ private static boolean areAllProfilesDone(String videoId, int totalSegments) {
+ if (transcodeStatusRepository == null || totalSegments <= 0) {
+ return false;
+ }
+ for (TranscodingProfile profile : PROFILES) {
+ int done = transcodeStatusRepository.countByState(videoId, profile.getName(), TranscodeSegmentState.DONE);
+ if (done < totalSegments) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static void scheduleManifestGeneration(
+ String videoId,
+ int totalSegments,
+ AbrManifestService manifestService,
+ ExecutorService manifestExecutor
+ ) {
+ if (manifestService == null || manifestExecutor == null) {
+ LOGGER.warn("Cannot schedule manifest generation for videoId={} because manifest generator is not configured",
+ videoId);
+ return;
+ }
+ if (!MANIFESTS_IN_FLIGHT.add(videoId)) {
+ LOGGER.debug("Manifest generation already running/skipped for videoId={}", videoId);
+ return;
+ }
+ try {
+ manifestExecutor.execute(() -> {
+ try {
+ manifestService.generateIfNeeded(videoId, totalSegments);
+ } catch (Exception e) {
+ LOGGER.error("Manifest generation failed for videoId={}", videoId, e);
+ } finally {
+ MANIFESTS_IN_FLIGHT.remove(videoId);
+ }
+ });
+ } catch (RuntimeException e) {
+ MANIFESTS_IN_FLIGHT.remove(videoId);
+ LOGGER.error("Failed to submit manifest generation task for videoId={}", videoId, e);
+ }
+ }
+
+ private static int parseSegmentNumber(String chunkKey) {
+ if (chunkKey == null || chunkKey.isBlank()) {
+ return -1;
+ }
+ java.util.regex.Matcher matcher = java.util.regex.Pattern.compile("(\\d+)").matcher(chunkKey);
+ int last = -1;
+ while (matcher.find()) {
+ last = Integer.parseInt(matcher.group(1));
+ }
+ return last;
+ }
+
+ private static TranscodedSegmentStatusRepository createTranscodeStatusRepository() {
+ try {
+ return TranscodedSegmentStatusRepository.fromEnv();
+ } catch (IllegalStateException e) {
+ LOGGER.warn("Postgres not configured; transcoding progress disabled: {}", e.getMessage());
+ return null;
+ }
}
}
diff --git a/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingTask.java b/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingTask.java
index a0e00cb..4f5f89a 100644
--- a/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingTask.java
+++ b/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingTask.java
@@ -79,6 +79,14 @@ private static final class FfmpegHolder {
* Download source, transcode, upload result. Idempotent — skips if outputKey already exists.
*/
public void execute(ObjectStorageClient storageClient) throws IOException {
+ execute(storageClient, null);
+ }
+
+ /**
+ * Download source, transcode, and upload output. Optional callback runs
+ * after transcoding completes and right before upload begins.
+ */
+ public void execute(ObjectStorageClient storageClient, Runnable beforeUpload) throws IOException {
if (storageClient.fileExists(outputKey)) {
LOGGER.info("Output already exists, skipping: {}", outputKey);
return;
@@ -110,12 +118,17 @@ public void execute(ObjectStorageClient storageClient) throws IOException {
.addExtraArgs("-maxrate", maxrate)
.addExtraArgs("-bufsize", bufsize)
.addExtraArgs("-threads", String.valueOf(FFMPEG_THREADS))
- .addExtraArgs("-an")
+ .addExtraArgs("-c:a", "aac")
+ .addExtraArgs("-b:a", "128k")
+ .addExtraArgs("-ac", "2")
.done();
new FFmpegExecutor(FfmpegHolder.FFMPEG, FfmpegHolder.FFPROBE).createJob(builder).run();
long size = Files.size(outputTemp);
+ if (beforeUpload != null) {
+ beforeUpload.run();
+ }
LOGGER.info("Uploading transcoded output: {} ({} bytes)", outputKey, size);
try (InputStream is = new FileInputStream(outputTemp.toFile())) {
storageClient.uploadFile(outputKey, is, size);
diff --git a/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingWorker.java b/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingWorker.java
index 2b672f1..37c8ef0 100644
--- a/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingWorker.java
+++ b/processing-service/src/main/java/com/distributed26/videostreaming/processing/TranscodingWorker.java
@@ -1,9 +1,12 @@
package com.distributed26.videostreaming.processing;
+import com.distributed26.videostreaming.processing.db.TranscodedSegmentStatusRepository;
import com.distributed26.videostreaming.shared.jobs.Status;
import com.distributed26.videostreaming.shared.jobs.Worker;
import com.distributed26.videostreaming.shared.jobs.WorkerStatus;
import com.distributed26.videostreaming.shared.storage.ObjectStorageClient;
+import com.distributed26.videostreaming.shared.upload.RabbitMQJobTaskBus;
+import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -19,14 +22,20 @@ public class TranscodingWorker {
private final Worker worker;
private final ObjectStorageClient storageClient;
private final BlockingQueue queue;
+ private final RabbitMQJobTaskBus bus;
+ private final TranscodedSegmentStatusRepository transcodeStatusRepository;
private volatile boolean running = false;
private Thread thread;
public TranscodingWorker(String id, ObjectStorageClient storageClient,
- BlockingQueue queue) {
+ BlockingQueue queue,
+ RabbitMQJobTaskBus bus,
+ TranscodedSegmentStatusRepository transcodeStatusRepository) {
this.worker = new Worker(id, Instant.now());
this.storageClient = storageClient;
this.queue = queue;
+ this.bus = bus;
+ this.transcodeStatusRepository = transcodeStatusRepository;
}
public String getId() { return worker.getId(); }
@@ -62,13 +71,16 @@ private void runLoop() {
task.setStatus(Status.RUNNING);
LOGGER.info("Worker {} picked up task {} (chunk={} profile={})",
worker.getId(), task.getId(), task.getChunkKey(), task.getProfile().getName());
+ emitState(task, TranscodeSegmentState.TRANSCODING);
try {
- task.execute(storageClient);
+ task.execute(storageClient, () -> emitState(task, TranscodeSegmentState.UPLOADING));
task.setStatus(Status.SUCCEEDED);
+ emitState(task, TranscodeSegmentState.DONE);
LOGGER.info("Task {} succeeded", task.getId());
} catch (Exception e) {
task.setStatus(Status.FAILED);
+ emitState(task, TranscodeSegmentState.FAILED);
LOGGER.error("Task {} failed: {}", task.getId(), e.getMessage(), e);
}
@@ -80,4 +92,32 @@ private void runLoop() {
}
}
}
+
+ private void emitState(TranscodingTask task, TranscodeSegmentState state) {
+ if (bus == null || transcodeStatusRepository == null) {
+ return;
+ }
+ int segmentNumber = extractSegmentNumber(task.getChunkKey());
+ if (segmentNumber < 0) {
+ return;
+ }
+ ProcessingServiceApplication.publishTranscodeState(
+ task.getJobId(),
+ task.getProfile().getName(),
+ segmentNumber,
+ state
+ );
+ }
+
+ private static int extractSegmentNumber(String chunkKey) {
+ if (chunkKey == null || chunkKey.isBlank()) {
+ return -1;
+ }
+ java.util.regex.Matcher matcher = java.util.regex.Pattern.compile("(\\d+)").matcher(chunkKey);
+ int last = -1;
+ while (matcher.find()) {
+ last = Integer.parseInt(matcher.group(1));
+ }
+ return last;
+ }
}
diff --git a/processing-service/src/main/java/com/distributed26/videostreaming/processing/db/TranscodedSegmentStatusRepository.java b/processing-service/src/main/java/com/distributed26/videostreaming/processing/db/TranscodedSegmentStatusRepository.java
new file mode 100644
index 0000000..cd05645
--- /dev/null
+++ b/processing-service/src/main/java/com/distributed26/videostreaming/processing/db/TranscodedSegmentStatusRepository.java
@@ -0,0 +1,80 @@
+package com.distributed26.videostreaming.processing.db;
+
+import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+public class TranscodedSegmentStatusRepository {
+ private final String jdbcUrl;
+ private final String username;
+ private final String password;
+
+ public TranscodedSegmentStatusRepository(String jdbcUrl, String username, String password) {
+ this.jdbcUrl = jdbcUrl;
+ this.username = username;
+ this.password = password;
+ }
+
+ public static TranscodedSegmentStatusRepository fromEnv() {
+ String url = System.getenv("PG_URL");
+ String user = System.getenv("PG_USER");
+ String pass = System.getenv("PG_PASSWORD");
+
+ if (url == null || url.isBlank()) {
+ throw new IllegalStateException("PG_URL is not set");
+ }
+ if (user == null || user.isBlank()) {
+ throw new IllegalStateException("PG_USER is not set");
+ }
+ return new TranscodedSegmentStatusRepository(url, user, pass);
+ }
+
+ public void upsertState(String videoId, String profile, int segmentNumber, TranscodeSegmentState state) {
+ String sql = """
+ INSERT INTO transcoded_segment_status (video_id, profile, segment_number, state)
+ VALUES (?, ?, ?, ?)
+ ON CONFLICT (video_id, profile, segment_number) DO UPDATE
+ SET state = CASE
+ WHEN transcoded_segment_status.state = 'DONE' AND EXCLUDED.state <> 'DONE'
+ THEN transcoded_segment_status.state
+ ELSE EXCLUDED.state
+ END,
+ updated_at = NOW()
+ """;
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
+ PreparedStatement ps = conn.prepareStatement(sql)) {
+ ps.setObject(1, UUID.fromString(videoId));
+ ps.setString(2, profile);
+ ps.setInt(3, segmentNumber);
+ ps.setString(4, state.name());
+ ps.executeUpdate();
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to upsert transcoded_segment_status", e);
+ }
+ }
+
+ public int countByState(String videoId, String profile, TranscodeSegmentState state) {
+ String sql = """
+ SELECT COUNT(*) FROM transcoded_segment_status
+ WHERE video_id = ? AND profile = ? AND state = ?
+ """;
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
+ PreparedStatement ps = conn.prepareStatement(sql)) {
+ ps.setObject(1, UUID.fromString(videoId));
+ ps.setString(2, profile);
+ ps.setString(3, state.name());
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ return rs.getInt(1);
+ }
+ return 0;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to count transcoded_segment_status", e);
+ }
+ }
+}
diff --git a/shared/src/main/java/com/distributed26/videostreaming/shared/upload/RabbitMQJobTaskBus.java b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/RabbitMQJobTaskBus.java
index 12e7b4d..d261ae8 100644
--- a/shared/src/main/java/com/distributed26/videostreaming/shared/upload/RabbitMQJobTaskBus.java
+++ b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/RabbitMQJobTaskBus.java
@@ -1,6 +1,8 @@
package com.distributed26.videostreaming.shared.upload;
import com.distributed26.videostreaming.shared.upload.events.JobTaskEvent;
+import com.distributed26.videostreaming.shared.upload.events.TranscodeProgressEvent;
+import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState;
import com.distributed26.videostreaming.shared.upload.events.UploadFailedEvent;
import com.distributed26.videostreaming.shared.upload.events.UploadMetaEvent;
import com.fasterxml.jackson.databind.JsonNode;
@@ -184,6 +186,20 @@ private JobTaskEvent toEvent(JsonNode node) {
if ("meta".equals(type) && node.has("totalSegments")) {
return new UploadMetaEvent(jobId, node.path("totalSegments").asInt());
}
+ if ("transcode_progress".equals(type)) {
+ String profile = node.path("profile").asText("");
+ int segmentNumber = node.path("segmentNumber").asInt(-1);
+ int doneSegments = node.path("doneSegments").asInt(0);
+ int totalSegments = node.path("totalSegments").asInt(0);
+ String stateRaw = node.path("state").asText("FAILED");
+ TranscodeSegmentState state;
+ try {
+ state = TranscodeSegmentState.valueOf(stateRaw.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ state = TranscodeSegmentState.FAILED;
+ }
+ return new TranscodeProgressEvent(jobId, profile, segmentNumber, state, doneSegments, totalSegments);
+ }
String taskId = node.path("taskId").asText("task");
return new JobTaskEvent(jobId, taskId);
}
@@ -195,6 +211,9 @@ private String describeEventType(JobTaskEvent event) {
if (event instanceof UploadMetaEvent) {
return "meta";
}
+ if (event instanceof TranscodeProgressEvent) {
+ return "transcode_progress";
+ }
return "task";
}
diff --git a/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeProgressEvent.java b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeProgressEvent.java
new file mode 100644
index 0000000..14c1659
--- /dev/null
+++ b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeProgressEvent.java
@@ -0,0 +1,52 @@
+package com.distributed26.videostreaming.shared.upload.events;
+
+import java.util.Objects;
+
+public class TranscodeProgressEvent extends JobTaskEvent {
+ private final String type = "transcode_progress";
+ private final String profile;
+ private final int segmentNumber;
+ private final TranscodeSegmentState state;
+ private final int doneSegments;
+ private final int totalSegments;
+
+ public TranscodeProgressEvent(
+ String jobId,
+ String profile,
+ int segmentNumber,
+ TranscodeSegmentState state,
+ int doneSegments,
+ int totalSegments
+ ) {
+ super(jobId, "transcode:" + Objects.requireNonNull(profile, "profile is null") + ":" + segmentNumber);
+ this.profile = profile;
+ this.segmentNumber = segmentNumber;
+ this.state = Objects.requireNonNull(state, "state is null");
+ this.doneSegments = doneSegments;
+ this.totalSegments = totalSegments;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getProfile() {
+ return profile;
+ }
+
+ public int getSegmentNumber() {
+ return segmentNumber;
+ }
+
+ public TranscodeSegmentState getState() {
+ return state;
+ }
+
+ public int getDoneSegments() {
+ return doneSegments;
+ }
+
+ public int getTotalSegments() {
+ return totalSegments;
+ }
+}
diff --git a/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeSegmentState.java b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeSegmentState.java
new file mode 100644
index 0000000..aeed4ab
--- /dev/null
+++ b/shared/src/main/java/com/distributed26/videostreaming/shared/upload/events/TranscodeSegmentState.java
@@ -0,0 +1,9 @@
+package com.distributed26.videostreaming.shared.upload.events;
+
+public enum TranscodeSegmentState {
+ QUEUED,
+ TRANSCODING,
+ UPLOADING,
+ DONE,
+ FAILED
+}
diff --git a/upload-service/docs/db/schema.sql b/upload-service/docs/db/schema.sql
index 5c74da7..ede0add 100644
--- a/upload-service/docs/db/schema.sql
+++ b/upload-service/docs/db/schema.sql
@@ -23,3 +23,16 @@ CREATE TABLE IF NOT EXISTS segment_upload (
CREATE INDEX IF NOT EXISTS idx_segment_upload_video_id
ON segment_upload(video_id);
+
+CREATE TABLE IF NOT EXISTS transcoded_segment_status (
+ id SERIAL PRIMARY KEY,
+ video_id UUID NOT NULL REFERENCES video_upload(video_id) ON DELETE CASCADE,
+ profile VARCHAR(32) NOT NULL,
+ segment_number INTEGER NOT NULL,
+ state VARCHAR(32) NOT NULL,
+ updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
+ UNIQUE (video_id, profile, segment_number)
+);
+
+CREATE INDEX IF NOT EXISTS idx_transcoded_segment_status_video_profile_state
+ ON transcoded_segment_status(video_id, profile, state);
diff --git a/upload-service/src/main/java/com/distributed26/videostreaming/upload/db/TranscodedSegmentStatusRepository.java b/upload-service/src/main/java/com/distributed26/videostreaming/upload/db/TranscodedSegmentStatusRepository.java
new file mode 100644
index 0000000..2029c1c
--- /dev/null
+++ b/upload-service/src/main/java/com/distributed26/videostreaming/upload/db/TranscodedSegmentStatusRepository.java
@@ -0,0 +1,65 @@
+package com.distributed26.videostreaming.upload.db;
+
+import io.github.cdimascio.dotenv.Dotenv;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+public class TranscodedSegmentStatusRepository {
+ private final String jdbcUrl;
+ private final String username;
+ private final String password;
+
+ public TranscodedSegmentStatusRepository(String jdbcUrl, String username, String password) {
+ this.jdbcUrl = jdbcUrl;
+ this.username = username;
+ this.password = password;
+ }
+
+ public static TranscodedSegmentStatusRepository fromEnv() {
+ Dotenv dotenv = Dotenv.configure().directory("./").ignoreIfMissing().load();
+ String url = getenvOrDotenv(dotenv, "PG_URL");
+ String user = getenvOrDotenv(dotenv, "PG_USER");
+ String pass = getenvOrDotenv(dotenv, "PG_PASSWORD");
+
+ if (url == null || url.isBlank()) {
+ throw new IllegalStateException("PG_URL is not set");
+ }
+ if (user == null || user.isBlank()) {
+ throw new IllegalStateException("PG_USER is not set");
+ }
+ return new TranscodedSegmentStatusRepository(url, user, pass);
+ }
+
+ private static String getenvOrDotenv(Dotenv dotenv, String key) {
+ String envVal = System.getenv(key);
+ if (envVal != null && !envVal.isBlank()) {
+ return envVal;
+ }
+ return dotenv.get(key);
+ }
+
+ public int countByState(String videoId, String profile, String state) {
+ String sql = """
+ SELECT COUNT(*) FROM transcoded_segment_status
+ WHERE video_id = ? AND profile = ? AND state = ?
+ """;
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);
+ PreparedStatement ps = conn.prepareStatement(sql)) {
+ ps.setObject(1, UUID.fromString(videoId));
+ ps.setString(2, profile);
+ ps.setString(3, state);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ return rs.getInt(1);
+ }
+ return 0;
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to count transcoded_segment_status", e);
+ }
+ }
+}
diff --git a/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadServiceApplication.java b/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadServiceApplication.java
index edc8ccd..8fd6b17 100644
--- a/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadServiceApplication.java
+++ b/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadServiceApplication.java
@@ -5,6 +5,7 @@
import com.distributed26.videostreaming.shared.storage.S3StorageClient;
import com.distributed26.videostreaming.shared.upload.JobTaskBus;
import com.distributed26.videostreaming.shared.upload.RabbitMQJobTaskBus;
+import com.distributed26.videostreaming.upload.db.TranscodedSegmentStatusRepository;
import com.distributed26.videostreaming.upload.db.VideoUploadRepository;
import io.javalin.Javalin;
import io.javalin.config.SizeUnit;
@@ -82,7 +83,9 @@ static Javalin createStatusApp(JobTaskBus jobTaskBus) {
ensureLogsDirectory();
VideoUploadRepository videoUploadRepository = createVideoUploadRepository();
com.distributed26.videostreaming.upload.db.SegmentUploadRepository segmentUploadRepository = createSegmentUploadRepository();
- UploadStatusWebSocket uploadStatusWebSocket = new UploadStatusWebSocket(jobTaskBus, segmentUploadRepository);
+ TranscodedSegmentStatusRepository transcodedSegmentStatusRepository = createTranscodedSegmentStatusRepository();
+ UploadStatusWebSocket uploadStatusWebSocket =
+ new UploadStatusWebSocket(jobTaskBus, segmentUploadRepository, transcodedSegmentStatusRepository);
UploadInfoHandler uploadInfoHandler = new UploadInfoHandler(videoUploadRepository);
Javalin app = Javalin.create();
@@ -142,6 +145,15 @@ private static com.distributed26.videostreaming.upload.db.SegmentUploadRepositor
}
}
+ private static TranscodedSegmentStatusRepository createTranscodedSegmentStatusRepository() {
+ try {
+ return TranscodedSegmentStatusRepository.fromEnv();
+ } catch (IllegalStateException e) {
+ logger.warn("Postgres not configured; transcoded segment status disabled: {}", e.getMessage());
+ return null;
+ }
+ }
+
private static void ensureLogsDirectory() {
try {
java.nio.file.Files.createDirectories(java.nio.file.Path.of("logs"));
diff --git a/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadStatusWebSocket.java b/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadStatusWebSocket.java
index 66296a7..8d45d1a 100644
--- a/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadStatusWebSocket.java
+++ b/upload-service/src/main/java/com/distributed26/videostreaming/upload/upload/UploadStatusWebSocket.java
@@ -2,8 +2,11 @@
import com.distributed26.videostreaming.shared.upload.JobTaskBus;
import com.distributed26.videostreaming.shared.upload.JobTaskListener;
+import com.distributed26.videostreaming.shared.upload.events.TranscodeProgressEvent;
+import com.distributed26.videostreaming.shared.upload.events.TranscodeSegmentState;
import com.distributed26.videostreaming.shared.upload.events.UploadProgressEvent;
import com.distributed26.videostreaming.upload.db.SegmentUploadRepository;
+import com.distributed26.videostreaming.upload.db.TranscodedSegmentStatusRepository;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@@ -27,12 +30,18 @@ public class UploadStatusWebSocket {
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
private final JobTaskBus jobTaskBus;
private final SegmentUploadRepository segmentUploadRepository;
+ private final TranscodedSegmentStatusRepository transcodedSegmentStatusRepository;
private final Map jobListenersByContext = new ConcurrentHashMap<>();
private final Map jobIdByContext = new ConcurrentHashMap<>();
- public UploadStatusWebSocket(JobTaskBus jobTaskBus, SegmentUploadRepository segmentUploadRepository) {
+ public UploadStatusWebSocket(
+ JobTaskBus jobTaskBus,
+ SegmentUploadRepository segmentUploadRepository,
+ TranscodedSegmentStatusRepository transcodedSegmentStatusRepository
+ ) {
this.jobTaskBus = Objects.requireNonNull(jobTaskBus, "jobTaskBus is null");
this.segmentUploadRepository = segmentUploadRepository;
+ this.transcodedSegmentStatusRepository = transcodedSegmentStatusRepository;
}
public void configure(WsConfig ws) {
@@ -95,6 +104,16 @@ private void bindJob(WsContext ctx, String jobId) {
} else {
logger.warn("SegmentUploadRepository is null; progress snapshot disabled");
}
+ if (transcodedSegmentStatusRepository != null) {
+ try {
+ sendTranscodeSnapshot(ctx, jobId);
+ } catch (Exception e) {
+ logger.warn("Failed to fetch transcode progress for jobId={}", jobId, e);
+ ctx.send("{\"error\":\"transcode_progress_lookup_failed\"}");
+ }
+ } else {
+ logger.warn("TranscodedSegmentStatusRepository is null; transcode snapshot disabled");
+ }
}
private void cleanup(WsCloseContext ctx) {
@@ -120,6 +139,16 @@ private void sendProgressSnapshot(WsContext ctx, String jobId) throws JsonProces
ctx.send(objectMapper.writeValueAsString(new UploadProgressEvent(jobId, completedSegments)));
}
+ private void sendTranscodeSnapshot(WsContext ctx, String jobId) throws JsonProcessingException {
+ String[] profiles = {"low", "medium", "high"};
+ for (String profile : profiles) {
+ int done = transcodedSegmentStatusRepository.countByState(jobId, profile, TranscodeSegmentState.DONE.name());
+ ctx.send(objectMapper.writeValueAsString(
+ new TranscodeProgressEvent(jobId, profile, -1, TranscodeSegmentState.DONE, done, 0)
+ ));
+ }
+ }
+
private String describeEventType(com.distributed26.videostreaming.shared.upload.events.JobTaskEvent event) {
if (event instanceof com.distributed26.videostreaming.shared.upload.events.UploadFailedEvent failed) {
return failed.getType();
@@ -127,6 +156,9 @@ private String describeEventType(com.distributed26.videostreaming.shared.upload.
if (event instanceof com.distributed26.videostreaming.shared.upload.events.UploadMetaEvent) {
return "meta";
}
+ if (event instanceof TranscodeProgressEvent) {
+ return "transcode_progress";
+ }
return "task";
}
}