2 changes: 1 addition & 1 deletion README.md
@@ -78,7 +78,7 @@ The heavy lifting runs in Rust: a Tokio async scheduler, OS thread worker pool w
- **Job archival** — `queue.archive(older_than=86400)`, `queue.list_archived()`
- **Queue pause/resume** — `queue.pause()`, `queue.resume()`, `queue.paused_queues()`
- **Circuit breakers** — `circuit_breaker={"threshold": 5, "window": 60, "cooldown": 300}`
- **Structured logging** — `current_job.log("msg", level="info", extra={...})`
- **Structured logging** — `current_job.log("msg", level=LogLevel.INFO, extra={...})`
- **CLI** — `taskito worker`, `taskito info --watch`, `taskito dashboard`

## Integrations
10 changes: 10 additions & 0 deletions crates/taskito-core/src/job.rs
@@ -79,6 +79,12 @@ pub struct Job {
pub unique_key: Option<String>,
pub progress: Option<i32>,
pub metadata: Option<String>,
/// Structured, user-readable annotations attached to the job (canonical
/// JSON object, ≤ 15 top-level fields). Validated at the Python
/// boundary by `taskito.notes.validate_and_encode_notes`; stored as the
/// already-encoded JSON string here.
#[serde(default)]
pub notes: Option<String>,
pub cancel_requested: bool,
pub expires_at: Option<i64>,
pub result_ttl_ms: Option<i64>,
@@ -106,6 +112,7 @@ impl From<JobRow> for Job {
unique_key: row.unique_key,
progress: row.progress,
metadata: row.metadata,
notes: row.notes,
cancel_requested: row.cancel_requested != 0,
expires_at: row.expires_at,
result_ttl_ms: row.result_ttl_ms,
@@ -125,6 +132,8 @@ pub struct NewJob {
pub timeout_ms: i64,
pub unique_key: Option<String>,
pub metadata: Option<String>,
/// Pre-encoded canonical JSON object (≤ 15 fields). See [`Job::notes`].
pub notes: Option<String>,
pub depends_on: Vec<String>,
pub expires_at: Option<i64>,
pub result_ttl_ms: Option<i64>,
@@ -153,6 +162,7 @@ impl NewJob {
unique_key: self.unique_key,
progress: None,
metadata: self.metadata,
notes: self.notes,
cancel_requested: false,
expires_at: self.expires_at,
result_ttl_ms: self.result_ttl_ms,
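For orientation, here is a minimal Rust sketch of the invariant described in the `notes` doc comment above: at most 15 top-level fields, encoded once into a stable JSON string. The real validation lives on the Python side in `taskito.notes.validate_and_encode_notes` and is not part of this diff; the exact canonical encoding (sorted keys, compact separators, as serde_json does by default) is an assumption.

```rust
use serde_json::{Map, Value};

/// Illustration only: the Rust core never validates notes itself; it stores
/// the string produced at the Python boundary verbatim in `Job::notes`.
fn encode_notes(notes: Map<String, Value>) -> Result<String, String> {
    if notes.len() > 15 {
        return Err(format!("notes has {} top-level fields (max 15)", notes.len()));
    }
    // serde_json's default Map is BTreeMap-backed, so keys serialize in sorted
    // order; treating that as the "canonical" form is an assumption here.
    serde_json::to_string(&Value::Object(notes)).map_err(|e| e.to_string())
}

fn main() {
    let mut m = Map::new();
    m.insert("customer_id".into(), Value::String("cus_abc".into()));
    m.insert("tier".into(), Value::String("gold".into()));
    assert_eq!(
        encode_notes(m).unwrap(),
        r#"{"customer_id":"cus_abc","tier":"gold"}"#
    );
}
```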
1 change: 1 addition & 0 deletions crates/taskito-core/src/scheduler/maintenance.rs
@@ -96,6 +96,7 @@ impl Scheduler {
timeout_ms: PERIODIC_DEFAULT_TIMEOUT_MS,
unique_key,
metadata: None,
notes: None,
depends_on: vec![],
expires_at: None,
result_ttl_ms: None,
1 change: 1 addition & 0 deletions crates/taskito-core/src/scheduler/mod.rs
@@ -332,6 +332,7 @@ mod tests {
timeout_ms: 300_000,
unique_key: None,
metadata: None,
notes: None,
depends_on: vec![],
expires_at: None,
result_ttl_ms: None,
3 changes: 3 additions & 0 deletions crates/taskito-core/src/storage/diesel_common/jobs.rs
@@ -103,6 +103,7 @@ macro_rules! impl_diesel_job_ops {
timeout_ms: job.timeout_ms,
unique_key: job.unique_key.as_deref(),
metadata: job.metadata.as_deref(),
notes: job.notes.as_deref(),
cancel_requested: 0,
expires_at: job.expires_at,
result_ttl_ms: job.result_ttl_ms,
@@ -160,6 +161,7 @@ macro_rules! impl_diesel_job_ops {
timeout_ms: job.timeout_ms,
unique_key: job.unique_key.as_deref(),
metadata: job.metadata.as_deref(),
notes: job.notes.as_deref(),
cancel_requested: 0,
expires_at: job.expires_at,
result_ttl_ms: job.result_ttl_ms,
@@ -214,6 +216,7 @@ macro_rules! impl_diesel_job_ops {
timeout_ms: job.timeout_ms,
unique_key: job.unique_key.as_deref(),
metadata: job.metadata.as_deref(),
notes: job.notes.as_deref(),
cancel_requested: 0,
expires_at: job.expires_at,
result_ttl_ms: job.result_ttl_ms,
7 changes: 7 additions & 0 deletions crates/taskito-core/src/storage/diesel_common/migrations.rs
@@ -88,6 +88,7 @@ pub fn create_tables(d: &Dialect) -> Vec<String> {
unique_key TEXT,
progress INTEGER,
metadata TEXT,
notes TEXT,
cancel_requested INTEGER NOT NULL DEFAULT 0,
expires_at {bi},
result_ttl_ms {bi}
@@ -104,6 +105,7 @@ pub fn create_tables(d: &Dialect) -> Vec<String> {
retry_count INTEGER NOT NULL,
failed_at {bi} NOT NULL,
metadata TEXT,
notes TEXT,
priority INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 3,
timeout_ms {bi} NOT NULL DEFAULT 300000,
@@ -234,6 +236,7 @@ pub fn create_tables(d: &Dialect) -> Vec<String> {
unique_key TEXT,
progress INTEGER,
metadata TEXT,
notes TEXT,
cancel_requested INTEGER NOT NULL DEFAULT 0,
expires_at {bi},
result_ttl_ms {bi}
@@ -338,5 +341,9 @@ pub fn alter_statements(d: &Dialect) -> Vec<String> {
// namespace backfill on dead_letter / archived_jobs (after circuit-breaker columns)
format!("ALTER TABLE dead_letter ADD COLUMN {ife}namespace TEXT"),
format!("ALTER TABLE archived_jobs ADD COLUMN {ife}namespace TEXT"),
// structured notes (≤15 fields, validated at the Python boundary)
format!("ALTER TABLE jobs ADD COLUMN {ife}notes TEXT"),
format!("ALTER TABLE dead_letter ADD COLUMN {ife}notes TEXT"),
format!("ALTER TABLE archived_jobs ADD COLUMN {ife}notes TEXT"),
]
}
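As a rough sketch of what the three new additive statements expand to: `{ife}` presumably renders as `IF NOT EXISTS ` on dialects that support it and as an empty string otherwise. That expansion lives in the `Dialect` handling, which is not shown in this diff, so treat it as an assumption.

```rust
// Hypothetical rendering of the additive `notes` migrations above; the real
// code goes through `alter_statements(d: &Dialect)`.
fn render_notes_alters(supports_if_not_exists: bool) -> Vec<String> {
    let ife = if supports_if_not_exists { "IF NOT EXISTS " } else { "" };
    vec![
        format!("ALTER TABLE jobs ADD COLUMN {ife}notes TEXT"),
        format!("ALTER TABLE dead_letter ADD COLUMN {ife}notes TEXT"),
        format!("ALTER TABLE archived_jobs ADD COLUMN {ife}notes TEXT"),
    ]
}
```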
2 changes: 2 additions & 0 deletions crates/taskito-core/src/storage/mod.rs
@@ -44,6 +44,7 @@ pub struct DeadJob {
pub retry_count: i32,
pub failed_at: i64,
pub metadata: Option<String>,
pub notes: Option<String>,
pub priority: i32,
pub max_retries: i32,
pub timeout_ms: i64,
@@ -63,6 +64,7 @@ impl From<models::DeadLetterRow> for DeadJob {
retry_count: row.retry_count,
failed_at: row.failed_at,
metadata: row.metadata,
notes: row.notes,
priority: row.priority,
max_retries: row.max_retries,
timeout_ms: row.timeout_ms,
5 changes: 5 additions & 0 deletions crates/taskito-core/src/storage/models.rs
@@ -29,6 +29,7 @@ pub struct JobRow {
pub unique_key: Option<String>,
pub progress: Option<i32>,
pub metadata: Option<String>,
pub notes: Option<String>,
pub cancel_requested: i32,
pub expires_at: Option<i64>,
pub result_ttl_ms: Option<i64>,
@@ -52,6 +53,7 @@ pub struct NewJobRow<'a> {
pub timeout_ms: i64,
pub unique_key: Option<&'a str>,
pub metadata: Option<&'a str>,
pub notes: Option<&'a str>,
pub cancel_requested: i32,
pub expires_at: Option<i64>,
pub result_ttl_ms: Option<i64>,
@@ -71,6 +73,7 @@ pub struct DeadLetterRow {
pub retry_count: i32,
pub failed_at: i64,
pub metadata: Option<String>,
pub notes: Option<String>,
pub priority: i32,
pub max_retries: i32,
pub timeout_ms: i64,
@@ -91,6 +94,7 @@ pub struct NewDeadLetterRow<'a> {
pub retry_count: i32,
pub failed_at: i64,
pub metadata: Option<&'a str>,
pub notes: Option<&'a str>,
pub priority: i32,
pub max_retries: i32,
pub timeout_ms: i64,
@@ -411,6 +415,7 @@ pub struct ArchivedJobRow {
pub unique_key: Option<String>,
pub progress: Option<i32>,
pub metadata: Option<String>,
pub notes: Option<String>,
pub cancel_requested: i32,
pub expires_at: Option<i64>,
pub result_ttl_ms: Option<i64>,
1 change: 1 addition & 0 deletions crates/taskito-core/src/storage/postgres/archival.rs
@@ -67,6 +67,7 @@ impl PostgresStorage {
unique_key: row.unique_key,
progress: row.progress,
metadata: row.metadata,
notes: row.notes,
cancel_requested: row.cancel_requested != 0,
expires_at: row.expires_at,
result_ttl_ms: row.result_ttl_ms,
3 changes: 3 additions & 0 deletions crates/taskito-core/src/storage/postgres/dead_letter.rs
@@ -26,6 +26,7 @@ impl PostgresStorage {
retry_count: job.retry_count,
failed_at: now,
metadata,
notes: job.notes.as_deref(),
priority: job.priority,
max_retries: job.max_retries,
timeout_ms: job.timeout_ms,
@@ -92,6 +93,7 @@ impl PostgresStorage {
timeout_ms: dead_row.timeout_ms,
unique_key: None,
metadata: dead_row.metadata,
notes: dead_row.notes,
depends_on: vec![],
expires_at: None,
result_ttl_ms: dead_row.result_ttl_ms,
@@ -115,6 +117,7 @@ impl PostgresStorage {
timeout_ms: job.timeout_ms,
unique_key: job.unique_key.as_deref(),
metadata: job.metadata.as_deref(),
notes: job.notes.as_deref(),
cancel_requested: 0,
expires_at: job.expires_at,
result_ttl_ms: job.result_ttl_ms,
5 changes: 5 additions & 0 deletions crates/taskito-core/src/storage/redis_backend/dead_letter.rs
@@ -17,6 +17,8 @@ struct DeadJobEntry {
pub retry_count: i32,
pub failed_at: i64,
pub metadata: Option<String>,
#[serde(default)]
pub notes: Option<String>,
pub priority: i32,
pub max_retries: i32,
pub timeout_ms: i64,
@@ -37,6 +39,7 @@ impl From<DeadJobEntry> for DeadJob {
retry_count: e.retry_count,
failed_at: e.failed_at,
metadata: e.metadata,
notes: e.notes,
priority: e.priority,
max_retries: e.max_retries,
timeout_ms: e.timeout_ms,
@@ -62,6 +65,7 @@ impl RedisStorage {
retry_count: job.retry_count,
failed_at: now,
metadata: metadata.map(|s| s.to_string()),
notes: job.notes.clone(),
priority: job.priority,
max_retries: job.max_retries,
timeout_ms: job.timeout_ms,
@@ -146,6 +150,7 @@ impl RedisStorage {
timeout_ms: entry.timeout_ms,
unique_key: None,
metadata: entry.metadata,
notes: entry.notes,
depends_on: vec![],
expires_at: None,
result_ttl_ms: entry.result_ttl_ms,
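A quick check of the backward-compatibility concern behind the `#[serde(default)]` on `notes`: Redis dead-letter entries persisted before this change carry no `notes` key at all and must still deserialize, with `notes` coming back as `None`. A self-contained sketch using a toy struct (not the real `DeadJobEntry`), assuming serde/serde_json as used elsewhere in the crate:

```rust
use serde::Deserialize;

// Toy stand-in for `DeadJobEntry`, with just enough fields to show the point.
#[derive(Deserialize)]
struct EntryLike {
    original_job_id: String,
    #[serde(default)]
    notes: Option<String>,
}

fn main() {
    // Entry persisted before the upgrade: no `notes` key.
    let old: EntryLike = serde_json::from_str(r#"{"original_job_id":"job-1"}"#).unwrap();
    assert_eq!(old.original_job_id, "job-1");
    assert_eq!(old.notes, None);

    // Entry persisted after the upgrade carries the pre-encoded JSON string.
    let newer: EntryLike = serde_json::from_str(
        r#"{"original_job_id":"job-2","notes":"{\"customer_id\":\"cus_xyz\"}"}"#,
    )
    .unwrap();
    assert_eq!(newer.notes.as_deref(), Some(r#"{"customer_id":"cus_xyz"}"#));
}
```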
3 changes: 3 additions & 0 deletions crates/taskito-core/src/storage/schema.rs
@@ -18,6 +18,7 @@ diesel::table! {
unique_key -> Nullable<Text>,
progress -> Nullable<Integer>,
metadata -> Nullable<Text>,
notes -> Nullable<Text>,
cancel_requested -> Integer,
expires_at -> Nullable<BigInt>,
result_ttl_ms -> Nullable<BigInt>,
@@ -36,6 +37,7 @@ diesel::table! {
retry_count -> Integer,
failed_at -> BigInt,
metadata -> Nullable<Text>,
notes -> Nullable<Text>,
priority -> Integer,
max_retries -> Integer,
timeout_ms -> BigInt,
@@ -188,6 +190,7 @@ diesel::table! {
unique_key -> Nullable<Text>,
progress -> Nullable<Integer>,
metadata -> Nullable<Text>,
notes -> Nullable<Text>,
cancel_requested -> Integer,
expires_at -> Nullable<BigInt>,
result_ttl_ms -> Nullable<BigInt>,
1 change: 1 addition & 0 deletions crates/taskito-core/src/storage/sqlite/archival.rs
@@ -70,6 +70,7 @@ impl SqliteStorage {
unique_key: row.unique_key,
progress: row.progress,
metadata: row.metadata,
notes: row.notes,
cancel_requested: row.cancel_requested != 0,
expires_at: row.expires_at,
result_ttl_ms: row.result_ttl_ms,
3 changes: 3 additions & 0 deletions crates/taskito-core/src/storage/sqlite/dead_letter.rs
@@ -26,6 +26,7 @@ impl SqliteStorage {
retry_count: job.retry_count,
failed_at: now,
metadata,
notes: job.notes.as_deref(),
priority: job.priority,
max_retries: job.max_retries,
timeout_ms: job.timeout_ms,
@@ -96,6 +97,7 @@ impl SqliteStorage {
timeout_ms: dead_row.timeout_ms,
unique_key: None,
metadata: dead_row.metadata,
notes: dead_row.notes,
depends_on: vec![],
expires_at: None,
result_ttl_ms: dead_row.result_ttl_ms,
@@ -119,6 +121,7 @@ impl SqliteStorage {
timeout_ms: job.timeout_ms,
unique_key: job.unique_key.as_deref(),
metadata: job.metadata.as_deref(),
notes: job.notes.as_deref(),
cancel_requested: 0,
expires_at: job.expires_at,
result_ttl_ms: job.result_ttl_ms,
46 changes: 46 additions & 0 deletions crates/taskito-core/src/storage/sqlite/tests.rs
@@ -16,6 +16,7 @@ fn make_job(task_name: &str) -> NewJob {
timeout_ms: 300_000,
unique_key: None,
metadata: None,
notes: None,
depends_on: vec![],
expires_at: None,
result_ttl_ms: None,
@@ -33,6 +34,51 @@ fn test_enqueue_and_get() {
assert_eq!(fetched.status, JobStatus::Pending);
}

#[test]
fn test_notes_round_trip() {
let storage = test_storage();
let mut new_job = make_job("notes_task");
new_job.notes = Some(r#"{"customer_id":"cus_abc","tier":"gold"}"#.to_string());

let job = storage.enqueue(new_job).unwrap();
let fetched = storage.get_job(&job.id).unwrap().unwrap();
assert_eq!(
fetched.notes.as_deref(),
Some(r#"{"customer_id":"cus_abc","tier":"gold"}"#)
);

// Absence round-trips as None.
let plain = storage.enqueue(make_job("plain_task")).unwrap();
let plain_fetched = storage.get_job(&plain.id).unwrap().unwrap();
assert!(plain_fetched.notes.is_none());
}

#[test]
fn test_notes_survive_dlq_round_trip() {
let storage = test_storage();
let mut new_job = make_job("dlq_notes_task");
new_job.notes = Some(r#"{"customer_id":"cus_xyz"}"#.to_string());
let job = storage.enqueue(new_job).unwrap();

storage
.move_to_dlq(&job, "boom", None)
.expect("move_to_dlq");

let dead = storage.list_dead(10, 0).unwrap();
let entry = dead
.iter()
.find(|d| d.original_job_id == job.id)
.expect("dead entry");
assert_eq!(entry.notes.as_deref(), Some(r#"{"customer_id":"cus_xyz"}"#));

let new_id = storage.retry_dead(&entry.id).expect("retry_dead");
let retried = storage.get_job(&new_id).unwrap().unwrap();
assert_eq!(
retried.notes.as_deref(),
Some(r#"{"customer_id":"cus_xyz"}"#)
);
}

#[test]
fn test_dequeue() {
let storage = test_storage();
1 change: 1 addition & 0 deletions crates/taskito-core/tests/rust/storage_tests.rs
@@ -22,6 +22,7 @@ fn make_job(queue: &str, task_name: &str) -> NewJob {
timeout_ms: 300_000,
unique_key: None,
metadata: None,
notes: None,
depends_on: vec![],
expires_at: None,
result_ttl_ms: None,
5 changes: 5 additions & 0 deletions crates/taskito-python/src/py_job.rs
@@ -36,6 +36,10 @@ pub struct PyJob {
pub progress: Option<i32>,
#[pyo3(get)]
pub metadata: Option<String>,
/// Pre-encoded canonical JSON for structured notes (≤ 15 fields).
/// Python decodes this with `json.loads` when exposing it as `JobResult.notes`.
#[pyo3(get)]
pub notes: Option<String>,
#[pyo3(get)]
pub namespace: Option<String>,

@@ -86,6 +90,7 @@ impl From<Job> for PyJob {
unique_key: job.unique_key,
progress: job.progress,
metadata: job.metadata,
notes: job.notes,
namespace: job.namespace,
status_val: job.status as i32,
result_bytes: job.result,
1 change: 1 addition & 0 deletions crates/taskito-python/src/py_queue/inspection.rs
@@ -200,6 +200,7 @@ impl PyQueue {
timeout_ms: original.timeout_ms,
unique_key: None,
metadata: Some(format!("{{\"replayed_from\":\"{job_id}\"}}")),
notes: original.notes,
depends_on: vec![],
expires_at: None,
result_ttl_ms: original.result_ttl_ms,