From 0cf23645f250e7327a5804be2995b01defa655b5 Mon Sep 17 00:00:00 2001 From: alchemistReturns Date: Thu, 29 Jan 2026 14:51:22 +0600 Subject: [PATCH] added async order processing --- docker-compose.yml | 12 +++ services/api-gateway/src/index.ts | 2 + services/frontend/src/App.tsx | 100 ++++++++++++++---- services/inventory-service/package.json | 3 +- services/inventory-service/src/index.ts | 128 ++++++++++++++++-------- services/order-service/package.json | 3 +- services/order-service/src/index.ts | 84 +++++++++++++++- 7 files changed, 265 insertions(+), 67 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c7d403c..acd1ff8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -79,6 +79,17 @@ services: ports: - "9090:9090" + rabbitmq: + image: rabbitmq:3-management + ports: + - "5672:5672" + - "15672:15672" + environment: + - RABBITMQ_DEFAULT_USER=user + - RABBITMQ_DEFAULT_PASS=password + volumes: + - rabbitmq-data:/var/lib/rabbitmq + grafana: image: grafana/grafana ports: @@ -95,4 +106,5 @@ services: volumes: order-db-data: inventory-db-data: + rabbitmq-data: grafana-storage: diff --git a/services/api-gateway/src/index.ts b/services/api-gateway/src/index.ts index 20e7f04..a17a4ac 100644 --- a/services/api-gateway/src/index.ts +++ b/services/api-gateway/src/index.ts @@ -18,6 +18,8 @@ const proxyOptions = { '^/api/products': '/products', '^/api/inventory': '/inventory', }, + timeout: 10000, + proxyTimeout: 10000 }; // Routes diff --git a/services/frontend/src/App.tsx b/services/frontend/src/App.tsx index 414b455..14c88f7 100644 --- a/services/frontend/src/App.tsx +++ b/services/frontend/src/App.tsx @@ -2,8 +2,7 @@ import { useState, useEffect } from 'react' import axios from 'axios' import './index.css' -const ORDER_SERVICE_URL = import.meta.env.VITE_ORDER_SERVICE_URL || 'http://localhost:3001'; -const INVENTORY_SERVICE_URL = import.meta.env.VITE_INVENTORY_SERVICE_URL || 'http://localhost:3002'; +const API_GATEWAY_URL = import.meta.env.VITE_API_GATEWAY_URL || 'http://localhost:8080/api'; interface Product { id: string; @@ -23,18 +22,18 @@ function App() { const checkHealth = async () => { try { - await axios.get(`${ORDER_SERVICE_URL}/health`); + await axios.get(`${API_GATEWAY_URL}/health/orders`); setHealth({ order: 'UP' }); } catch (e) { setHealth({ order: 'DOWN' }); } }; - const fetchProducts = async () => { + const fetchProducts = async (initializeSelection = false) => { try { - const res = await axios.get(`${INVENTORY_SERVICE_URL}/products`); + const res = await axios.get(`${API_GATEWAY_URL}/products`); setProducts(res.data); - if (res.data.length > 0 && !selectedProduct) { + if (initializeSelection && res.data.length > 0 && !selectedProduct) { setSelectedProduct(res.data[0].id); } } catch (e) { @@ -44,14 +43,51 @@ function App() { useEffect(() => { checkHealth(); - fetchProducts(); + fetchProducts(true); const interval = setInterval(() => { checkHealth(); - fetchProducts(); // Refresh stock levels + fetchProducts(false); // Refresh stock levels without resetting selection }, 5000); return () => clearInterval(interval); }, []); + const [queuedOrderIds, setQueuedOrderIds] = useState([]); + + useEffect(() => { + if (queuedOrderIds.length === 0) return; + + const pollInterval = setInterval(async () => { + try { + const res = await axios.get(`${API_GATEWAY_URL}/orders`); + const orders = res.data; + + // Check status of all queued orders + const remainingQueuedIds = queuedOrderIds.filter(id => { + const order = orders.find((o: any) => o.id === id); + if (order && order.status === 'COMPLETED') { + addLog(`✅ Async Order Completed! ID: ${id}`); + fetchProducts(); // Refresh stock + return false; // Remove from queued list + } + if (order && order.status === 'FAILED') { + addLog(`❌ Async Order Failed! ID: ${id}`); + return false; // Remove from queued list + } + return true; // Keep polling + }); + + if (remainingQueuedIds.length !== queuedOrderIds.length) { + setQueuedOrderIds(remainingQueuedIds); + } + + } catch (e) { + console.error("Polling error", e); + } + }, 2000); + + return () => clearInterval(pollInterval); + }, [queuedOrderIds]); + const placeOrder = async (isGremlin: boolean) => { if (!selectedProduct) { addLog("⚠️ No product selected!"); @@ -63,18 +99,44 @@ function App() { addLog(`Initiating Order... (Product: ${products.find(p => p.id === selectedProduct)?.name}, Gremlin: ${isGremlin ? 'ON' : 'OFF'})`); try { - // Use quantity=13 to trigger Gremlin Latency in Inventory Service - const quantity = isGremlin ? 3 : 1; - const response = await axios.post(`${ORDER_SERVICE_URL}/orders`, { + // Send 'gremlin' flag to trigger latency in Inventory Service + const quantity = 1; + const response = await axios.post(`${API_GATEWAY_URL}/orders`, { productId: selectedProduct, - quantity + quantity, + gremlin: isGremlin }); const end = performance.now(); const dur = Math.round(end - start); setLatency(dur); - addLog(`✅ Order Success! ID: ${response.data.id}. Duration: ${dur}ms`); - fetchProducts(); // Update stock immediately + + if (response.status === 202) { + // QUEUED + addLog(`⚠️ Order Queued: ${response.data.message}. Duration: ${dur}ms`); + + // Poll for completion + const orderId = response.data.id; + const pollInterval = setInterval(async () => { + try { + const pollRes = await axios.get(`${API_GATEWAY_URL}/orders`); + // Ideally we'd have a specific GET /orders/:id endpoint, but filtering list works for demo + const myOrder = pollRes.data.find((o: any) => o.id === orderId); + if (myOrder && myOrder.status === 'COMPLETED') { + addLog(`✅ Async Order Completed! ID: ${orderId}`); + clearInterval(pollInterval); + fetchProducts(); + } + } catch (e) { + console.error("Polling error", e); + } + }, 2000); + + } else { + // SUCCESS + addLog(`✅ Order Success! ID: ${response.data.id}. Duration: ${dur}ms`); + fetchProducts(); // Update stock immediately + } } catch (error: any) { const end = performance.now(); @@ -90,7 +152,7 @@ function App() { return ( <> -

