Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com/en/
- README section: **Action Executors → Webhook**.
- `wiremock` dev-dependency for mock-server-backed executor tests.
- `src/lib.rs` so the binary's modules are also reachable from `examples/` and downstream crates.
- Second action executor: `cron` (`src/executors/cron.rs`). A `cron` action wraps another `Action` with a 5-, 6-, or 7-field cron expression. Includes a pure `parse` / `next_fire_after` API and an in-process `Scheduler` with a `tick(now)` shape so unit tests can drive it with a mock clock and the real loop can drive it with `Utc::now()`.
- `examples/cron_executor.rs` — end-to-end example: schedules a webhook to fire on the next per-second tick and exits within ~1 second.
- README section: **Action Executors → Cron**.
- `cron` and `chrono` dependencies (chrono with `clock` + `serde` features).

### Changed
- `Agent.actions` storage is unchanged on disk (`Vec<String>`), but each entry is now interpreted as a typed `Action` enum at execution time. Strings that don't parse fall through as `Unrecognized` rather than blocking the pipeline.
- Integrations table: webhook executor flipped from "experimental" to "shipped"; cron + LLM executors split out as planned with the same discriminator.
- `ExecutionOutcome` gains a `Cron(Result<CronScheduled, CronError>)` variant. `process_actions` reports the next computed fire time for cron entries without firing them; firing is handled by the scheduler.
- Integrations table: webhook + cron executors are now shipped; only LLM remains in the planned column.

## [v0.0.6] - 2026-05-03

Expand Down
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ axum = "0.7.2"
tower = "0.4.13"
http-body-util = "0.1.0"
reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
cron = "0.12"
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }

[dev-dependencies]
wiremock = "0.5"
69 changes: 68 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ See [`examples/`](./examples) for ready-to-POST agent definitions.
| [`examples/web-summarize.json`](./examples/web-summarize.json) | Illustrative URL-fetch + summarize agent shape. Action wiring is intentionally a no-op until those executors land — see the integration table below. |
| [`examples/webhook.json`](./examples/webhook.json) | Agent with a single `webhook` action; pair with `pypes agent webhook-demo run` to fire it. |
| [`examples/webhook_executor.rs`](./examples/webhook_executor.rs) | End-to-end Rust example: spins up a local mock receiver, dispatches a webhook action, and prints the captured request. Run with `cargo run --example webhook_executor`. |
| [`examples/cron_executor.rs`](./examples/cron_executor.rs) | End-to-end Rust example: schedules a webhook to fire on the next per-second tick via the in-process scheduler, then exits. Run with `cargo run --example cron_executor`. |

<br>

