Skip to content

Commit 1034696

Browse files
Shukriclaude
authored andcommitted
fix: address high-priority worker, rate limiter, and API key issues
- worker: cap concurrent jobs at 64 with a tokio Semaphore - rate_limit: recover from poisoned mutex instead of panicking - projects: replace UUID v4 with OsRng for API key generation (128-bit CSPRNG, no fixed UUID structure bits) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 1573d4c commit 1034696

4 files changed

Lines changed: 19 additions & 7 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ chrono = { version = "0.4", features = ["serde"] }
3232
dotenvy = "0.15"
3333
tracing = "0.1"
3434
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
35-
md5 = "0.7"
35+
md5 = "0.7"
36+
rand = { version = "0.8", features = ["getrandom"] }
3637
lettre = { version = "0.11", features = ["tokio1", "tokio1-native-tls"] }
3738
reqwest = { version = "0.12", features = ["json"] }
3839
futures = "0.3"

src/rate_limit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ impl RateLimiter {
2323

2424
/// Returns `true` if the request is allowed, `false` if rate-limited.
2525
pub fn check(&self, key: &str) -> bool {
26-
let mut store = self.store.lock().unwrap();
26+
let mut store = self.store.lock().unwrap_or_else(|e| e.into_inner());
2727
let now = Instant::now();
2828
let window = self.window;
2929
let entry = store.entry(key.to_string()).or_default();

src/routes/projects.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use axum::{extract::{Path, State}, Json};
2+
use rand::{rngs::OsRng, RngCore};
23
use serde_json::{json, Value};
34
use uuid::Uuid;
45
use crate::{errors::AppError, AppState};
@@ -35,8 +36,10 @@ pub async fn create_project(
3536

3637
let platform = body["platform"].as_str().unwrap_or("php");
3738

38-
// Generate a unique API key
39-
let api_key = Uuid::new_v4().simple().to_string();
39+
// Generate a cryptographically random API key (32 hex chars = 128 bits from OsRng)
40+
let mut bytes = [0u8; 16];
41+
OsRng.fill_bytes(&mut bytes);
42+
let api_key = bytes.iter().map(|b| format!("{:02x}", b)).collect::<String>();
4043

4144
let project = sqlx::query!(
4245
"INSERT INTO projects (name, api_key, platform) VALUES ($1, $2, $3)

src/worker.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
use crate::alerts::{fire_alerts, AlertContext};
22
use deadpool_redis::Pool as RedisPool;
33
use sqlx::PgPool;
4+
use std::sync::Arc;
45

56
use crate::queue::{pop_job, EventJob};
6-
use tokio::sync::broadcast;
7+
use tokio::sync::{broadcast, Semaphore};
8+
9+
const MAX_CONCURRENT_JOBS: usize = 64;
710

811
pub async fn run(redis_pool: RedisPool, pg_pool: PgPool, event_tx: broadcast::Sender<String>) {
912
tracing::info!("⚙️ Worker started — listening on queue");
1013

14+
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_JOBS));
15+
1116
loop {
1217
match pop_job(&redis_pool).await {
1318
Some(job) => {
14-
let pool = pg_pool.clone();
15-
let tx = event_tx.clone();
19+
let pool = pg_pool.clone();
20+
let tx = event_tx.clone();
21+
let permit = semaphore.clone().acquire_owned().await
22+
.expect("semaphore closed");
1623
tokio::spawn(async move {
24+
let _permit = permit; // released when task finishes
1725
if let Err(e) = process(job, &pool, &tx).await {
1826
tracing::error!("Worker failed to process job: {}", e);
1927
}

0 commit comments

Comments
 (0)