diff --git a/README.md b/README.md
index 346408c..03c0324 100644
--- a/README.md
+++ b/README.md
@@ -78,7 +78,7 @@ The heavy lifting runs in Rust: a Tokio async scheduler, OS thread worker pool w
 - **Job archival** — `queue.archive(older_than=86400)`, `queue.list_archived()`
 - **Queue pause/resume** — `queue.pause()`, `queue.resume()`, `queue.paused_queues()`
 - **Circuit breakers** — `circuit_breaker={"threshold": 5, "window": 60, "cooldown": 300}`
-- **Structured logging** — `current_job.log("msg", level="info", extra={...})`
+- **Structured logging** — `current_job.log("msg", level=LogLevel.INFO, extra={...})`
 - **CLI** — `taskito worker`, `taskito info --watch`, `taskito dashboard`
 
 ## Integrations
diff --git a/crates/taskito-core/src/job.rs b/crates/taskito-core/src/job.rs
index d62184e..b02df6a 100644
--- a/crates/taskito-core/src/job.rs
+++ b/crates/taskito-core/src/job.rs
@@ -79,6 +79,12 @@ pub struct Job {
     pub unique_key: Option<String>,
     pub progress: Option<i32>,
     pub metadata: Option<String>,
+    /// Structured, user-readable annotations attached to the job (canonical
+    /// JSON object, ≤ 15 top-level fields). Validated at the Python
+    /// boundary by `taskito.notes.validate_and_encode_notes`; stored as the
+    /// already-encoded JSON string here.
+    #[serde(default)]
+    pub notes: Option<String>,
     pub cancel_requested: bool,
     pub expires_at: Option<i64>,
     pub result_ttl_ms: Option<i64>,
@@ -106,6 +112,7 @@ impl From<JobRow> for Job {
             unique_key: row.unique_key,
             progress: row.progress,
             metadata: row.metadata,
+            notes: row.notes,
             cancel_requested: row.cancel_requested != 0,
             expires_at: row.expires_at,
             result_ttl_ms: row.result_ttl_ms,
@@ -125,6 +132,8 @@ pub struct NewJob {
     pub timeout_ms: i64,
     pub unique_key: Option<String>,
     pub metadata: Option<String>,
+    /// Pre-encoded canonical JSON object (≤ 15 fields). See [`Job::notes`].
+    pub notes: Option<String>,
     pub depends_on: Vec<String>,
     pub expires_at: Option<i64>,
     pub result_ttl_ms: Option<i64>,
@@ -153,6 +162,7 @@ impl NewJob {
             unique_key: self.unique_key,
             progress: None,
             metadata: self.metadata,
+            notes: self.notes,
             cancel_requested: false,
             expires_at: self.expires_at,
             result_ttl_ms: self.result_ttl_ms,
diff --git a/crates/taskito-core/src/scheduler/maintenance.rs b/crates/taskito-core/src/scheduler/maintenance.rs
index de069f5..afb3f30 100644
--- a/crates/taskito-core/src/scheduler/maintenance.rs
+++ b/crates/taskito-core/src/scheduler/maintenance.rs
@@ -96,6 +96,7 @@ impl Scheduler {
             timeout_ms: PERIODIC_DEFAULT_TIMEOUT_MS,
             unique_key,
             metadata: None,
+            notes: None,
             depends_on: vec![],
             expires_at: None,
             result_ttl_ms: None,
diff --git a/crates/taskito-core/src/scheduler/mod.rs b/crates/taskito-core/src/scheduler/mod.rs
index 328b71d..d96553a 100644
--- a/crates/taskito-core/src/scheduler/mod.rs
+++ b/crates/taskito-core/src/scheduler/mod.rs
@@ -332,6 +332,7 @@ mod tests {
             timeout_ms: 300_000,
             unique_key: None,
             metadata: None,
+            notes: None,
             depends_on: vec![],
             expires_at: None,
             result_ttl_ms: None,
diff --git a/crates/taskito-core/src/storage/diesel_common/jobs.rs b/crates/taskito-core/src/storage/diesel_common/jobs.rs
index 0222f63..f0b6e90 100644
--- a/crates/taskito-core/src/storage/diesel_common/jobs.rs
+++ b/crates/taskito-core/src/storage/diesel_common/jobs.rs
@@ -103,6 +103,7 @@ macro_rules! impl_diesel_job_ops {
             timeout_ms: job.timeout_ms,
             unique_key: job.unique_key.as_deref(),
             metadata: job.metadata.as_deref(),
+            notes: job.notes.as_deref(),
             cancel_requested: 0,
             expires_at: job.expires_at,
             result_ttl_ms: job.result_ttl_ms,
@@ -160,6 +161,7 @@ macro_rules! impl_diesel_job_ops {
             timeout_ms: job.timeout_ms,
             unique_key: job.unique_key.as_deref(),
             metadata: job.metadata.as_deref(),
+            notes: job.notes.as_deref(),
             cancel_requested: 0,
             expires_at: job.expires_at,
             result_ttl_ms: job.result_ttl_ms,
@@ -214,6 +216,7 @@ macro_rules! impl_diesel_job_ops {
             timeout_ms: job.timeout_ms,
             unique_key: job.unique_key.as_deref(),
             metadata: job.metadata.as_deref(),
+            notes: job.notes.as_deref(),
             cancel_requested: 0,
             expires_at: job.expires_at,
             result_ttl_ms: job.result_ttl_ms,
diff --git a/crates/taskito-core/src/storage/diesel_common/migrations.rs b/crates/taskito-core/src/storage/diesel_common/migrations.rs
index 5d95f5b..5f228d5 100644
--- a/crates/taskito-core/src/storage/diesel_common/migrations.rs
+++ b/crates/taskito-core/src/storage/diesel_common/migrations.rs
@@ -88,6 +88,7 @@ pub fn create_tables(d: &Dialect) -> Vec<String> {
             unique_key TEXT,
             progress INTEGER,
             metadata TEXT,
+            notes TEXT,
             cancel_requested INTEGER NOT NULL DEFAULT 0,
             expires_at {bi},
             result_ttl_ms {bi}
@@ -104,6 +105,7 @@ pub fn create_tables(d: &Dialect) -> Vec<String> {
             retry_count INTEGER NOT NULL,
             failed_at {bi} NOT NULL,
             metadata TEXT,
+            notes TEXT,
             priority INTEGER NOT NULL DEFAULT 0,
             max_retries INTEGER NOT NULL DEFAULT 3,
             timeout_ms {bi} NOT NULL DEFAULT 300000,
@@ -234,6 +236,7 @@ pub fn create_tables(d: &Dialect) -> Vec<String> {
             unique_key TEXT,
             progress INTEGER,
             metadata TEXT,
+            notes TEXT,
             cancel_requested INTEGER NOT NULL DEFAULT 0,
             expires_at {bi},
             result_ttl_ms {bi}
@@ -338,5 +341,9 @@ pub fn alter_statements(d: &Dialect) -> Vec<String> {
         // namespace backfill on dead_letter / archived_jobs (after circuit-breaker columns)
         format!("ALTER TABLE dead_letter ADD COLUMN {ife}namespace TEXT"),
         format!("ALTER TABLE archived_jobs ADD COLUMN {ife}namespace TEXT"),
+        // structured notes (≤15 fields, validated at the Python boundary)
+        format!("ALTER TABLE jobs ADD COLUMN {ife}notes TEXT"),
+        format!("ALTER TABLE dead_letter ADD COLUMN {ife}notes TEXT"),
+        format!("ALTER TABLE archived_jobs ADD COLUMN {ife}notes TEXT"),
     ]
 }
diff --git a/crates/taskito-core/src/storage/mod.rs b/crates/taskito-core/src/storage/mod.rs
index fb52d36..d37d5e7 100644
--- a/crates/taskito-core/src/storage/mod.rs
+++ b/crates/taskito-core/src/storage/mod.rs
@@ -44,6 +44,7 @@ pub struct DeadJob {
    pub retry_count: i32,
    pub failed_at: i64,
    pub metadata: Option<String>,
+   pub notes: Option<String>,
    pub priority: i32,
    pub max_retries: i32,
    pub timeout_ms: i64,
@@ -63,6 +64,7 @@ impl From<DeadLetterRow> for DeadJob {
            retry_count: row.retry_count,
            failed_at: row.failed_at,
            metadata: row.metadata,
+           notes: row.notes,
            priority: row.priority,
            max_retries: row.max_retries,
            timeout_ms: row.timeout_ms,
diff --git a/crates/taskito-core/src/storage/models.rs b/crates/taskito-core/src/storage/models.rs
index 0b6fa02..e0ecd80 100644
--- a/crates/taskito-core/src/storage/models.rs
+++ b/crates/taskito-core/src/storage/models.rs
@@ -29,6 +29,7 @@ pub struct JobRow {
     pub unique_key: Option<String>,
     pub progress: Option<i32>,
     pub metadata: Option<String>,
+    pub notes: Option<String>,
     pub cancel_requested: i32,
     pub expires_at: Option<i64>,
     pub result_ttl_ms: Option<i64>,
@@ -52,6 +53,7 @@ pub struct NewJobRow<'a> {
     pub timeout_ms: i64,
     pub unique_key: Option<&'a str>,
     pub metadata: Option<&'a str>,
+    pub notes: Option<&'a str>,
     pub cancel_requested: i32,
     pub expires_at: Option<i64>,
     pub result_ttl_ms: Option<i64>,
@@ -71,6 +73,7 @@ pub struct DeadLetterRow {
     pub retry_count: i32,
     pub failed_at: i64,
     pub metadata: Option<String>,
+    pub notes: Option<String>,
     pub priority: i32,
     pub max_retries: i32,
     pub timeout_ms: i64,
@@ -91,6 +94,7 @@ pub struct NewDeadLetterRow<'a> {
     pub retry_count: i32,
     pub failed_at: i64,
     pub metadata: Option<&'a str>,
+    pub notes: Option<&'a str>,
     pub priority: i32,
     pub max_retries: i32,
     pub timeout_ms: i64,
@@ -411,6 +415,7 @@ pub struct ArchivedJobRow {
     pub unique_key: Option<String>,
     pub progress: Option<i32>,
     pub metadata: Option<String>,
+    pub notes: Option<String>,
     pub cancel_requested: i32,
     pub expires_at: Option<i64>,
     pub result_ttl_ms: Option<i64>,
diff --git a/crates/taskito-core/src/storage/postgres/archival.rs b/crates/taskito-core/src/storage/postgres/archival.rs
index 747059e..241f303 100644
--- a/crates/taskito-core/src/storage/postgres/archival.rs
+++ b/crates/taskito-core/src/storage/postgres/archival.rs
@@ -67,6 +67,7 @@ impl PostgresStorage {
             unique_key: row.unique_key,
             progress: row.progress,
             metadata: row.metadata,
+            notes: row.notes,
             cancel_requested: row.cancel_requested != 0,
             expires_at: row.expires_at,
             result_ttl_ms: row.result_ttl_ms,
diff --git a/crates/taskito-core/src/storage/postgres/dead_letter.rs b/crates/taskito-core/src/storage/postgres/dead_letter.rs
index 0069569..1c12226 100644
--- a/crates/taskito-core/src/storage/postgres/dead_letter.rs
+++ b/crates/taskito-core/src/storage/postgres/dead_letter.rs
@@ -26,6 +26,7 @@ impl PostgresStorage {
             retry_count: job.retry_count,
             failed_at: now,
             metadata,
+            notes: job.notes.as_deref(),
             priority: job.priority,
             max_retries: job.max_retries,
             timeout_ms: job.timeout_ms,
@@ -92,6 +93,7 @@ impl PostgresStorage {
             timeout_ms: dead_row.timeout_ms,
             unique_key: None,
             metadata: dead_row.metadata,
+            notes: dead_row.notes,
             depends_on: vec![],
             expires_at: None,
             result_ttl_ms: dead_row.result_ttl_ms,
@@ -115,6 +117,7 @@ impl PostgresStorage {
             timeout_ms: job.timeout_ms,
             unique_key: job.unique_key.as_deref(),
             metadata: job.metadata.as_deref(),
+            notes: job.notes.as_deref(),
             cancel_requested: 0,
             expires_at: job.expires_at,
             result_ttl_ms: job.result_ttl_ms,
diff --git a/crates/taskito-core/src/storage/redis_backend/dead_letter.rs b/crates/taskito-core/src/storage/redis_backend/dead_letter.rs
index 0dd23ec..acd2307 100644
--- a/crates/taskito-core/src/storage/redis_backend/dead_letter.rs
+++ b/crates/taskito-core/src/storage/redis_backend/dead_letter.rs
@@ -17,6 +17,8 @@ struct DeadJobEntry {
     pub retry_count: i32,
     pub failed_at: i64,
     pub metadata: Option<String>,
+    #[serde(default)]
+    pub notes: Option<String>,
     pub priority: i32,
     pub max_retries: i32,
     pub timeout_ms: i64,
@@ -37,6 +39,7 @@ impl From<DeadJobEntry> for DeadJob {
             retry_count: e.retry_count,
             failed_at: e.failed_at,
             metadata: e.metadata,
+            notes: e.notes,
             priority: e.priority,
             max_retries: e.max_retries,
             timeout_ms: e.timeout_ms,
@@ -62,6 +65,7 @@ impl RedisStorage {
             retry_count: job.retry_count,
             failed_at: now,
             metadata: metadata.map(|s| s.to_string()),
+            notes: job.notes.clone(),
             priority: job.priority,
             max_retries: job.max_retries,
             timeout_ms: job.timeout_ms,
@@ -146,6 +150,7 @@ impl RedisStorage {
             timeout_ms: entry.timeout_ms,
             unique_key: None,
             metadata: entry.metadata,
+            notes: entry.notes,
             depends_on: vec![],
             expires_at: None,
             result_ttl_ms: entry.result_ttl_ms,
diff --git a/crates/taskito-core/src/storage/schema.rs b/crates/taskito-core/src/storage/schema.rs
index 3ecdf11..12be5da 100644
--- a/crates/taskito-core/src/storage/schema.rs
+++ b/crates/taskito-core/src/storage/schema.rs
@@ -18,6 +18,7 @@ diesel::table! {
         unique_key -> Nullable<Text>,
         progress -> Nullable<Integer>,
         metadata -> Nullable<Text>,
+        notes -> Nullable<Text>,
         cancel_requested -> Integer,
         expires_at -> Nullable<BigInt>,
         result_ttl_ms -> Nullable<BigInt>,
@@ -36,6 +37,7 @@ diesel::table! {
         retry_count -> Integer,
         failed_at -> BigInt,
         metadata -> Nullable<Text>,
+        notes -> Nullable<Text>,
         priority -> Integer,
         max_retries -> Integer,
         timeout_ms -> BigInt,
@@ -188,6 +190,7 @@ diesel::table! {
         unique_key -> Nullable<Text>,
         progress -> Nullable<Integer>,
         metadata -> Nullable<Text>,
+        notes -> Nullable<Text>,
         cancel_requested -> Integer,
         expires_at -> Nullable<BigInt>,
         result_ttl_ms -> Nullable<BigInt>,
diff --git a/crates/taskito-core/src/storage/sqlite/archival.rs b/crates/taskito-core/src/storage/sqlite/archival.rs
index 154f86b..b198ae5 100644
--- a/crates/taskito-core/src/storage/sqlite/archival.rs
+++ b/crates/taskito-core/src/storage/sqlite/archival.rs
@@ -70,6 +70,7 @@ impl SqliteStorage {
             unique_key: row.unique_key,
             progress: row.progress,
             metadata: row.metadata,
+            notes: row.notes,
             cancel_requested: row.cancel_requested != 0,
             expires_at: row.expires_at,
             result_ttl_ms: row.result_ttl_ms,
diff --git a/crates/taskito-core/src/storage/sqlite/dead_letter.rs b/crates/taskito-core/src/storage/sqlite/dead_letter.rs
index 3cabf03..6e3a3c3 100644
--- a/crates/taskito-core/src/storage/sqlite/dead_letter.rs
+++ b/crates/taskito-core/src/storage/sqlite/dead_letter.rs
@@ -26,6 +26,7 @@ impl SqliteStorage {
             retry_count: job.retry_count,
             failed_at: now,
             metadata,
+            notes: job.notes.as_deref(),
             priority: job.priority,
             max_retries: job.max_retries,
             timeout_ms: job.timeout_ms,
@@ -96,6 +97,7 @@ impl SqliteStorage {
             timeout_ms: dead_row.timeout_ms,
             unique_key: None,
             metadata: dead_row.metadata,
+            notes: dead_row.notes,
             depends_on: vec![],
             expires_at: None,
             result_ttl_ms: dead_row.result_ttl_ms,
@@ -119,6 +121,7 @@ impl SqliteStorage {
             timeout_ms: job.timeout_ms,
             unique_key: job.unique_key.as_deref(),
             metadata: job.metadata.as_deref(),
+            notes: job.notes.as_deref(),
             cancel_requested: 0,
             expires_at: job.expires_at,
             result_ttl_ms: job.result_ttl_ms,
diff --git a/crates/taskito-core/src/storage/sqlite/tests.rs b/crates/taskito-core/src/storage/sqlite/tests.rs
index 3959d57..b9cd35f 100644
--- a/crates/taskito-core/src/storage/sqlite/tests.rs
+++ b/crates/taskito-core/src/storage/sqlite/tests.rs
@@ -16,6 +16,7 @@ fn make_job(task_name: &str) -> NewJob {
         timeout_ms: 300_000,
         unique_key: None,
         metadata: None,
+        notes: None,
         depends_on: vec![],
         expires_at: None,
         result_ttl_ms: None,
@@ -33,6 +34,51 @@ fn test_enqueue_and_get() {
     assert_eq!(fetched.status, JobStatus::Pending);
 }
 
+#[test]
+fn test_notes_round_trip() {
+    let storage = test_storage();
+    let mut new_job = make_job("notes_task");
+    new_job.notes = Some(r#"{"customer_id":"cus_abc","tier":"gold"}"#.to_string());
+
+    let job = storage.enqueue(new_job).unwrap();
+    let fetched = storage.get_job(&job.id).unwrap().unwrap();
+    assert_eq!(
+        fetched.notes.as_deref(),
+        Some(r#"{"customer_id":"cus_abc","tier":"gold"}"#)
+    );
+
+    // Absence round-trips as None.
+    let plain = storage.enqueue(make_job("plain_task")).unwrap();
+    let plain_fetched = storage.get_job(&plain.id).unwrap().unwrap();
+    assert!(plain_fetched.notes.is_none());
+}
+
+#[test]
+fn test_notes_survive_dlq_round_trip() {
+    let storage = test_storage();
+    let mut new_job = make_job("dlq_notes_task");
+    new_job.notes = Some(r#"{"customer_id":"cus_xyz"}"#.to_string());
+    let job = storage.enqueue(new_job).unwrap();
+
+    storage
+        .move_to_dlq(&job, "boom", None)
+        .expect("move_to_dlq");
+
+    let dead = storage.list_dead(10, 0).unwrap();
+    let entry = dead
+        .iter()
+        .find(|d| d.original_job_id == job.id)
+        .expect("dead entry");
+    assert_eq!(entry.notes.as_deref(), Some(r#"{"customer_id":"cus_xyz"}"#));
+
+    let new_id = storage.retry_dead(&entry.id).expect("retry_dead");
+    let retried = storage.get_job(&new_id).unwrap().unwrap();
+    assert_eq!(
+        retried.notes.as_deref(),
+        Some(r#"{"customer_id":"cus_xyz"}"#)
+    );
+}
+
 #[test]
 fn test_dequeue() {
     let storage = test_storage();
diff --git a/crates/taskito-core/tests/rust/storage_tests.rs b/crates/taskito-core/tests/rust/storage_tests.rs
index 8fce93b..b689c14 100644
--- a/crates/taskito-core/tests/rust/storage_tests.rs
+++ b/crates/taskito-core/tests/rust/storage_tests.rs
@@ -22,6 +22,7 @@ fn make_job(queue: &str, task_name: &str) -> NewJob {
         timeout_ms: 300_000,
         unique_key: None,
         metadata: None,
+        notes: None,
         depends_on: vec![],
         expires_at: None,
         result_ttl_ms: None,
diff --git a/crates/taskito-python/src/py_job.rs b/crates/taskito-python/src/py_job.rs
index c311302..bc362b4 100644
--- a/crates/taskito-python/src/py_job.rs
+++ b/crates/taskito-python/src/py_job.rs
@@ -36,6 +36,10 @@ pub struct PyJob {
     pub progress: Option<i32>,
     #[pyo3(get)]
     pub metadata: Option<String>,
+    /// Pre-encoded canonical JSON for structured notes (≤ 15 fields).
+    /// Python wraps this with `json.loads` via `JobResult.notes`.
+    #[pyo3(get)]
+    pub notes: Option<String>,
     #[pyo3(get)]
     pub namespace: Option<String>,
 
@@ -86,6 +90,7 @@ impl From<Job> for PyJob {
             unique_key: job.unique_key,
             progress: job.progress,
             metadata: job.metadata,
+            notes: job.notes,
             namespace: job.namespace,
             status_val: job.status as i32,
             result_bytes: job.result,
diff --git a/crates/taskito-python/src/py_queue/inspection.rs b/crates/taskito-python/src/py_queue/inspection.rs
index 5d1b46a..df387af 100644
--- a/crates/taskito-python/src/py_queue/inspection.rs
+++ b/crates/taskito-python/src/py_queue/inspection.rs
@@ -200,6 +200,7 @@ impl PyQueue {
             timeout_ms: original.timeout_ms,
             unique_key: None,
             metadata: Some(format!("{{\"replayed_from\":\"{job_id}\"}}")),
+            notes: original.notes,
             depends_on: vec![],
             expires_at: None,
             result_ttl_ms: original.result_ttl_ms,
diff --git a/crates/taskito-python/src/py_queue/mod.rs b/crates/taskito-python/src/py_queue/mod.rs
index 124b085..8da355e 100644
--- a/crates/taskito-python/src/py_queue/mod.rs
+++ b/crates/taskito-python/src/py_queue/mod.rs
@@ -164,7 +164,8 @@ impl PyQueue {
     }
 
     /// Enqueue a job.
-    #[pyo3(signature = (task_name, payload, queue="default", priority=None, delay_seconds=None, max_retries=None, timeout=None, unique_key=None, metadata=None, depends_on=None, expires=None, result_ttl=None))]
+    #[pyo3(signature = (task_name, payload, queue="default", priority=None, delay_seconds=None, max_retries=None, timeout=None, unique_key=None, metadata=None, notes=None, depends_on=None, expires=None, result_ttl=None))]
+    #[allow(clippy::too_many_arguments)]
     pub fn enqueue(
         &self,
         task_name: &str,
@@ -176,6 +177,7 @@
         timeout: Option<i64>,
         unique_key: Option<String>,
         metadata: Option<String>,
+        notes: Option<String>,
         depends_on: Option<Vec<String>>,
         expires: Option<f64>,
         result_ttl: Option<i64>,
@@ -244,6 +246,7 @@
             timeout_ms,
             unique_key: unique_key.clone(),
             metadata,
+            notes,
             depends_on: depends_on.unwrap_or_default(),
             expires_at,
             result_ttl_ms,
@@ -261,7 +264,7 @@
     }
 
     /// Enqueue multiple jobs in a single transaction.
-    #[pyo3(signature = (task_names, payloads, queues=None, priorities=None, max_retries_list=None, timeouts=None, delay_seconds_list=None, unique_keys=None, metadata_list=None, expires_list=None, result_ttl_list=None))]
+    #[pyo3(signature = (task_names, payloads, queues=None, priorities=None, max_retries_list=None, timeouts=None, delay_seconds_list=None, unique_keys=None, metadata_list=None, notes_list=None, expires_list=None, result_ttl_list=None))]
     #[allow(clippy::too_many_arguments)]
     pub fn enqueue_batch(
         &self,
@@ -274,6 +277,7 @@
         delay_seconds_list: Option<Vec<Option<f64>>>,
         unique_keys: Option<Vec<Option<String>>>,
         metadata_list: Option<Vec<Option<String>>>,
+        notes_list: Option<Vec<Option<String>>>,
         expires_list: Option<Vec<Option<f64>>>,
         result_ttl_list: Option<Vec<Option<i64>>>,
     ) -> PyResult<Vec<PyJob>> {
@@ -346,6 +350,9 @@
             metadata: metadata_list
                 .as_ref()
                 .and_then(|m| m.get(i).cloned().flatten()),
+            notes: notes_list
+                .as_ref()
+                .and_then(|n| n.get(i).cloned().flatten()),
             depends_on: vec![],
             expires_at,
             result_ttl_ms,
diff --git a/crates/taskito-python/src/py_queue/workflow_ops/fan_out.rs b/crates/taskito-python/src/py_queue/workflow_ops/fan_out.rs
index 8703224..b646e1e 100644
--- a/crates/taskito-python/src/py_queue/workflow_ops/fan_out.rs
+++ b/crates/taskito-python/src/py_queue/workflow_ops/fan_out.rs
@@ -77,6 +77,7 @@ impl PyQueue {
             timeout_ms,
             unique_key: None,
             metadata: Some(build_metadata_json(&run_id_owned, child_name)),
+            notes: None,
             depends_on: vec![],
             expires_at: None,
             result_ttl_ms: self.result_ttl_ms,
@@ -144,6 +145,7 @@ impl PyQueue {
             timeout_ms,
             unique_key: None,
             metadata: Some(build_metadata_json(&run_id_owned, &node_name_owned)),
+            notes: None,
             depends_on: vec![],
             expires_at: None,
             result_ttl_ms: self.result_ttl_ms,
diff --git a/crates/taskito-python/src/py_queue/workflow_ops/lifecycle.rs b/crates/taskito-python/src/py_queue/workflow_ops/lifecycle.rs
index d1d88cb..56b771d 100644
--- a/crates/taskito-python/src/py_queue/workflow_ops/lifecycle.rs
+++ b/crates/taskito-python/src/py_queue/workflow_ops/lifecycle.rs
@@ -183,6 +183,7 @@ impl PyQueue {
             timeout_ms,
             unique_key: None,
             metadata: Some(build_metadata_json(&run_id, &topo.name)),
+            notes: None,
             depends_on,
             expires_at: None,
             result_ttl_ms: self.result_ttl_ms,
diff --git a/crates/taskito-python/src/py_queue/workflow_ops/mod.rs b/crates/taskito-python/src/py_queue/workflow_ops/mod.rs
index cf5e69e..9c63693 100644
--- a/crates/taskito-python/src/py_queue/workflow_ops/mod.rs
+++ b/crates/taskito-python/src/py_queue/workflow_ops/mod.rs
@@ -153,6 +153,7 @@ mod tests {
         timeout_ms: 300_000,
         unique_key: None,
         metadata: None,
+        notes: None,
        depends_on: vec![],
        expires_at: None,
        result_ttl_ms: None,
diff --git a/dashboard/src/features/jobs/components/job-overview-tab.tsx b/dashboard/src/features/jobs/components/job-overview-tab.tsx
index adcba87..203163a 100644
--- a/dashboard/src/features/jobs/components/job-overview-tab.tsx
+++ b/dashboard/src/features/jobs/components/job-overview-tab.tsx
@@ -91,6 +91,8 @@ export function JobOverviewTab({ job }: JobOverviewTabProps) {
       ) : null}
+
+      <NotesCard raw={job.notes} />
 
       {job.error ? (
@@ -137,6 +139,67 @@ function tryPrettyJson(raw: string): string {
   }
 }
 
+/**
+ * Render the structured ``notes`` blob as a fixed-size key/value table.
+ *
+ * The contract on the server side caps notes at 15 top-level keys, so this
+ * always fits in a card. Non-string leaves are stringified via
+ * ``JSON.stringify`` to keep the renderer total. If the raw payload
+ * can't be parsed (e.g. the column was tampered with out of band), we
+ * fall back to the raw text in a `<pre>` so the operator can still see
+ * something useful.
+ */
+function NotesCard({ raw }: { raw: string | null }) {
+  if (!raw) return null;
+
+  let parsed: unknown;
+  try {
+    parsed = JSON.parse(raw);
+  } catch {
+    return (
+      <Card>
+        <CardHeader>
+          <CardTitle>Notes</CardTitle>
+        </CardHeader>
+        <CardContent>
+          <pre>
+            {raw}
+          </pre>
+        </CardContent>
+      </Card>
+    );
+  }
+
+  if (
+    parsed === null ||
+    typeof parsed !== "object" ||
+    Array.isArray(parsed) ||
+    Object.keys(parsed).length === 0
+  ) {
+    return null;
+  }
+
+  const entries = Object.entries(parsed as Record<string, unknown>);
+  return (
+    <Card>
+      <CardHeader>
+        <CardTitle>Notes</CardTitle>
+      </CardHeader>
+      <CardContent>
+        <Table>
+          <TableBody>
+            {entries.map(([key, value]) => (
+              <TableRow key={key}>
+                <TableCell>{key}</TableCell>
+                <TableCell>
+                  {typeof value === "string" ? value : JSON.stringify(value)}
+                </TableCell>
+              </TableRow>
+            ))}
+          </TableBody>
+        </Table>
+      </CardContent>
+    </Card>
+  );
+}
+
 /**
  * Render configured integration shortcuts (Grafana / Sentry / OTel) for
  * the given job. Each URL may contain a ``{job_id}`` placeholder; if it
diff --git a/dashboard/src/lib/api-types.ts b/dashboard/src/lib/api-types.ts
index a1904ae..a2a246d 100644
--- a/dashboard/src/lib/api-types.ts
+++ b/dashboard/src/lib/api-types.ts
@@ -56,6 +56,12 @@ export interface Job {
   error: string | null;
   unique_key: string | null;
   metadata: string | null;
+  /**
+   * Structured notes JSON string (canonical encoding, ≤ 15 top-level fields).
+   * Parse with `JSON.parse` for a dict suitable for key/value rendering.
+   * `null` when no notes were attached at enqueue time.
+   */
+  notes: string | null;
 }
 
 export interface JobError {
diff --git a/docs/content/docs/api-reference/context.mdx b/docs/content/docs/api-reference/context.mdx
index c4373bc..0a4487d 100644
--- a/docs/content/docs/api-reference/context.mdx
+++ b/docs/content/docs/api-reference/context.mdx
@@ -120,7 +120,7 @@ Publish a partial result visible to
 intermediate data from long-running tasks.
 
 `data` must be JSON-serializable. It is stored as a task log entry with
-`level="result"`, distinguishing it from regular logs.
+level `LogLevel.RESULT`, distinguishing it from regular logs.
 
 ```python
 @queue.task()
diff --git a/docs/content/docs/api-reference/queue/index.mdx b/docs/content/docs/api-reference/queue/index.mdx
index 23e58b4..f737637 100644
--- a/docs/content/docs/api-reference/queue/index.mdx
+++ b/docs/content/docs/api-reference/queue/index.mdx
@@ -153,6 +153,7 @@ queue.enqueue(
     timeout: int | None = None,
     unique_key: str | None = None,
     metadata: str | None = None,
+    notes: dict[str, Any] | None = None,
     depends_on: str | list[str] | None = None,
 ) -> JobResult
 ```
 
@@ -161,6 +162,8 @@ Enqueue a task for execution. Returns a [`JobResult`](/api-reference/result) han
 
 | Parameter | Type | Default | Description |
 |---|---|---|---|
+| `metadata` | `str \| None` | `None` | Free-form JSON string blob. No size or shape constraint. |
+| `notes` | `dict \| None` | `None` | Structured annotations, max **15** top-level keys. See [Structured notes](/guides/observability/notes). |
 | `depends_on` | `str \| list[str] \| None` | `None` | Job ID(s) this job depends on. See [Dependencies](/guides/advanced-execution). |
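+
+For example, attaching notes at enqueue time and reading them back through
+the task wrapper (the task itself is illustrative):
+
+```python
+@queue.task()
+def sync_account(account_id: int) -> None: ...
+
+job = sync_account.apply_async(
+    args=(7,),
+    notes={"requested_by": "support", "ticket": "T-1042"},
+)
+job.refresh()
+print(job.notes)  # {'requested_by': 'support', 'ticket': 'T-1042'}
+```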
 
 ### `queue.enqueue_many()`
 
@@ -179,6 +182,8 @@ queue.enqueue_many(
     unique_keys: list[str | None] | None = None,
     metadata: str | None = None,
     metadata_list: list[str | None] | None = None,
+    notes: dict[str, Any] | None = None,
+    notes_list: list[dict[str, Any] | None] | None = None,
     expires: float | None = None,
     expires_list: list[float | None] | None = None,
     result_ttl: int | None = None,
@@ -196,6 +201,8 @@ uniform parameters (applied to all jobs) and per-job lists.
 | `unique_keys` | `list[str \| None] \| None` | `None` | Per-job deduplication keys |
 | `metadata` | `str \| None` | `None` | Uniform metadata JSON for all jobs |
 | `metadata_list` | `list[str \| None] \| None` | `None` | Per-job metadata JSON |
+| `notes` | `dict \| None` | `None` | Uniform [structured notes](/guides/observability/notes) for all jobs (≤ 15 keys) |
+| `notes_list` | `list[dict \| None] \| None` | `None` | Per-job structured notes (takes precedence over `notes`) |
 | `expires` | `float \| None` | `None` | Uniform expiry in seconds for all jobs |
 | `expires_list` | `list[float \| None] \| None` | `None` | Per-job expiry in seconds |
 | `result_ttl` | `int \| None` | `None` | Uniform result TTL in seconds |
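+
+For example, both forms (the task name is illustrative; the
+`task_name=`/`args_list=` keywords match the Python-level signature used
+in the test suite):
+
+```python
+jobs = queue.enqueue_many(
+    task_name="myapp.tasks.process_order",
+    args_list=[(1,), (2,), (3,)],
+    notes={"batch": "2026-05-14"},  # applied to every job
+)
+
+jobs = queue.enqueue_many(
+    task_name="myapp.tasks.process_order",
+    args_list=[(1,), (2,)],
+    notes={"batch": "2026-05-14"},
+    notes_list=[{"row": "1"}, {"row": "2"}],  # wins over `notes`
+)
+```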
diff --git a/docs/content/docs/guides/advanced-execution/streaming.mdx b/docs/content/docs/guides/advanced-execution/streaming.mdx
index 29a818a..9b4c87c 100644
--- a/docs/content/docs/guides/advanced-execution/streaming.mdx
+++ b/docs/content/docs/guides/advanced-execution/streaming.mdx
@@ -127,9 +127,11 @@ def process_orders(order_ids):
 
 ## How it works
 
-`publish()` stores data as a task log entry with `level="result"`, reusing
-the existing `task_logs` table. No new tables or Rust changes are needed.
-
-`stream()` polls `get_task_logs(job_id)`, filters for `level == "result"`,
-tracks the last-seen timestamp, and yields only new entries. It stops when
-the job's status becomes terminal.
+`publish()` stores data as a task log entry with level `LogLevel.RESULT`,
+reusing the existing `task_logs` table. No new tables or Rust changes are
+needed.
+
+`stream()` polls `get_task_logs(job_id)`, filters for entries whose level
+matches `LogLevel.RESULT` (stored as `"result"`), tracks the last-seen
+timestamp, and yields only new entries. It stops when the job's status
+becomes terminal.
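+
+To make the polling concrete, here is a minimal sketch of such a loop. It
+is not the actual implementation: the entry keys (`level`, `timestamp`,
+`extra`) and the `is_terminal` callable are assumptions for illustration,
+while `queue.task_logs()` is the real per-job log accessor.
+
+```python
+import time
+
+def stream_results(queue, job_id: str, is_terminal, poll_interval: float = 0.5):
+    """Yield published partial results until is_terminal(job_id) is True."""
+    last_seen = 0
+    while True:
+        for entry in queue.task_logs(job_id):
+            # publish() writes entries with level "result" (LogLevel.RESULT);
+            # skip ordinary log lines and anything already yielded.
+            if entry["level"] != "result" or entry["timestamp"] <= last_seen:
+                continue
+            last_seen = entry["timestamp"]
+            yield entry["extra"]
+        if is_terminal(job_id):
+            return
+        time.sleep(poll_interval)
+```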
diff --git a/docs/content/docs/guides/observability/logging.mdx b/docs/content/docs/guides/observability/logging.mdx
index 183bc7c..8faa4e9 100644
--- a/docs/content/docs/guides/observability/logging.mdx
+++ b/docs/content/docs/guides/observability/logging.mdx
@@ -9,23 +9,28 @@ making them queryable and visible in the dashboard.
 
 ## Writing logs
 
-Use `current_job.log()` inside any task:
+Use `current_job.log()` inside any task. Levels are members of the
+`LogLevel` enum — `DEBUG`, `INFO`, `WARNING`, `ERROR`, `CRITICAL`, `RESULT`:
 
 ```python
-from taskito import current_job
+from taskito import LogLevel, current_job
 
 @queue.task()
 def process_order(order_id: int):
     current_job.log("Starting order processing", extra={"order_id": order_id})
 
     items = fetch_items(order_id)
-    current_job.log(f"Found {len(items)} items", level="debug")
+    current_job.log(f"Found {len(items)} items", level=LogLevel.DEBUG)
 
     for item in items:
         try:
             process_item(item)
         except ValueError as e:
-            current_job.log(f"Skipping invalid item: {e}", level="warning", extra={"item": item})
+            current_job.log(
+                f"Skipping invalid item: {e}",
+                level=LogLevel.WARNING,
+                extra={"item": item},
+            )
 
     current_job.log("Order processing complete")
 ```
@@ -35,7 +40,7 @@ def process_order(order_id: int):
 
 | Parameter | Type | Default | Description |
 |-----------|------|---------|-------------|
 | `message` | `str` | *required* | The log message |
-| `level` | `str` | `"info"` | Log level: `"debug"`, `"info"`, `"warning"`, `"error"` |
+| `level` | `LogLevel` | `LogLevel.INFO` | One of `LogLevel.DEBUG`, `LogLevel.INFO`, `LogLevel.WARNING`, `LogLevel.ERROR`, `LogLevel.CRITICAL`, `LogLevel.RESULT` |
 | `extra` | `dict \| None` | `None` | Structured data to attach as JSON |
 
 ## Querying logs
@@ -51,9 +56,11 @@ for log in logs:
 
 ### Cross-job log query
 
 ```python
+from taskito import LogLevel
+
 logs = queue.query_logs(
     task_name="myapp.tasks.process_order",
-    level="error",
+    level=LogLevel.ERROR,
     since=1700000000,
     limit=50,
 )
 ```
@@ -62,7 +69,7 @@ logs = queue.query_logs(
 
 | Parameter | Type | Description |
 |-----------|------|-------------|
 | `task_name` | `str \| None` | Filter by task name |
-| `level` | `str \| None` | Filter by log level |
+| `level` | `LogLevel \| None` | Filter by log level |
 | `since` | `int \| None` | Unix timestamp — only logs after this time |
 | `limit` | `int` | Maximum number of logs to return |
 
@@ -86,14 +93,14 @@ curl http://localhost:8080/api/logs?limit=20
 
 ### ETL pipeline with progress logging
 
 ```python
-from taskito import current_job
+from taskito import LogLevel, current_job
 
 @queue.task()
 def etl_pipeline(source: str, destination: str):
     current_job.log("Starting extraction", extra={"source": source})
 
     records = extract(source)
-    current_job.log(f"Extracted {len(records)} records", level="info")
+    current_job.log(f"Extracted {len(records)} records", level=LogLevel.INFO)
     current_job.update_progress(33)
 
     transformed = []
@@ -103,7 +110,7 @@ def etl_pipeline(source: str, destination: str):
         except Exception as e:
             current_job.log(
                 f"Transform failed for record {i}",
-                level="warning",
+                level=LogLevel.WARNING,
                 extra={"record_id": record.get("id"), "error": str(e)},
             )
     current_job.update_progress(66)
diff --git a/docs/content/docs/guides/observability/meta.json b/docs/content/docs/guides/observability/meta.json
index d62f59c..474b374 100644
--- a/docs/content/docs/guides/observability/meta.json
+++ b/docs/content/docs/guides/observability/meta.json
@@ -1,4 +1,4 @@
 {
   "title": "Observability",
-  "pages": ["monitoring", "logging", "dashboard", "dashboard-api"]
+  "pages": ["monitoring", "logging", "notes", "dashboard", "dashboard-api"]
 }
diff --git a/docs/content/docs/guides/observability/notes.mdx b/docs/content/docs/guides/observability/notes.mdx
new file mode 100644
index 0000000..70fd35a
--- /dev/null
+++ b/docs/content/docs/guides/observability/notes.mdx
@@ -0,0 +1,120 @@
+---
+title: Structured Notes
+description: "Attach a bounded dict of annotations to a job at enqueue time — capped at 15 fields, dashboard-rendered."
+---
+
+`notes` is a structured annotation field on every job. It's a small dict
+(at most **15** top-level keys) that you attach when enqueuing and read
+back from `JobResult.notes`. Unlike `metadata` — which is a free-form
+JSON string blob — `notes` is validated at the Python boundary and
+rendered by the dashboard as a key/value table.
+
+## When to use `notes` vs `metadata`
+
+| | `notes` | `metadata` |
+|---|---|---|
+| Type at enqueue | `dict[str, Any]` | `str` (pre-encoded JSON) |
+| Top-level fields | ≤ 15 | unbounded |
+| Validation | At the Python boundary | None |
+| Dashboard render | Key/value table | Raw JSON dump |
+| Survives DLQ retry | Yes | Yes |
+| Best for | User-visible annotations | Operational/debug context |
+
+Use **`notes`** for short, user-readable annotations a human or
+dashboard would want to scan: customer IDs, business priority reasons,
+short comments. Use **`metadata`** when you need to attach an opaque
+JSON blob without size constraints (trace IDs, request envelopes, etc.).
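+
+For instance, side by side (names illustrative):
+
+```python
+job = send_invoice.apply_async(
+    args=(42,),
+    notes={"customer_id": "cus_abc", "reviewed_by": "ops"},  # scannable table
+    metadata='{"trace_id": "abc123"}',                       # opaque blob
+)
+```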
+## Writing notes
+
+Pass a dict to `notes=` on any enqueue call:
+
+```python
+from taskito import Queue
+
+queue = Queue()
+
+@queue.task()
+def process_order(order_id: int) -> None: ...
+
+job = process_order.apply_async(
+    args=(42,),
+    notes={
+        "customer_id": "cus_abc",
+        "tier": "gold",
+        "priority_reason": "VIP onboarding",
+    },
+)
+```
+
+The same kwarg is accepted on `Queue.enqueue()`, `TaskWrapper.apply_async()`,
+and `Queue.enqueue_many()` (both uniform `notes=` and per-job `notes_list=`).
+
+## Reading notes back
+
+```python
+result = process_order.apply_async(args=(42,), notes={"customer_id": "cus_abc"})
+result.refresh()
+print(result.notes)  # {'customer_id': 'cus_abc'}
+```
+
+`JobResult.notes` returns the parsed dict (or `None` if no notes were
+attached). The raw stored JSON string is also available on the underlying
+`PyJob` via `result._py_job.notes` if you need it untouched.
+
+## Validation rules
+
+Validation runs at the Python boundary; the Rust storage layer receives an
+already-encoded JSON string and stores it verbatim. The exact contract is
+defined in [`taskito.notes`](https://github.com/your-org/taskito/blob/master/py_src/taskito/notes.py):
+
+| Constraint | Limit | Constant |
+|---|---|---|
+| Max top-level fields | 15 | `MAX_NOTE_FIELDS` |
+| Max key length | 64 chars | `MAX_NOTE_KEY_LENGTH` |
+| Max string value length | 500 chars | `MAX_NOTE_VALUE_LENGTH` |
+| Max nesting depth | 3 | `MAX_NOTE_DEPTH` |
+| Max encoded size | 4096 bytes | `MAX_NOTE_BYTES` |
+
+Values may be any JSON-serializable primitive (`str`, `int`, `float`,
+`bool`, `None`), plus `list` or `dict` within the depth cap.
+
+Violations raise [`NotesValidationError`](#errors), a subclass of both
+`TaskitoError` and `ValueError`, so existing `except ValueError` handlers
+keep working:
+
+```python
+from taskito import NotesValidationError
+
+try:
+    process_order.apply_async(args=(42,), notes={f"k{i}": i for i in range(20)})
+except NotesValidationError as e:
+    print(e)  # notes may not have more than 15 fields, got 20
+```
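+
+Concretely for the depth cap, top-level values sit at depth 1, so three
+nested containers are the deepest accepted shape and a fourth level raises
+(error text per the validator above):
+
+```python
+from taskito import NotesValidationError
+
+ok = {"k": {"inner": {"deepest": "ok"}}}       # leaf at depth 3 — accepted
+too_deep = {"k": {"a": {"b": {"c": "leaf"}}}}  # leaf at depth 4 — rejected
+
+process_order.apply_async(args=(42,), notes=ok)
+try:
+    process_order.apply_async(args=(42,), notes=too_deep)
+except NotesValidationError as e:
+    print(e)  # note value at 'k.a.b.c' exceeds max nesting depth of 3
+```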
+## Dashboard rendering
+
+Notes are surfaced on the job detail page as a fixed-size key/value
+table next to the Metadata card. Because the field is capped at 15
+entries, the table is always small enough to scan without scrolling.
+
+## Cross-backend behavior
+
+The `notes` column lives on the `jobs`, `dead_letter`, and `archived_jobs`
+tables on SQLite and PostgreSQL, and rides along with the job's JSON
+representation on Redis. Notes survive:
+
+- DLQ moves and DLQ retries (the original notes are restored on the
+  re-enqueued job).
+- Job replay via `queue.replay_job(...)`.
+- Job archival.
+
+## Errors
+
+```python
+from taskito import NotesValidationError
+```
+
+Subclasses `TaskitoError` and `ValueError`. The exception message names
+the offending key or constraint so it can be surfaced directly to end
+users.
diff --git a/py_src/taskito/__init__.py b/py_src/taskito/__init__.py
index 0ced0ac..3779223 100644
--- a/py_src/taskito/__init__.py
+++ b/py_src/taskito/__init__.py
@@ -2,13 +2,14 @@
 
 from taskito.app import Queue
 from taskito.canvas import Signature, chain, chord, chunks, group, starmap
-from taskito.context import current_job
+from taskito.context import LogLevel, current_job
 from taskito.events import EventType
 from taskito.exceptions import (
     CircuitBreakerOpenError,
     CircularDependencyError,
     JobNotFoundError,
     MaxRetriesExceededError,
+    NotesValidationError,
     PredicateRejectedError,
     ProxyCleanupError,
     ProxyReconstructionError,
@@ -29,6 +30,7 @@
 from taskito.interception import InterceptionError, InterceptionReport
 from taskito.log_config import configure as configure_logging
 from taskito.middleware import TaskMiddleware
+from taskito.notes import MAX_NOTE_FIELDS
 from taskito.proxies.no_proxy import NoProxy
 from taskito.result import JobResult
 from taskito.serializers import (
@@ -42,6 +44,7 @@
 from taskito.testing import MockResource, TestMode, TestResult, TestResults
 
 __all__ = [
+    "MAX_NOTE_FIELDS",
     "CircuitBreakerOpenError",
     "CircularDependencyError",
     "CloudpickleSerializer",
@@ -53,10 +56,12 @@
     "JobNotFoundError",
     "JobResult",
     "JsonSerializer",
+    "LogLevel",
     "MaxRetriesExceededError",
     "MockResource",
     "MsgPackSerializer",
     "NoProxy",
+    "NotesValidationError",
     "PredicateRejectedError",
     "ProxyCleanupError",
     "ProxyReconstructionError",
diff --git a/py_src/taskito/_taskito.pyi b/py_src/taskito/_taskito.pyi
index 71c636d..7822c41 100644
--- a/py_src/taskito/_taskito.pyi
+++ b/py_src/taskito/_taskito.pyi
@@ -62,6 +62,7 @@ class PyJob:
     unique_key: str | None
     progress: int | None
     metadata: str | None
+    notes: str | None
     namespace: str | None
 
     @property
@@ -102,6 +103,7 @@ class PyQueue:
         timeout: int | None = None,
         unique_key: str | None = None,
         metadata: str | None = None,
+        notes: str | None = None,
        depends_on: list[str] | None = None,
        expires: float | None = None,
        result_ttl: int | None = None,
@@ -117,6 +119,7 @@ class PyQueue:
        delay_seconds_list: list[float | None] | None = None,
        unique_keys: list[str | None] | None = None,
        metadata_list: list[str | None] | None = None,
+       notes_list: list[str | None] | None = None,
        expires_list: list[float | None] | None = None,
        result_ttl_list: list[int | None] | None = None,
     ) -> list[PyJob]: ...
diff --git a/py_src/taskito/app.py b/py_src/taskito/app.py
index 625377d..8f87ff7 100644
--- a/py_src/taskito/app.py
+++ b/py_src/taskito/app.py
@@ -44,6 +44,7 @@
     QueueRuntimeConfigMixin,
     QueueSettingsMixin,
 )
+from taskito.notes import validate_and_encode_notes
 from taskito.proxies import ProxyRegistry
 from taskito.proxies.built_in import register_builtin_handlers
 from taskito.proxies.metrics import ProxyMetrics
@@ -318,6 +319,7 @@ def enqueue(
         timeout: int | None = None,
         unique_key: str | None = None,
         metadata: str | None = None,
+        notes: dict[str, Any] | None = None,
         depends_on: str | list[str] | None = None,
         expires: float | None = None,
         result_ttl: int | None = None,
@@ -338,6 +340,10 @@ def enqueue(
             unique_key: Deduplication key (alias of ``idempotency_key`` —
                 wins if both are set, kept for backwards compatibility).
             metadata: Arbitrary JSON string to attach to the job.
+            notes: Structured annotations dict (≤ 15 top-level fields). See
+                :mod:`taskito.notes` for the validation contract. Use this
+                instead of ``metadata`` when you want dashboard-renderable
+                key/value annotations with bounded size.
             depends_on: Job ID or list of job IDs that must complete first.
             expires: Seconds until the job expires (skipped if not started by then).
             result_ttl: Per-job result TTL in seconds. Overrides global result_ttl.
@@ -361,6 +367,7 @@ def enqueue(
             "timeout": timeout,
             "unique_key": unique_key,
             "metadata": metadata,
+            "notes": notes,
             "depends_on": depends_on,
             "expires": expires,
             "result_ttl": result_ttl,
@@ -383,12 +390,17 @@ def enqueue(
         timeout = enqueue_options.get("timeout")
         unique_key = enqueue_options.get("unique_key")
         metadata = enqueue_options.get("metadata")
+        notes = enqueue_options.get("notes")
         depends_on = enqueue_options.get("depends_on")
         expires = enqueue_options.get("expires")
         result_ttl = enqueue_options.get("result_ttl")
         idempotency_key = enqueue_options.get("idempotency_key")
         idempotent = enqueue_options.get("idempotent")
 
+        # Validation runs *after* middleware so a mutating hook still gets
+        # the chance to reshape notes before we reject them.
+        notes_encoded = validate_and_encode_notes(notes)
+
         if self._interceptor is not None and not self._test_mode_active:
             final_args, final_kwargs = self._interceptor.intercept(final_args, final_kwargs)
         task_serializer = self._get_serializer(task_name)
@@ -432,6 +444,7 @@ def enqueue(
             timeout=timeout,
             unique_key=unique_key,
             metadata=metadata,
+            notes=notes_encoded,
             depends_on=dep_ids,
             expires=expires,
             result_ttl=result_ttl,
@@ -462,6 +475,8 @@ def enqueue_many(
         unique_keys: list[str | None] | None = None,
         metadata: str | None = None,
         metadata_list: list[str | None] | None = None,
+        notes: dict[str, Any] | None = None,
+        notes_list: list[dict[str, Any] | None] | None = None,
         expires: float | None = None,
         expires_list: list[float | None] | None = None,
         result_ttl: int | None = None,
@@ -486,6 +501,10 @@ def enqueue_many(
                 if both are set).
             metadata: Uniform metadata JSON string for all jobs.
             metadata_list: Per-job metadata JSON strings.
+            notes: Uniform structured-notes dict for all jobs (see
+                :mod:`taskito.notes`).
+            notes_list: Per-job notes dicts. Takes precedence over ``notes``
+                when both are supplied.
             expires: Uniform expiry in seconds for all jobs.
             expires_list: Per-job expiry in seconds.
             result_ttl: Uniform result TTL in seconds for all jobs.
@@ -523,6 +542,7 @@ def enqueue_many(
             "delay": (delay_list[i] if delay_list is not None else delay),
             "unique_key": (unique_keys[i] if unique_keys is not None else None),
             "metadata": (metadata_list[i] if metadata_list is not None else metadata),
+            "notes": (notes_list[i] if notes_list is not None else notes),
             "expires": (expires_list[i] if expires_list is not None else expires),
             "result_ttl": (result_ttl_list[i] if result_ttl_list is not None else result_ttl),
             "idempotency_key": (idempotency_keys[i] if idempotency_keys is not None else None),
@@ -550,6 +570,7 @@ def enqueue_many(
         timeouts_list = [opt["timeout"] for opt in per_job_options]
         delays = [opt["delay"] for opt in per_job_options]
         metas = [opt["metadata"] for opt in per_job_options]
+        notes_encoded = [validate_and_encode_notes(opt["notes"]) for opt in per_job_options]
         exp_list = [opt["expires"] for opt in per_job_options]
         ttl_list = [opt["result_ttl"] for opt in per_job_options]
 
@@ -604,6 +625,7 @@ def enqueue_many(
             delay_seconds_list=delays,
             unique_keys=per_job_unique_keys,
             metadata_list=metas,
+            notes_list=notes_encoded,
             expires_list=exp_list,
             result_ttl_list=ttl_list,
         )
diff --git a/py_src/taskito/async_support/mixins.py b/py_src/taskito/async_support/mixins.py
index 3d64a08..4f06bb8 100644
--- a/py_src/taskito/async_support/mixins.py
+++ b/py_src/taskito/async_support/mixins.py
@@ -15,6 +15,7 @@
     from collections.abc import Sequence
     from concurrent.futures import Executor
 
+    from taskito.context import LogLevel
     from taskito.result import JobResult
 
 logger = logging.getLogger("taskito")
@@ -73,7 +74,7 @@ def task_logs(self, job_id: str) -> list[dict]: ...
     def query_logs(
         self,
         task_name: str | None = ...,
-        level: str | None = ...,
+        level: LogLevel | None = ...,
         since: int = ...,
         limit: int = ...,
     ) -> list[dict]: ...
diff --git a/py_src/taskito/context.py b/py_src/taskito/context.py
index 2dc55e4..15f5be1 100644
--- a/py_src/taskito/context.py
+++ b/py_src/taskito/context.py
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import enum
 import json
 import logging
 import threading
@@ -15,6 +16,18 @@
 
 logger = logging.getLogger("taskito.context")
 
+
+class LogLevel(str, enum.Enum):
+    """Severity levels for structured task logs written via :meth:`JobContext.log`."""
+
+    DEBUG = "debug"
+    INFO = "info"
+    WARNING = "warning"
+    ERROR = "error"
+    CRITICAL = "critical"
+    RESULT = "result"
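+
+    # Because of the ``str`` mixin, members compare equal to their raw
+    # values (``LogLevel.INFO == "info"``) and can be looked up by value
+    # (``LogLevel("warning") is LogLevel.WARNING``); the dashboard relies
+    # on this to turn ``?level=`` query strings back into members.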
""" ctx = self._require_context() @@ -114,7 +127,9 @@ def log( extra_str = str(extra) else: extra_str = None - _queue_ref._inner.write_task_log(ctx.job_id, ctx.task_name, level, message, extra_str) + _queue_ref._inner.write_task_log( + ctx.job_id, ctx.task_name, level.value, message, extra_str + ) def publish(self, data: Any) -> None: """Publish a partial result visible to ``job.stream()`` consumers. @@ -124,7 +139,8 @@ def publish(self, data: Any) -> None: Args: data: Any JSON-serializable value. Stored as a task log entry - with ``level="result"`` so it can be filtered from regular logs. + with level :attr:`LogLevel.RESULT` so it can be filtered from + regular logs. """ ctx = self._require_context() if _queue_ref is None: @@ -133,7 +149,9 @@ def publish(self, data: Any) -> None: extra_str = json.dumps(data) except (TypeError, ValueError): extra_str = str(data) - _queue_ref._inner.write_task_log(ctx.job_id, ctx.task_name, "result", "", extra_str) + _queue_ref._inner.write_task_log( + ctx.job_id, ctx.task_name, LogLevel.RESULT.value, "", extra_str + ) def check_cancelled(self) -> None: """Check if cancellation has been requested for this job. diff --git a/py_src/taskito/dashboard/handlers/logs.py b/py_src/taskito/dashboard/handlers/logs.py index f30cd48..2ece3af 100644 --- a/py_src/taskito/dashboard/handlers/logs.py +++ b/py_src/taskito/dashboard/handlers/logs.py @@ -4,6 +4,7 @@ from typing import TYPE_CHECKING +from taskito.context import LogLevel from taskito.dashboard.handlers._qs import _parse_int_qs if TYPE_CHECKING: @@ -12,7 +13,8 @@ def _handle_logs(queue: Queue, qs: dict) -> list: task = qs.get("task", [None])[0] - level = qs.get("level", [None])[0] + level_raw = qs.get("level", [None])[0] + level = LogLevel(level_raw) if level_raw is not None else None since = _parse_int_qs(qs, "since", 3600) limit = _parse_int_qs(qs, "limit", 100) return queue.query_logs(task_name=task, level=level, since=since, limit=limit) diff --git a/py_src/taskito/exceptions.py b/py_src/taskito/exceptions.py index 11346ec..b8d03eb 100644 --- a/py_src/taskito/exceptions.py +++ b/py_src/taskito/exceptions.py @@ -73,6 +73,14 @@ class ProxyCleanupError(ResourceError): """Raised when a proxy handler fails during cleanup.""" +class NotesValidationError(TaskitoError, ValueError): + """Raised when a ``notes`` dict violates the structured-notes contract. + + Subclasses :class:`ValueError` so existing ``except ValueError`` clauses + keep working when validation fails inside enqueue paths. + """ + + class PredicateRejectedError(TaskitoError): """Raised when an enqueue-time predicate cancels the submission. 
diff --git a/py_src/taskito/mixins/operations.py b/py_src/taskito/mixins/operations.py
index 5a252dc..7d59c11 100644
--- a/py_src/taskito/mixins/operations.py
+++ b/py_src/taskito/mixins/operations.py
@@ -4,6 +4,7 @@
 
 from typing import Any
 
+from taskito.context import LogLevel
 from taskito.events import EventType
 from taskito.result import JobResult
 
@@ -53,13 +54,16 @@ def task_logs(self, job_id: str) -> list[dict]:
     def query_logs(
         self,
         task_name: str | None = None,
-        level: str | None = None,
+        level: LogLevel | None = None,
         since: int = 3600,
         limit: int = 100,
     ) -> list[dict]:
         """Query structured task logs with filters."""
         return self._inner.query_task_logs(  # type: ignore[no-any-return]
-            task_name=task_name, level=level, since_seconds=since, limit=limit
+            task_name=task_name,
+            level=level.value if level is not None else None,
+            since_seconds=since,
+            limit=limit,
         )
 
     # -- Workers --
diff --git a/py_src/taskito/notes.py b/py_src/taskito/notes.py
new file mode 100644
index 0000000..6f9fcb4
--- /dev/null
+++ b/py_src/taskito/notes.py
@@ -0,0 +1,122 @@
+"""Validation + encoding for the structured ``notes`` field on jobs.
+
+Notes are a small, bounded, user-readable annotation dict attached to a
+job at enqueue time. Storage represents them as a JSON-encoded string in
+the ``jobs.notes`` column across SQLite, Postgres, and Redis.
+
+Validation is performed in Python (the single boundary that all enqueue
+paths funnel through) so the Rust/PyO3 layer can stay schema-agnostic:
+it accepts an already-encoded string and stores it verbatim.
+
+Contract:
+
+* At most :data:`MAX_NOTE_FIELDS` top-level keys.
+* Each key is a non-empty ``str`` no longer than
+  :data:`MAX_NOTE_KEY_LENGTH` characters.
+* Nested values may be JSON-serializable primitives, lists, or dicts up
+  to :data:`MAX_NOTE_DEPTH` levels of nesting.
+* String leaves are capped at :data:`MAX_NOTE_VALUE_LENGTH` characters.
+* The final encoded JSON document fits in :data:`MAX_NOTE_BYTES` bytes.
+
+Limits are chosen to be tight enough that the dashboard can render the
+full note set as a fixed-size key/value table without truncation.
+"""
+
+from __future__ import annotations
+
+import json
+from typing import Any
+
+from taskito.exceptions import NotesValidationError
+
+__all__ = [
+    "MAX_NOTE_BYTES",
+    "MAX_NOTE_DEPTH",
+    "MAX_NOTE_FIELDS",
+    "MAX_NOTE_KEY_LENGTH",
+    "MAX_NOTE_VALUE_LENGTH",
+    "validate_and_encode_notes",
+]
+
+MAX_NOTE_FIELDS: int = 15
+MAX_NOTE_KEY_LENGTH: int = 64
+MAX_NOTE_VALUE_LENGTH: int = 500
+MAX_NOTE_DEPTH: int = 3
+MAX_NOTE_BYTES: int = 4096
+
+# Internal: JSON primitives accepted as leaf values.
+_PRIMITIVE_TYPES = (str, int, float, bool, type(None))
+
+
+def validate_and_encode_notes(notes: dict[str, Any] | None) -> str | None:
+    """Validate ``notes`` and return its canonical JSON encoding.
+
+    Returns ``None`` if ``notes`` is ``None`` (the absence of notes is
+    distinct from an empty dict, which is also accepted and encoded as
+    ``"{}"``).
+
+    Raises:
+        NotesValidationError: if any rule in the module docstring is
+            violated. The exception message names the offending key or
+            constraint so callers can surface it to end users.
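+
+    A canonical-encoding example (illustrative doctest; note the sorted
+    keys and compact separators):
+
+        >>> validate_and_encode_notes({"b": 1, "a": "x"})
+        '{"a":"x","b":1}'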
+ """ + if notes is None: + return None + if not isinstance(notes, dict): + raise NotesValidationError(f"notes must be a dict, got {type(notes).__name__}") + if len(notes) > MAX_NOTE_FIELDS: + raise NotesValidationError( + f"notes may not have more than {MAX_NOTE_FIELDS} fields, got {len(notes)}" + ) + + for key, value in notes.items(): + _validate_key(key) + _validate_value(key, value, depth=1) + + encoded = json.dumps(notes, sort_keys=True, ensure_ascii=False, separators=(",", ":")) + encoded_bytes = len(encoded.encode("utf-8")) + if encoded_bytes > MAX_NOTE_BYTES: + raise NotesValidationError( + f"encoded notes are {encoded_bytes} bytes, exceeds limit of {MAX_NOTE_BYTES} bytes" + ) + return encoded + + +def _validate_key(key: object) -> None: + if not isinstance(key, str): + raise NotesValidationError(f"note keys must be strings, got {type(key).__name__}") + if not key: + raise NotesValidationError("note keys may not be empty strings") + if len(key) > MAX_NOTE_KEY_LENGTH: + raise NotesValidationError( + f"note key {key!r} is {len(key)} characters, exceeds limit of {MAX_NOTE_KEY_LENGTH}" + ) + + +def _validate_value(path: str, value: Any, *, depth: int) -> None: + if depth > MAX_NOTE_DEPTH: + raise NotesValidationError( + f"note value at {path!r} exceeds max nesting depth of {MAX_NOTE_DEPTH}" + ) + if isinstance(value, str): + if len(value) > MAX_NOTE_VALUE_LENGTH: + raise NotesValidationError( + f"note value at {path!r} is {len(value)} characters, exceeds " + f"limit of {MAX_NOTE_VALUE_LENGTH}" + ) + return + if isinstance(value, (bool, int, float)) or value is None: + return + if isinstance(value, list): + for i, item in enumerate(value): + _validate_value(f"{path}[{i}]", item, depth=depth + 1) + return + if isinstance(value, dict): + for k, v in value.items(): + _validate_key(k) + _validate_value(f"{path}.{k}", v, depth=depth + 1) + return + raise NotesValidationError( + f"note value at {path!r} has unsupported type {type(value).__name__}; " + "must be JSON-serializable (str, int, float, bool, None, list, dict)" + ) diff --git a/py_src/taskito/result.py b/py_src/taskito/result.py index 3a46f0e..46847b9 100644 --- a/py_src/taskito/result.py +++ b/py_src/taskito/result.py @@ -78,6 +78,33 @@ def errors(self) -> list[dict]: """Error history for this job (one entry per failed attempt).""" return self._queue.job_errors(self.id) + @property + def metadata(self) -> str | None: + """Raw metadata JSON string set at enqueue time (free-form blob).""" + return self._py_job.metadata + + @property + def notes(self) -> dict[str, Any] | None: + """Structured notes set at enqueue time, parsed back to a dict. + + Returns ``None`` if no notes were attached. Decoding errors are + treated as missing — the stored string was validated on the way + in, so this branch only fires if the column was corrupted out of + band. 
+ """ + raw = self._py_job.notes + if raw is None: + return None + try: + decoded = json.loads(raw) + except (json.JSONDecodeError, TypeError): + log.warning("failed to decode notes JSON for job %s", self.id) + return None + if not isinstance(decoded, dict): + log.warning("notes for job %s is not a JSON object", self.id) + return None + return decoded + @property def dependencies(self) -> list[str]: """IDs of jobs this job depends on.""" @@ -237,6 +264,7 @@ def to_dict(self) -> dict[str, Any]: "timeout_ms": self._py_job.timeout_ms, "unique_key": self._py_job.unique_key, "metadata": self._py_job.metadata, + "notes": self.notes, "namespace": self._py_job.namespace, } diff --git a/py_src/taskito/task.py b/py_src/taskito/task.py index e22c6c3..db4b302 100644 --- a/py_src/taskito/task.py +++ b/py_src/taskito/task.py @@ -82,6 +82,7 @@ def apply_async( timeout: int | None = None, unique_key: str | None = None, metadata: str | None = None, + notes: dict[str, Any] | None = None, depends_on: str | list[str] | None = None, expires: float | None = None, result_ttl: int | None = None, @@ -100,6 +101,8 @@ def apply_async( timeout: Override the default timeout in seconds. unique_key: Deduplication key (alias of ``idempotency_key``). metadata: Arbitrary JSON string to attach to the job. + notes: Structured annotations dict (≤ 15 top-level fields). See + :mod:`taskito.notes`. depends_on: Job ID or list of job IDs that must complete first. expires: Seconds until the job expires (skipped if not started by then). result_ttl: Per-job result TTL in seconds. @@ -122,6 +125,7 @@ def apply_async( timeout=timeout if timeout is not None else self._default_timeout, unique_key=unique_key, metadata=metadata, + notes=notes, depends_on=depends_on, expires=expires, result_ttl=result_ttl, diff --git a/tests/core/test_notes.py b/tests/core/test_notes.py new file mode 100644 index 0000000..bdabe0f --- /dev/null +++ b/tests/core/test_notes.py @@ -0,0 +1,187 @@ +"""Structured notes attached to jobs. + +Exercises both the standalone validator (``taskito.notes``) and the +end-to-end round trip through ``Queue.enqueue`` / +``Queue.enqueue_many`` → storage → ``JobResult.notes``. 
+""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from taskito import MAX_NOTE_FIELDS, NotesValidationError, Queue +from taskito.notes import ( + MAX_NOTE_BYTES, + MAX_NOTE_DEPTH, + MAX_NOTE_KEY_LENGTH, + MAX_NOTE_VALUE_LENGTH, + validate_and_encode_notes, +) + +# ── Pure validator ----------------------------------------------------------- + + +def test_none_passes_through_unchanged() -> None: + assert validate_and_encode_notes(None) is None + + +def test_empty_dict_encodes_as_object() -> None: + assert validate_and_encode_notes({}) == "{}" + + +def test_canonical_encoding_sorts_keys() -> None: + encoded = validate_and_encode_notes({"b": 1, "a": 2}) + assert encoded == '{"a":2,"b":1}' + + +def test_non_dict_rejected() -> None: + with pytest.raises(NotesValidationError, match="notes must be a dict"): + validate_and_encode_notes(["nope"]) # type: ignore[arg-type] + + +def test_too_many_fields_rejected() -> None: + too_many = {f"k{i}": i for i in range(MAX_NOTE_FIELDS + 1)} + with pytest.raises(NotesValidationError, match=f"more than {MAX_NOTE_FIELDS} fields"): + validate_and_encode_notes(too_many) + + +def test_exactly_fifteen_fields_accepted() -> None: + notes = {f"k{i}": i for i in range(MAX_NOTE_FIELDS)} + encoded = validate_and_encode_notes(notes) + assert encoded is not None + + +def test_non_string_key_rejected() -> None: + with pytest.raises(NotesValidationError, match="note keys must be strings"): + validate_and_encode_notes({1: "value"}) # type: ignore[dict-item] + + +def test_empty_key_rejected() -> None: + with pytest.raises(NotesValidationError, match="may not be empty"): + validate_and_encode_notes({"": "value"}) + + +def test_oversized_key_rejected() -> None: + long_key = "k" * (MAX_NOTE_KEY_LENGTH + 1) + with pytest.raises(NotesValidationError, match="exceeds limit of 64"): + validate_and_encode_notes({long_key: "value"}) + + +def test_oversized_string_value_rejected() -> None: + long_value = "x" * (MAX_NOTE_VALUE_LENGTH + 1) + with pytest.raises(NotesValidationError, match=f"exceeds limit of {MAX_NOTE_VALUE_LENGTH}"): + validate_and_encode_notes({"k": long_value}) + + +def test_nested_value_within_depth_allowed() -> None: + assert MAX_NOTE_DEPTH >= 3 + encoded = validate_and_encode_notes({"k": {"inner": {"deepest": "ok"}}}) + assert encoded is not None + assert '"deepest":"ok"' in encoded + + +def test_nesting_beyond_depth_rejected() -> None: + # Build a structure whose depth exceeds the cap. Top-level key counts as + # depth=1, so MAX_NOTE_DEPTH + 1 nested dicts trips the check. + value: object = "leaf" + for _ in range(MAX_NOTE_DEPTH + 1): + value = {"nested": value} + with pytest.raises(NotesValidationError, match="max nesting depth"): + validate_and_encode_notes({"k": value}) + + +def test_unsupported_value_type_rejected() -> None: + with pytest.raises(NotesValidationError, match="unsupported type"): + validate_and_encode_notes({"k": object()}) + + +def test_total_size_cap_enforced() -> None: + # Pack a single string value just over the byte cap so the per-field + # limit doesn't trip first. 
+    chunk = "x" * MAX_NOTE_VALUE_LENGTH
+    fields = (MAX_NOTE_BYTES // MAX_NOTE_VALUE_LENGTH) + 1
+    if fields > MAX_NOTE_FIELDS:
+        pytest.skip("Per-field cap reaches byte cap before field cap")
+    huge = {f"k{i}": chunk for i in range(fields)}
+    with pytest.raises(NotesValidationError, match="exceeds limit of"):
+        validate_and_encode_notes(huge)
+
+
+def test_primitives_accepted() -> None:
+    encoded = validate_and_encode_notes({"s": "x", "i": 1, "f": 1.5, "b": True, "n": None})
+    assert encoded == '{"b":true,"f":1.5,"i":1,"n":null,"s":"x"}'
+
+
+# ── End-to-end through Queue.enqueue ----------------------------------------
+
+
+def test_enqueue_round_trip(queue: Queue) -> None:
+    @queue.task()
+    def noop() -> str:
+        return "ok"
+
+    job = noop.apply_async(notes={"customer_id": "cus_abc", "tier": "gold"})
+    assert job.notes == {"customer_id": "cus_abc", "tier": "gold"}
+
+
+def test_enqueue_without_notes_returns_none(queue: Queue) -> None:
+    @queue.task()
+    def noop() -> str:
+        return "ok"
+
+    job = noop.apply_async()
+    assert job.notes is None
+
+
+def test_enqueue_rejects_oversized_notes(queue: Queue) -> None:
+    @queue.task()
+    def noop() -> str:
+        return "ok"
+
+    with pytest.raises(NotesValidationError):
+        noop.apply_async(notes={f"k{i}": i for i in range(MAX_NOTE_FIELDS + 1)})
+
+
+def test_enqueue_many_uniform_notes(queue: Queue) -> None:
+    @queue.task()
+    def noop(x: int) -> int:
+        return x
+
+    jobs = queue.enqueue_many(
+        task_name=noop._task_name,
+        args_list=[(i,) for i in range(3)],
+        notes={"batch": "ETL-2026-05-14"},
+    )
+    for job in jobs:
+        assert job.notes == {"batch": "ETL-2026-05-14"}
+
+
+def test_enqueue_many_per_job_notes_override_uniform(queue: Queue) -> None:
+    @queue.task()
+    def noop(x: int) -> int:
+        return x
+
+    per_job: list[dict[str, Any] | None] = [{"row": str(i)} for i in range(3)]
+    jobs = queue.enqueue_many(
+        task_name=noop._task_name,
+        args_list=[(i,) for i in range(3)],
+        notes={"batch": "fallback"},
+        notes_list=per_job,
+    )
+    for i, job in enumerate(jobs):
+        assert job.notes == {"row": str(i)}
+
+
+def test_enqueue_many_rejects_invalid_in_one_row(queue: Queue) -> None:
+    @queue.task()
+    def noop(x: int) -> int:
+        return x
+
+    with pytest.raises(NotesValidationError):
+        queue.enqueue_many(
+            task_name=noop._task_name,
+            args_list=[(0,), (1,)],
+            notes_list=[{"ok": True}, ["bad"]],  # type: ignore[list-item]
+        )
diff --git a/tests/dashboard/test_dashboard.py b/tests/dashboard/test_dashboard.py
index dc281e1..de651b0 100644
--- a/tests/dashboard/test_dashboard.py
+++ b/tests/dashboard/test_dashboard.py
@@ -143,6 +143,7 @@ def dummy() -> None:
         "timeout_ms",
         "unique_key",
         "metadata",
+        "notes",
         "namespace",
     }
     assert set(d.keys()) == expected_keys