Valerix Resilient Platform

+

Valerix

@@ -131,10 +193,12 @@ function App() { fontSize: '2rem', fontWeight: 'bold', marginBottom: '1rem', - color: latency !== null ? (latency > 1500 ? 'var(--danger)' : 'var(--success)') : 'inherit' + color: latency !== null ? (latency > 2000 ? '#e3b341' : latency > 1500 ? 'var(--danger)' : 'var(--success)') : 'inherit' }}> - {latency !== null ? `${latency}ms` : '---'} -
Last Request Latency
+ {latency !== null ? (latency > 2000 ? 'QUEUED' : `${latency}ms`) : '---'} +
+ {latency !== null && latency > 2000 ? 'Processed in Background' : 'Last Request Latency'} +
diff --git a/services/inventory-service/package.json b/services/inventory-service/package.json index a682502..b95d583 100644 --- a/services/inventory-service/package.json +++ b/services/inventory-service/package.json @@ -16,7 +16,8 @@ "@prisma/client": "^5.10.2", "cors": "^2.8.5", "prom-client": "^15.1.0", - "dotenv": "^16.4.5" + "dotenv": "^16.4.5", + "amqplib": "^0.10.3" }, "devDependencies": { "typescript": "^5.3.3", diff --git a/services/inventory-service/src/index.ts b/services/inventory-service/src/index.ts index 3ac8ecc..60519cb 100644 --- a/services/inventory-service/src/index.ts +++ b/services/inventory-service/src/index.ts @@ -62,61 +62,104 @@ app.get('/products', async (req, res) => { }); // Deduct Inventory (with Idempotency + Gremlin Latency) -app.post('/inventory/deduct', async (req: Request, res: Response) => { - const { productId, quantity, orderId } = req.body; - - if (!productId || !quantity || !orderId) { - res.status(400).json({ error: 'Missing productId, quantity, or orderId' }); - return; +const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://user:password@rabbitmq:5672'; +let channel: any; + +// Connect to RabbitMQ +// Connect to RabbitMQ +async function connectToRabbit() { + const amqp = require('amqplib'); + while (true) { + try { + const connection = await amqp.connect(RABBITMQ_URL); + channel = await connection.createChannel(); + await channel.assertQueue('inventory_queue'); + await channel.assertQueue('order_completion_queue'); + + console.log("Connected to RabbitMQ & listening on inventory_queue"); + + channel.consume('inventory_queue', async (msg: any) => { + if (msg !== null) { + const data = JSON.parse(msg.content.toString()); + console.log("Received Async Order via RabbitMQ:", data); + + try { + await deductInventory(data.productId, data.quantity, data.orderId); + + // Send Completion Event + const completionMsg = JSON.stringify({ + orderId: data.orderId, + status: 'COMPLETED', + message: 'Inventory deducted successfully (Async)' + }); + channel.sendToQueue('order_completion_queue', Buffer.from(completionMsg)); + console.log("Sent completion event for:", data.orderId); + + channel.ack(msg); + } catch (e: any) { + console.error("Async Processing Failed:", e.message); + channel.ack(msg); + } + } + }); + break; // Success + } catch (e) { + console.error("RabbitMQ Connection Failed, retrying in 5s...", e); + await new Promise(resolve => setTimeout(resolve, 5000)); + } } +} - try { - // 1. Check Idempotency - const existingLog = await prisma.idempotencyLog.findUnique({ - where: { orderId } - }); +// Logic: Deduct Inventory +async function deductInventory(productId: string, quantity: number, orderId: string) { + const existingLog = await prisma.idempotencyLog.findUnique({ + where: { orderId } + }); - if (existingLog) { - console.log(`Idempotency check: Order ${orderId} already processed.`); - // Return previous success immediately (skip Gremlin this time?) - // If we want to simulate "Vanishing Response" persisting, we might sleep again, - // but to solve the issue, we usually return success fast on retry. - res.status(200).json({ message: 'Stock already deducted (Idempotent)', success: true }); - return; - } + if (existingLog) { + console.log(`Idempotency check: Order ${orderId} already processed.`); + return { message: 'Stock already deducted (Idempotent)', success: true }; + } - // 2. Transaction: Deduct Stock + Log Idempotency - await prisma.$transaction(async (tx) => { - const product = await tx.product.findUnique({ where: { id: productId } }); - if (!product || product.stock < quantity) { - throw new Error('Insufficient stock or product not found'); - } + await prisma.$transaction(async (tx) => { + const product = await tx.product.findUnique({ where: { id: productId } }); + if (!product || product.stock < quantity) { + throw new Error('Insufficient stock or product not found'); + } - await tx.product.update({ - where: { id: productId }, - data: { stock: product.stock - quantity } - }); + await tx.product.update({ + where: { id: productId }, + data: { stock: product.stock - quantity } + }); - await tx.idempotencyLog.create({ - data: { orderId } - }); + await tx.idempotencyLog.create({ + data: { orderId } }); + }); - // 3. Gremlin Latency (The Vanishing Response) - // Deterministic delay: response delays by 5 seconds if orderId ends with 'DELAY' or basically always to force timeout demonstration. - // The requirement says "deterministic pattern". Let's say if quantity is > 5, or just always for now to verify observability. - // Let's make it deterministic based on orderId hash/char. - // If orderId starts with 'GREMLIN', we delay. - // Or simpler: Just delay 3s (Order timeout is 2s). - // But then *all* orders fail. - // Let's only delay if the 'quantity' is 13 (unlucky number). + return { message: 'Stock deducted', success: true }; +} + +// Deduct Inventory Endpoint +app.post('/inventory/deduct', async (req: Request, res: Response) => { + const { productId, quantity, orderId } = req.body; - if (quantity === 13) { + if (!productId || !quantity || !orderId) { + res.status(400).json({ error: 'Missing productId, quantity, or orderId' }); + return; + } + + try { + // Gremlin Latency: Simulate "Not Responding" / High Latency + // This will cause the synchronous caller (Order Service) to timeout. + // Gremlin Latency: Simulate "Not Responding" / High Latency + if (req.body.gremlin === true) { console.log("Gremlin Triggered: Delaying response..."); await new Promise(resolve => setTimeout(resolve, 5000)); } - res.status(200).json({ message: 'Stock deducted', success: true }); + const result = await deductInventory(productId, quantity, orderId); + res.status(200).json(result); } catch (error: any) { console.error("Inventory Error:", error.message); @@ -127,4 +170,5 @@ app.post('/inventory/deduct', async (req: Request, res: Response) => { app.listen(PORT, async () => { console.log(`Inventory Service running on port ${PORT}`); await seedProducts(); + await connectToRabbit(); }); diff --git a/services/order-service/package.json b/services/order-service/package.json index b647252..abd3c72 100644 --- a/services/order-service/package.json +++ b/services/order-service/package.json @@ -17,7 +17,8 @@ "cors": "^2.8.5", "axios": "^1.6.7", "prom-client": "^15.1.0", - "dotenv": "^16.4.5" + "dotenv": "^16.4.5", + "amqplib": "^0.10.3" }, "devDependencies": { "typescript": "^5.3.3", diff --git a/services/order-service/src/index.ts b/services/order-service/src/index.ts index 38cc33c..920e927 100644 --- a/services/order-service/src/index.ts +++ b/services/order-service/src/index.ts @@ -8,6 +8,49 @@ const app = express(); const prisma = new PrismaClient(); const PORT = process.env.PORT || 3001; const INVENTORY_SERVICE_URL = process.env.INVENTORY_SERVICE_URL || 'http://localhost:3002'; +const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://user:password@rabbitmq:5672'; +let channel: any; + +// Connect to RabbitMQ +async function connectToRabbit() { + const amqp = require('amqplib'); + while (true) { + try { + const connection = await amqp.connect(RABBITMQ_URL); + channel = await connection.createChannel(); + await channel.assertQueue('inventory_queue'); + await channel.assertQueue('order_completion_queue'); + console.log("Connected to RabbitMQ (Producer & Consumer)"); + + // Listen for Completion Events + channel.consume('order_completion_queue', async (msg: any) => { + if (msg) { + const data = JSON.parse(msg.content.toString()); + console.log(`Received Async Event: ${data.status} for Order ${data.orderId}`); + + // Update Order Status in DB + try { + const status = (data.status === 'COMPLETED' || data.status === 'FAILED') ? data.status : 'COMPLETED'; + + await prisma.order.update({ + where: { id: data.orderId }, + data: { status: status } + }); + console.log(`Order ${data.orderId} updated to ${status}`); + } catch (e) { + console.error("Failed to update order status:", e); + } + + channel.ack(msg); + } + }); + break; + } catch (e) { + console.error("RabbitMQ Connection Failed, retrying in 5s...", e); + await new Promise(resolve => setTimeout(resolve, 5000)); + } + } +} app.use(cors()); app.use(express.json()); @@ -56,7 +99,7 @@ app.get('/orders', async (req: Request, res: Response) => { // Create Order (with Timeout handling) app.post('/orders', async (req: Request, res: Response) => { - const { productId, quantity } = req.body; + const { productId, quantity, gremlin } = req.body; if (!productId || !quantity) { res.status(400).json({ error: 'Missing productId or quantity' }); @@ -79,7 +122,8 @@ app.post('/orders', async (req: Request, res: Response) => { const inventoryResponse = await axios.post(`${INVENTORY_SERVICE_URL}/inventory/deduct`, { productId, quantity, - orderId: order.id // For Idempotency + orderId: order.id, // For Idempotency + gremlin }, { timeout: 2000 }); @@ -99,10 +143,39 @@ app.post('/orders', async (req: Request, res: Response) => { console.error("Inventory call failed:", error.message); let errorMessage = 'Order failed due to inventory issue'; - if (error.code === 'ECONNABORTED') { - errorMessage = 'Order processing timed out waiting for inventory'; + if (error.code === 'ECONNABORTED' || error.message.includes('timeout')) { + // TIMEOUT DETECTED -> Fallback to Async Queue + console.log(`Order ${order.id} timed out. Queuing for background processing...`); + + try { + if (channel) { + const msg = JSON.stringify({ productId, quantity, orderId: order.id }); + channel.sendToQueue('inventory_queue', Buffer.from(msg)); + + // Update Order to QUEUED + const queuedOrder = await prisma.order.update({ + where: { id: order.id }, + data: { status: 'QUEUED' } + }); + + // Return QUEUED status to user + res.status(202).json({ + message: 'Order timed out, queued for async processing', + status: 'QUEUED', + id: order.id + }); + return; + } else { + console.error("RabbitMQ channel not available"); + throw new Error("Critical: Service Unavailable (Queue Down)"); + } + } catch (queueError) { + console.error("Queueing failed:", queueError); + // Fallthrough to failure + } } + // General Failure // Update to FAILED await prisma.order.update({ where: { id: order.id }, @@ -113,6 +186,7 @@ app.post('/orders', async (req: Request, res: Response) => { } }); -app.listen(PORT, () => { +app.listen(PORT, async () => { console.log(`Order Service running on port ${PORT}`); + await connectToRabbit(); });