-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebhook_executor.rs
More file actions
89 lines (76 loc) · 2.79 KB
/
webhook_executor.rs
File metadata and controls
89 lines (76 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
//! End-to-end demo of the webhook action executor.
//!
//! Run with: `cargo run --example webhook_executor`
//!
//! Steps:
//! 1. Spin up a local axum mock receiver on an ephemeral port.
//! 2. Build an in-memory `Agent` whose `actions` contains a JSON-encoded
//! webhook spec pointed at the mock.
//! 3. Process the agent's actions and print the executor outcome plus the
//! payload the receiver actually got.
//!
//! No external services are required.
use std::sync::{Arc, Mutex};
use axum::{extract::State, routing::post, Json, Router};
use pypes::agent::agent::Agent;
use pypes::executors::{process_actions, ExecutionOutcome};
use serde_json::{json, Value};
/// Shared, mutex-guarded list of JSON request bodies.
///
/// The `receive` handler pushes each incoming body onto it, and `main`
/// locks it afterwards to print what the mock receiver collected.
type Captured = Arc<Mutex<Vec<Value>>>;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let captured: Captured = Arc::new(Mutex::new(Vec::new()));
let app = Router::new()
.route("/hook", post(receive))
.with_state(captured.clone());
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});
let webhook_spec = json!({
"type": "webhook",
"url": format!("http://{}/hook", addr),
"headers": { "Authorization": "Bearer demo-token" },
"payload": { "event": "agent.acted", "n": 1 }
})
.to_string();
let agent = Agent {
name: "demo".to_string(),
inputs: vec![],
actions: vec![webhook_spec],
};
println!("→ stored action: {}", agent.actions[0]);
let outcomes = process_actions(&agent).await;
for (idx, outcome) in outcomes.iter().enumerate() {
match outcome {
ExecutionOutcome::Webhook(Ok(res)) => {
println!("← webhook[{idx}] status={} body={}", res.status, res.body);
}
ExecutionOutcome::Webhook(Err(e)) => {
println!("← webhook[{idx}] error: {e}");
}
ExecutionOutcome::Cron(Ok(s)) => {
println!(
"← cron[{idx}] scheduled `{}` next at {}",
s.expression, s.next_fire
);
}
ExecutionOutcome::Cron(Err(e)) => {
println!("← cron[{idx}] error: {e}");
}
ExecutionOutcome::Unrecognized { raw, error } => {
println!("← unrecognized[{idx}] {error}: {raw}");
}
}
}
let received = captured.lock().unwrap();
println!("✓ mock receiver got {} request(s):", received.len());
for body in received.iter() {
println!(" {body}");
}
Ok(())
}
/// Mock webhook endpoint: appends the request's JSON body to the shared
/// capture list and answers with a fixed `{"ok":true}` string.
///
/// # Panics
///
/// Panics if the capture mutex is poisoned.
async fn receive(State(sink): State<Captured>, Json(payload): Json<Value>) -> &'static str {
    // No `.await` happens while the guard is alive; it is released on return.
    let mut recorded = sink.lock().unwrap();
    recorded.push(payload);
    "{\"ok\":true}"
}