Expand All @@ -111,7 +112,8 @@ Honest status as of **May 2026**. Anything marked planned has a target release;
| Qdrant vector-db | :heavy_check_mark: Shipped | `pypes add vector-db` pulls `qdrant/qdrant` via the local Docker socket. |
| Daemonized server | :heavy_check_mark: Shipped | `pypes start` forks; `pypes start --attatch` runs in the foreground. |
| Action executors → webhook (HTTP POST) | :heavy_check_mark: Shipped | First concrete executor. Each `Agent.actions` entry is a JSON spec like `{"type":"webhook", ...}`; see [Action Executors → Webhook](#action-executors--webhook). |
| Action executors → cron / LLM | :calendar: Planned | Cron is the next executor; LLM follows. Same `{"type": "..."}` discriminator as webhook. |
| Action executors → cron (scheduled) | :heavy_check_mark: Shipped | Wraps another action with a 5-, 6-, or 7-field cron expression and fires it on schedule via an in-process scheduler. See [Action Executors → Cron](#action-executors--cron). |
| Action executors → LLM | :calendar: Planned | Same `{"type": "..."}` discriminator as webhook/cron. |
| Gmail | :calendar: Planned for v0.1.0 | Not implemented. Earlier README claimed "in progress" — that was stale; no module exists in `src/`. |
| SMS (Twilio) | :calendar: Planned for v0.1.0 | Not implemented. |
| Vision / image inputs | :calendar: Planned for v0.2.0 | Not implemented. JSON-only inputs today. |
Expand Down Expand Up @@ -173,6 +175,71 @@ to assert on the exact request the executor sends — see

<br>

## Action Executors → Cron

Cron lets agents act on a schedule — required for any "check inbox every 10
minutes" or "run nightly summary" pattern. A `cron` action wraps another action
with a cron expression; an in-process scheduler fires the wrapped action on
each tick.

```jsonc
// One entry inside Agent.actions
{
"type": "cron",
"expression": "*/5 * * * *",
"action": {
"type": "webhook",
"url": "https://example.com/tick",
"payload": { "tick": true }
}
}
```

The `expression` field accepts:

- **5-field standard cron** — `min hour day-of-month month day-of-week` (e.g.
`*/5 * * * *` for every five minutes). `sec` is padded to `0` and `year` to `*`.
- **6-field cron with seconds** — `sec min hour dom mon dow` (e.g.
`* * * * * *` for every second).
- **7-field cron with year** — passed through to the underlying parser.

`pypes agent <NAME> run` reports the next computed fire time for each cron
action without firing it; the actual firing happens via the scheduler:

```bash
pypes agent scheduled-pinger run
# [0] cron `*/5 * * * *` → next fire 2026-05-03 12:05:00 UTC
```

Out of scope for v1: distributed scheduling, persistent missed-fire catchup
across restarts. The scheduler is single-process and fires only while it's
running.

### Worked example

```bash
cargo run --example cron_executor
```

The example boots a tiny in-process axum receiver on an ephemeral port, builds
a `CronAction` whose target is a webhook pointed at the receiver, advances the
[`Scheduler`](./src/executors/cron.rs) by one real tick, and prints both the
fired outcome and the body the receiver got:

```
→ stored action: {"type":"cron","expression":"* * * * * *","action":{"type":"webhook","url":"http://127.0.0.1:51636/hook","headers":{},"payload":{"event":"cron.tick","n":1}}}
⏲ next fire scheduled at 2026-05-03 09:10:56 UTC (in 253 ms)
← cron[0] fired webhook → status=200 body={"ok":true}
✓ mock receiver got 1 request(s):
{"event":"cron.tick","n":1}
```

The example terminates within ~1 second because the cron expression fires on
the next per-second boundary. Tests drive the scheduler with a fixed mock
clock and a tight real-time clock — see [`src/executors/cron.rs`](./src/executors/cron.rs).

<br>

## CLI reference

```
Expand Down
93 changes: 93 additions & 0 deletions examples/cron_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! End-to-end demo of the cron action executor.
//!
//! Run with: `cargo run --example cron_executor`
//!
//! Steps:
//! 1. Spin up a local axum mock receiver on an ephemeral port.
//! 2. Build a `CronAction` whose target is a `WebhookAction` pointed at the
//! mock. The expression `* * * * * *` fires on every second so the example
//! terminates well under two seconds.
//! 3. Drive the in-process [`Scheduler`] for a single tick: wait until the next
//! fire, dispatch the wrapped webhook, and print the captured payload.
//!
//! No external services are required.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use axum::{extract::State, routing::post, Json, Router};
use chrono::Utc;
use pypes::executors::cron::{CronAction, Scheduler};
use pypes::executors::webhook::{execute_webhook, WebhookAction};
use pypes::executors::Action;
use serde_json::{json, Value};

type Captured = Arc<Mutex<Vec<Value>>>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let captured: Captured = Arc::new(Mutex::new(Vec::new()));

let app = Router::new()
.route("/hook", post(receive))
.with_state(captured.clone());

let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});

let webhook = WebhookAction {
url: format!("http://{}/hook", addr),
headers: HashMap::new(),
payload: Some(json!({ "event": "cron.tick", "n": 1 })),
};

let cron_action = CronAction {
// 6-field cron: every second. The example fires once and exits.
expression: "* * * * * *".to_string(),
action: Box::new(Action::Webhook(webhook.clone())),
};

// Round-trip the spec through JSON to mirror how it would be stored on an
// `Agent.actions` entry.
let stored = serde_json::to_string(&Action::Cron(cron_action.clone()))?;
println!("→ stored action: {stored}");

let started = Utc::now();
let scheduler = Scheduler::from_actions(&[cron_action], started)?;
let next = scheduler.entries()[0].next_fire;
println!("⏲ next fire scheduled at {next} (in {} ms)", (next - started).num_milliseconds());

let due = scheduler.next_due().await.unwrap_or_default();
let client = reqwest::Client::new();
for idx in due {
let entry = &scheduler.entries()[idx];
match entry.action.action.as_ref() {
Action::Webhook(spec) => match execute_webhook(&client, spec).await {
Ok(res) => println!(
"← cron[{idx}] fired webhook → status={} body={}",
res.status, res.body
),
Err(e) => println!("← cron[{idx}] webhook error: {e}"),
},
Action::Cron(_) => {
println!("← cron[{idx}] nested cron actions are not fired by this demo");
}
}
}

let received = captured.lock().unwrap();
println!("✓ mock receiver got {} request(s):", received.len());
for body in received.iter() {
println!(" {body}");
}

Ok(())
}

async fn receive(State(captured): State<Captured>, Json(body): Json<Value>) -> &'static str {
captured.lock().unwrap().push(body);
"{\"ok\":true}"
}
9 changes: 9 additions & 0 deletions examples/webhook_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
ExecutionOutcome::Webhook(Err(e)) => {
println!("← webhook[{idx}] error: {e}");
}
ExecutionOutcome::Cron(Ok(s)) => {
println!(
"← cron[{idx}] scheduled `{}` next at {}",
s.expression, s.next_fire
);
}
ExecutionOutcome::Cron(Err(e)) => {
println!("← cron[{idx}] error: {e}");
}
ExecutionOutcome::Unrecognized { raw, error } => {
println!("← unrecognized[{idx}] {error}: {raw}");
}
Expand Down
Loading
Loading