Merged
21 changes: 21 additions & 0 deletions AGENTS.md
@@ -779,3 +779,24 @@ Remember: The goal is to make Conductor easy to use in every language while main

---

## 🧪 Post-Session Testing Checklist

**After every coding session, run the full test suite to ensure zero failures:**

```bash
# All suites — expect 0 failures, 0 errors
python3 -m pytest tests/unit tests/backwardcompatibility tests/serdesertest tests/chaos tests/integration -v

# Expected results:
# Unit tests: ~626 passed
# Backward compatibility: ~1015 passed
# Serialization: ~58 passed
# Chaos: 2 skipped (require special setup)
# Integration: 128 skipped (require live Conductor server)
# TOTAL: 0 failures, 0 errors
```

Integration tests skip gracefully when the Conductor server is not available (no `CONDUCTOR_SERVER_URL` / `CONDUCTOR_AUTH_KEY` / `CONDUCTOR_AUTH_SECRET` env vars). When a server is available, they run against it. **There should be NO failures in any suite.**

---

338 changes: 168 additions & 170 deletions README.md

Large diffs are not rendered by default.

208 changes: 203 additions & 5 deletions docs/design/WORKER_SDK_IMPLEMENTATION_GUIDE.md
@@ -203,6 +203,10 @@ Initialize → Register Task Def → Start Polling → Execute Tasks → Update
- Manages lifecycle (start, stop, restart)
- Provides configuration to workers
- Coordinates metrics collection
- Monitors and auto-restarts crashed worker processes (see Section 4.4)
- Provides health check APIs for container orchestrators
- Supports `import_modules` to force-import modules before scanning for decorated workers
- Implements context manager protocol (`with TaskHandler(...) as th:`) for clean lifecycle

2. **TaskRunner (Execution Engine)**
- Runs in worker process
@@ -241,11 +245,20 @@ class TaskHandler {
    MetricsSettings metricsSettings
    List<EventListener> eventListeners

    // Supervision settings
    Bool monitorProcesses            // default: true
    Bool restartOnFailure            // default: true
    Int restartMaxAttempts           // default: 0 (unlimited)
    Float restartBackoffSeconds      // default: 5.0
    Float restartBackoffMaxSeconds   // default: 300.0

    // Methods
    discover_workers() → List<Worker>
    start_processes()
    stop_processes()
    join_processes()
    is_healthy() → Bool
    get_worker_process_status() → Map<String, ProcessStatus>
}

// Worker metadata
@@ -263,6 +276,7 @@ class Worker {
    Bool overwriteTaskDef
    Bool strictSchema
    Bool paused
    Bool leaseExtendEnabled // Auto-extend task lease for long-running tasks
}

// Execution engine (one per worker process)
@@ -340,6 +354,103 @@ FUNCTION detect_worker_type(worker_function):
- Rust: Check for `async fn` keyword
- JavaScript/TypeScript: Check for `async function`

### 4.4 Worker Process Supervision

**Key Principle:** Worker processes must be monitored and auto-restarted on failure for production reliability.

**Architecture:**
- TaskHandler spawns a background monitor thread after `start_processes()`
- Monitor thread periodically checks if each worker process is alive
- Dead processes are restarted with exponential backoff to prevent crash loops

```
FUNCTION start_monitor():
    IF not monitor_processes:
        RETURN

    // Spawn background thread
    monitor_thread = Thread(target=monitor_loop, daemon=True)
    monitor_thread.start()

FUNCTION monitor_loop():
    WHILE not shutdown_requested:
        check_and_restart_processes()
        sleep(monitor_interval_seconds) // default: 5s

FUNCTION check_and_restart_processes():
    LOCK process_lock:
        FOR i, process IN enumerate(worker_processes):
            IF process.is_alive():
                CONTINUE

            exitcode = process.exitcode
            worker_name = workers[i].task_definition_name

            log_warning("Worker process exited (worker={worker_name}, exitcode={exitcode})")

            IF not restart_on_failure:
                CONTINUE

            restart_worker_process(i)

FUNCTION restart_worker_process(index: Int):
    worker_name = workers[index].task_definition_name

    // Enforce max attempts (0 = unlimited)
    IF restart_max_attempts > 0 AND restart_counts[index] >= restart_max_attempts:
        log_error("Max restart attempts reached for worker {worker_name}")
        RETURN

    // Exponential backoff per-worker to prevent tight crash loops
    now = current_time()
    IF now < next_restart_at[index]:
        RETURN // Still in backoff period

    backoff = min(
        restart_backoff_seconds * (2 ^ restart_counts[index]),
        restart_backoff_max_seconds
    )
    next_restart_at[index] = now + backoff

    // Reap old process (avoid zombie accumulation)
    old_process = worker_processes[index]
    old_process.join(timeout=0)
    old_process.close()

    // Spawn new process
    new_process = build_process_for_worker(workers[index])
    worker_processes[index] = new_process
    new_process.start()
    restart_counts[index] += 1

    // Metrics
    increment_metric("worker_restart_total", {task_type: worker_name})

    log_info("Restarted worker (worker={worker_name}, attempt={restart_counts[index]}, backoff={backoff}s)")
```
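
The restart policy above boils down to two pure decisions: how long to back off, and whether another attempt is allowed. A Python sketch of just that arithmetic (illustrative only; property names mirror the pseudocode, not the SDK's actual classes):

```python
from dataclasses import dataclass


@dataclass
class RestartPolicy:
    restart_on_failure: bool = True
    restart_max_attempts: int = 0          # 0 = unlimited
    restart_backoff_seconds: float = 5.0
    restart_backoff_max_seconds: float = 300.0

    def backoff_for(self, restart_count: int) -> float:
        # Exponential backoff, capped at the configured maximum.
        return min(
            self.restart_backoff_seconds * (2 ** restart_count),
            self.restart_backoff_max_seconds,
        )

    def may_restart(self, restart_count: int) -> bool:
        if not self.restart_on_failure:
            return False
        # 0 means unlimited attempts.
        return self.restart_max_attempts == 0 or restart_count < self.restart_max_attempts
```

With the defaults, a worker that keeps crashing is retried after 5s, 10s, 20s, and so on, never waiting more than 300s between attempts.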

**Configuration:**

| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `monitor_processes` | Bool | true | Enable process supervision |
| `restart_on_failure` | Bool | true | Auto-restart crashed workers |
| `restart_max_attempts` | Int | 0 | Max restarts per worker (0 = unlimited) |
| `restart_backoff_seconds` | Float | 5.0 | Initial backoff before restart |
| `restart_backoff_max_seconds` | Float | 300.0 | Maximum backoff cap |
| `monitor_interval_seconds` | Float | 5.0 | Health check interval |

**Health Check API:**

```
FUNCTION is_healthy() → Bool:
    FOR process IN worker_processes:
        IF not process.is_alive():
            RETURN false
    RETURN true

FUNCTION get_worker_process_status() → Map<String, ProcessStatus>:
    // Returns per-worker status: alive, pid, exitcode, restart_count
    // Useful for /healthcheck endpoints in web frameworks
```
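
The health-check shape can be sketched in Python. The `ProcessStatus` fields follow the comment above; modeling processes as standalone function arguments (rather than TaskHandler state) is a simplification for illustration:

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class ProcessStatus:
    alive: bool
    pid: Optional[int]
    exitcode: Optional[int]
    restart_count: int


def is_healthy(processes: List) -> bool:
    # Healthy only if every worker process is currently alive.
    return all(p.is_alive() for p in processes)


def get_worker_process_status(
    names: List[str], processes: List, restart_counts: List[int]
) -> Dict[str, ProcessStatus]:
    # One entry per worker, keyed by task definition name; suitable for
    # serializing into a /healthcheck response body.
    return {
        name: ProcessStatus(p.is_alive(), p.pid, p.exitcode, count)
        for name, p, count in zip(names, processes, restart_counts)
    }
```

The process objects only need `is_alive()`, `pid`, and `exitcode`, which matches the `multiprocessing.Process` interface.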

---

## 5. Polling & Execution Loop
@@ -415,7 +526,7 @@ FUNCTION batch_poll(count: Int) → List<Task>:
params = {
"workerid": worker_id,
"count": count,
"timeout": 100 // ms, server-side long poll
"timeout": worker.poll_timeout // ms, server-side long poll (default: 100)
}

// Only include domain if not null/empty
@@ -624,12 +735,66 @@ FUNCTION update_task(task_result: TaskResult):

**Why This Matters:** Task was executed successfully, but Conductor doesn't know. External systems must handle recovery.

### 5.5b v2 Update Endpoint & Task Chaining (Optimization)

**Key Principle:** The v2 update endpoint returns the next task to process, eliminating a round-trip poll.

Instead of the pattern: execute → update → poll → execute → update → poll, the v2 endpoint enables: execute → update+poll → execute → update+poll.

```
FUNCTION update_task_v2(task_result: TaskResult) → Task | null:
    // Same retry logic as update_task (Section 5.5)
    // BUT: response is a Task object (the next task to process) or null

    FOR attempt IN [0, 1, 2, 3]:
        IF attempt > 0:
            sleep(attempt * 10 seconds)

        TRY:
            next_task = http_client.update_task_v2(task_result)
            RETURN next_task // May be null if no pending tasks

        CATCH Exception:
            // Same retry logic as v1

    RETURN null
```

**Execute-Update Loop:**

```
FUNCTION execute_and_update_task(task: Task):
    // Tight loop: execute → update_v2 (get next) → execute → ...
    WHILE task is not null AND not shutdown:
        result = execute_task(task)

        // TaskInProgress or async: stop chaining
        IF result is null OR result is TaskInProgress:
            RETURN

        // Update AND get next task in one call
        task = update_task_v2(result)
```

**Benefits:**
- ~50% fewer HTTP round-trips under load (update + poll combined)
- Lower latency between consecutive tasks
- Backward compatible: falls back to normal polling when v2 returns null

**HTTP Endpoint:**
```
POST /api/tasks/update-v2
Body: TaskResult (JSON)
Response: Task | null (next task to process for same task type)
```
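
A minimal Python sketch of the v2 call. The endpoint path comes from this section; the HTTP function is injected so the core logic stands alone (in practice it would be something like `requests.post`, and the retry loop from Section 5.5 would wrap this):

```python
from typing import Any, Callable, Optional


def update_task_v2(
    base_url: str,
    task_result: dict,
    post: Callable[..., Any],  # e.g. requests.post; injected for testability
) -> Optional[dict]:
    # POST the result; a 200 response body is the next pending task for
    # the same task type, or empty when there is nothing to chain.
    resp = post(f"{base_url}/api/tasks/update-v2", json=task_result)
    if resp.status_code != 200 or not resp.text:
        return None
    return resp.json()
```

A `None` return is the signal to fall back to the normal polling loop, which is what makes the optimization backward compatible.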

### 5.6 Capacity Management

**Key Principle:** Capacity represents end-to-end task handling (execute + update)

**Async Workers (Explicit Semaphore):**
```
// Semaphore/capacity held during BOTH execute and update
// Semaphore held during BOTH execute and update
FUNCTION execute_and_update_task(task: Task):
    ACQUIRE semaphore: // Blocks if at capacity
        result = execute_task(task)
@@ -641,7 +806,23 @@ FUNCTION execute_and_update_task(task: Task):
// Only then can new task be polled
```

**Why:** Ensures we don't poll more tasks than we can fully handle (execute AND update).
**Sync Workers (Implicit via Thread Pool):**
```
// Thread pool naturally provides capacity management.
// Each thread runs execute_and_update_task — the thread stays
// occupied during BOTH execute and update, so the pool size
// (= thread_count) limits concurrency without an explicit semaphore.
FUNCTION execute_and_update_task(task: Task):
    // Runs inside ThreadPoolExecutor(max_workers=thread_count)
    result = execute_task(task)

    IF result is not TaskInProgress:
        update_task(result)

    // Thread returns to pool — capacity slot freed
```

**Why:** Ensures we don't poll more tasks than we can fully handle (execute AND update). Both approaches achieve the same goal — async uses explicit semaphore, sync uses thread pool sizing.
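
The async rule (hold the semaphore across both execute and update) can be illustrated with `asyncio.Semaphore`. This sketch measures peak concurrency to show the bound; the `sleep(0)` calls stand in for real execute and update work:

```python
import asyncio


async def run_with_capacity(tasks, capacity: int) -> int:
    # Semaphore held across the whole execute+update span, mirroring
    # the async pseudocode above. Returns peak observed concurrency.
    sem = asyncio.Semaphore(capacity)
    peak = 0
    active = 0

    async def execute_and_update(task):
        nonlocal peak, active
        async with sem:          # blocks when at capacity
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0)   # "execute"
            await asyncio.sleep(0)   # "update"
            active -= 1

    await asyncio.gather(*(execute_and_update(t) for t in tasks))
    return peak
```

However many tasks are submitted, no more than `capacity` are in flight at once, which is exactly what the poll loop relies on.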

---

Expand All @@ -667,6 +848,7 @@ FUNCTION execute_and_update_task(task: Task):
| `overwrite_task_def` | Bool | true | Overwrite existing task definitions |
| `strict_schema` | Bool | false | Enforce strict JSON schema validation |
| `paused` | Bool | false | Pause worker (stop polling) |
| `lease_extend_enabled` | Bool | false | Auto-extend task lease for long-running tasks (alternative to TaskInProgress) |

### 6.3 Environment Variable Format

@@ -1144,6 +1326,8 @@ FUNCTION reset_auth_failures():
    auth_failures = 0
```

**When to Reset:** Auth failures should be reset when a poll succeeds (200 response), regardless of whether tasks were returned. A successful HTTP response means authentication is working.
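
A sketch of that reset rule: any 2xx poll response clears the counter, even when no tasks came back. The failure threshold is an illustrative assumption, not a value specified by the SDK:

```python
class AuthFailureTracker:
    """Counts consecutive auth failures; any successful poll resets the
    count, regardless of whether the poll returned tasks."""

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures = 0

    def record_response(self, status_code: int) -> None:
        if status_code in (401, 403):
            self.failures += 1
        elif 200 <= status_code < 300:
            self.failures = 0  # auth is working, even on an empty poll

    def should_back_off(self) -> bool:
        return self.failures >= self.threshold
```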

### 9.3 Adaptive Backoff for Empty Polls

```
@@ -1440,14 +1624,23 @@ Query Params:
Response: List<Task>
```

**Update Task:**
**Update Task (v1):**
```
POST /api/tasks
Body: TaskResult (JSON)

Response: string (task status)
```

**Update Task (v2) — Recommended:**
```
POST /api/tasks/update-v2
Body: TaskResult (JSON)

Response: Task | null (next task to process for same task type)
```
The v2 endpoint combines update + poll: it updates the current task result and returns the next pending task (if any) for the same task type. This enables the execute-update loop optimization described in Section 5.5b.

**Register Task Definition:**
```
POST /api/metadata/taskdefs
@@ -1782,7 +1975,12 @@ FUNCTION validate_and_process_order(order_id: String) → Result:

### 16.2 Long-Running Tasks (TaskInProgress)

**Pattern:** Return TaskInProgress to extend lease
**Two approaches for long-running tasks:**

1. **TaskInProgress (manual):** Worker returns `TaskInProgress` to re-queue itself with a callback delay. Best for tasks that need incremental progress tracking.
2. **Lease Extension (automatic):** Set `lease_extend_enabled=true` on the worker — the SDK automatically extends the task lease periodically. Best for tasks that run continuously without needing poll-based progress.

**Pattern 1: TaskInProgress — Return to re-queue**

```
class TaskInProgress {
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -153,6 +153,9 @@ line-ending = "auto"
"tests/**/*.py" = ["B", "C4", "SIM"]
"examples/**/*.py" = ["B", "C4", "SIM"]

[tool.pytest.ini_options]
pythonpath = ["src"]

[tool.coverage.run]
source = ["src/conductor"]
omit = [
5 changes: 5 additions & 0 deletions src/conductor/__init__.py
@@ -1 +1,6 @@
from __future__ import annotations

from pkgutil import extend_path

__path__ = extend_path(__path__, __name__)
__version__ = "1.1.10"