This document provides a complete reference for the ColonyOS Rust SDK.
- Core Types
- Constants
- Cryptography
- Colony Management
- Executor Management
- Process Management
- Workflow Management
- Logging
- Channels
- Subscriptions
- Statistics
- Function Registry
- Blueprint Management
- Error Handling
pub struct Colony {
pub colonyid: String,
pub name: String,
}
impl Colony {
pub fn new(colonyid: &str, name: &str) -> Colony;
}

pub struct Executor {
pub executorid: String,
pub executortype: String,
pub executorname: String,
pub colonyname: String,
pub state: i32,
pub requirefuncreg: bool,
pub commissiontime: String,
pub lastheardfromtime: String,
pub locationname: String,
pub capabilities: Capabilities,
pub allocations: Allocations,
}
impl Executor {
pub fn new(name: &str, executorid: &str, executortype: &str, colonyname: &str) -> Executor;
}

Defines the specification for a process to be executed.
pub struct FunctionSpec {
pub nodename: String, // Name for workflow dependencies
pub funcname: String, // Function name
pub args: Vec<String>, // Positional arguments
pub kwargs: HashMap<String, Value>, // Keyword arguments
pub priority: i32, // Higher priority = executed first
pub maxwaittime: i32, // Max seconds waiting in queue
pub maxexectime: i32, // Max seconds for execution
pub maxretries: i32, // Max retry attempts
pub conditions: Conditions, // Execution conditions
pub label: String, // Optional label
pub fs: Filesystem, // Filesystem configuration
pub env: HashMap<String, String>, // Environment variables
pub channels: Vec<String>, // Channel names
}
impl FunctionSpec {
pub fn new(funcname: &str, executortype: &str, colonyname: &str) -> FunctionSpec;
}

Specifies conditions for process assignment.
pub struct Conditions {
pub colonyname: String,
pub executornames: Vec<String>, // Specific executors (optional)
pub executortype: String, // Required executor type
pub dependencies: Vec<String>, // Workflow dependencies (node names)
pub nodes: i32, // Number of nodes required
pub cpu: String, // CPU requirements
pub processes: i32,
pub processes_per_node: i32,
pub mem: String, // Memory requirements
pub storage: String, // Storage requirements
pub gpu: GPU, // GPU requirements
pub walltime: i64, // Wall time limit
}

Represents a submitted process.
pub struct Process {
pub processid: String,
pub initiatorid: String,
pub initiatorname: String,
pub assignedexecutorid: String,
pub isassigned: bool,
pub state: i32, // WAITING, RUNNING, SUCCESS, FAILED
pub prioritytime: i64,
pub submissiontime: String,
pub starttime: String,
pub endtime: String,
pub retries: i32,
pub attributes: Vec<Attribute>,
pub spec: FunctionSpec,
pub waitforparents: bool,
pub parents: Vec<String>,
pub children: Vec<String>,
pub processgraphid: String,
pub input: Vec<String>,
pub output: Vec<String>,
pub errors: Vec<String>,
}

Represents a workflow (DAG of processes).
pub struct ProcessGraph {
pub processgraphid: String,
pub colonyname: String,
pub state: i32,
pub rootprocessids: Vec<String>,
pub processids: Vec<String>,
}

Specification for submitting a workflow.
pub struct WorkflowSpec {
pub colonyname: String,
pub functionspecs: Vec<FunctionSpec>,
}

Key-value attribute attached to a process.
pub struct Attribute {
pub attributeid: String,
pub targetid: String,
pub targetcolonyname: String,
pub targetprocessgraphid: String,
pub attributetype: i32, // IN, OUT, ERR, ENV
pub key: String,
pub value: String,
}
impl Attribute {
pub fn new(colonyname: &str, processid: &str, key: &str, value: &str) -> Attribute;
}

Log message for a process.
pub struct Log {
pub processid: String,
pub colonyname: String,
pub executorname: String,
pub message: String,
pub timestamp: i64,
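}

Entry in a process channel.
pub struct ChannelEntry {
pub sequence: i64, // Message sequence number
pub payload: String, // Base64 encoded payload
pub msgtype: String, // Message type ("data", "end", or "error")
pub inreplyto: i64, // Sequence number this replies to
pub timestamp: String, // ISO 8601 timestamp
pub senderid: String, // Sender's executor ID
}

Colony statistics.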
pub struct Statistics {
pub colonies: i64,
pub executors: i64,
pub waitingprocesses: i64,
pub runningprocesses: i64,
pub successfulprocesses: i64,
pub failedprocesses: i64,
pub waitingworkflows: i64,
pub runningworkflows: i64,
pub successfulworkflows: i64,
pub failedworkflows: i64,
}

Registered function metadata.
pub struct Function {
pub functionid: String,
pub executorname: String,
pub executortype: String,
pub colonyname: String,
pub funcname: String,
pub counter: i64,
pub minwaittime: f64,
pub maxwaittime: f64,
pub minexectime: f64,
pub maxexectime: f64,
pub avgwaittime: f64,
pub avgexectime: f64,
}

Blueprint for reconciliation.
pub struct Blueprint {
pub blueprintid: String,
pub kind: String,
pub metadata: BlueprintMetadata,
pub handler: BlueprintHandler,
pub spec: HashMap<String, Value>,
pub status: HashMap<String, Value>,
pub generation: i64,
pub reconciledgeneration: i64,
pub lastreconciled: String,
}

pub const WAITING: i32 = 0; // Process waiting in queue
pub const RUNNING: i32 = 1; // Process being executed
pub const SUCCESS: i32 = 2; // Process completed successfully
pub const FAILED: i32 = 3; // Process failed

pub const PENDING: i32 = 0; // Executor pending approval
pub const APPROVED: i32 = 1; // Executor approved
pub const REJECTED: i32 = 2; // Executor rejected

pub const IN: i32 = 0; // Input attribute
pub const OUT: i32 = 1; // Output attribute
pub const ERR: i32 = 2; // Error attribute
pub const ENV: i32 = 4; // Environment attribute

All functions are in the crypto module.
Generate a new random private key.
pub fn gen_prvkey() -> String

Returns a hex-encoded 32-byte private key (64 characters).

Derive the public ID from a private key.
pub fn gen_id(private_key: &str) -> String

Returns a hex-encoded SHA3-256 hash of the public key (64 characters).

Sign a message with a private key.
pub fn gen_signature(message: &str, private_key: &str) -> String

Returns a hex-encoded signature (130 characters: r + s + v).

Hash a message with SHA3-256.
pub fn gen_hash(message: &str) -> String

Returns a hex-encoded hash (64 characters).

Recover the public ID from a message and signature.
pub fn recid(message: &str, signature: &str) -> String

Returns the hex-encoded ID that signed the message.

Create a new colony.
pub async fn add_colony(
colony: &Colony,
prvkey: &str,
) -> Result<Colony, RPCError>

Delete a colony.
pub async fn remove_colony(
colony_name: &str,
prvkey: &str,
) -> Result<(), RPCError>

Get a colony by name.
pub async fn get_colony(
colonyname: &str,
prvkey: &str,
) -> Result<Colony, RPCError>

Get all colonies.
pub async fn get_colonies(
prvkey: &str,
) -> Result<Vec<Colony>, RPCError>

Register a new executor.
pub async fn add_executor(
executor: &Executor,
prvkey: &str,
) -> Result<Executor, RPCError>

Approve a pending executor (requires colony owner key).
pub async fn approve_executor(
colonyname: &str,
executorname: &str,
prvkey: &str,
) -> Result<(), RPCError>

Reject a pending executor.
pub async fn reject_executor(
colonyname: &str,
executorname: &str,
prvkey: &str,
) -> Result<(), RPCError>

Remove an executor.
pub async fn remove_executor(
colonyname: &str,
executorname: &str,
prvkey: &str,
) -> Result<(), RPCError>

Get an executor by name.
pub async fn get_executor(
colonyname: &str,
executorname: &str,
prvkey: &str,
) -> Result<Executor, RPCError>

Get all executors in a colony.
pub async fn get_executors(
colonyname: &str,
prvkey: &str,
) -> Result<Vec<Executor>, RPCError>

Submit a new process for execution.
pub async fn submit(
spec: &FunctionSpec,
prvkey: &str,
) -> Result<Process, RPCError>

Wait for and assign a process to execute.
pub async fn assign(
colonyname: &str,
timeout: i32, // Timeout in seconds
prvkey: &str,
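) -> Result<Process, RPCError>

Note: If the timeout expires before a process can be assigned, assign returns an error for which conn_err() == false. When polling for work this is a normal outcome, not a connection failure.
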
Mark a process as successfully completed.
pub async fn close(
processid: &str,
prvkey: &str,
) -> Result<(), RPCError>

Mark a process as failed.
pub async fn fail(
processid: &str,
prvkey: &str,
) -> Result<(), RPCError>

Get a process by ID.
pub async fn get_process(
processid: &str,
prvkey: &str,
) -> Result<Process, RPCError>

Get processes by state.
pub async fn get_processes(
colonyname: &str,
count: i32,
state: i32, // WAITING, RUNNING, SUCCESS, or FAILED
prvkey: &str,
) -> Result<Vec<Process>, RPCError>

Remove a process.
pub async fn remove_process(
processid: &str,
prvkey: &str,
) -> Result<(), RPCError>

Remove all processes with a given state.
pub async fn remove_all_processes(
colonyname: &str,
state: i32,
prvkey: &str,
) -> Result<(), RPCError>

Set the output of a process.
pub async fn set_output(
processid: &str,
output: Vec<String>,
prvkey: &str,
) -> Result<(), RPCError>

Add an attribute to a process.
pub async fn add_attr(
attr: &Attribute,
prvkey: &str,
) -> Result<Attribute, RPCError>

Submit a workflow (DAG of processes).
pub async fn submit_workflow(
workflowspec: &WorkflowSpec,
prvkey: &str,
) -> Result<ProcessGraph, RPCError>

Get a process graph by ID.
pub async fn get_processgraph(
processgraphid: &str,
prvkey: &str,
) -> Result<ProcessGraph, RPCError>

Get process graphs by state.
pub async fn get_processgraphs(
colonyname: &str,
count: i32,
state: i32,
prvkey: &str,
) -> Result<Vec<ProcessGraph>, RPCError>

Remove a process graph.
pub async fn remove_processgraph(
processgraphid: &str,
prvkey: &str,
) -> Result<(), RPCError>

Remove all process graphs with a given state.
pub async fn remove_all_processgraphs(
colonyname: &str,
state: i32,
prvkey: &str,
) -> Result<(), RPCError>

Add a log message for a process.
pub async fn add_log(
log: &Log,
prvkey: &str,
) -> Result<(), RPCError>

Get logs for a process.
pub async fn get_logs(
colonyname: &str,
processid: &str,
executorname: &str,
count: i32,
since: i64, // Timestamp
prvkey: &str,
) -> Result<Vec<Log>, RPCError>

Channels provide real-time communication with processes. A process must have channels defined in its FunctionSpec before they can be used.
pub struct ChannelEntry {
pub sequence: i64, // Message sequence number
pub payload: String, // Base64 encoded payload
pub msgtype: String, // Message type ("data", "end", or "error")
pub inreplyto: i64, // Sequence number this replies to
pub timestamp: String, // ISO 8601 timestamp
pub senderid: String, // Sender's executor ID
}
impl ChannelEntry {
/// Returns the payload decoded from base64 as a UTF-8 string
pub fn payload_as_string(&self) -> String;
/// Returns the raw payload bytes decoded from base64
pub fn payload_bytes(&self) -> Vec<u8>;
}

Append data to a process channel.
pub async fn channel_append(
processid: &str,
channelname: &str,
sequence: i64, // Client-assigned sequence number
data: &str, // Message content
data_type: &str, // Empty string, "end", or "error"
inreplyto: i64, // Sequence number this replies to (0 if not a reply)
prvkey: &str,
) -> Result<ChannelEntry, RPCError>

Read from a process channel.
pub async fn channel_read(
processid: &str,
channelname: &str,
afterseq: i64, // Read messages after this sequence (0 for all)
limit: i32, // Max messages to return (0 for no limit)
prvkey: &str,
) -> Result<Vec<ChannelEntry>, RPCError>

Example:
// Create a spec with a channel
let mut spec = FunctionSpec::new("my_func", "cli", "dev");
spec.channels = vec!["output".to_string()];
// Submit and assign the process
let process = colonyos::submit(&spec, &prvkey).await?;
let assigned = colonyos::assign(&colonyname, 10, &prvkey).await?;
// Append messages to the channel
colonyos::channel_append(
&assigned.processid,
"output",
1, // sequence
"Hello!",
"", // data_type
0, // inreplyto
&prvkey,
).await?;
// Read messages from the channel
let messages = colonyos::channel_read(
&assigned.processid,
"output",
0, // afterseq (0 = all)
10, // limit
&prvkey,
).await?;
for msg in messages {
println!("Message {}: {}", msg.sequence, msg.payload_as_string());
}

Subscriptions provide real-time notifications for process state changes and channel messages via WebSocket connections. These are essential for building responsive applications.
Subscribe to process lifecycle events and wait for a specific state.
#[cfg(not(target_arch = "wasm32"))]
pub async fn subscribe_process(
process: &Process,
state: i32, // Target state (RUNNING, SUCCESS, FAILED)
timeout: i32, // Timeout in seconds
prvkey: &str,
) -> Result<(), RPCError>

This function opens a WebSocket connection to the server and blocks until the process reaches the specified state. It is commonly used to wait for a process to start running before subscribing to its channels.
Example:
use colonyos::core::{FunctionSpec, RUNNING};
// Submit a process
let mut spec = FunctionSpec::new("ai_inference", "ai", "my_colony");
spec.channels = vec!["output".to_string()];
let process = colonyos::submit(&spec, &prvkey).await?;
// Wait for the process to be assigned and running
colonyos::subscribe_process(&process, RUNNING, 60, &prvkey).await?;
println!("Process is now running!");
// Now it's safe to subscribe to channels

Subscribe to channel messages via WebSocket for real-time streaming.
#[cfg(not(target_arch = "wasm32"))]
pub async fn subscribe_channel<F>(
processid: &str,
channelname: &str,
afterseq: i64, // Start reading after this sequence (0 for all)
timeout: i32, // Timeout in seconds
prvkey: &str,
callback: F, // Called for each batch of messages
) -> Result<Vec<ChannelEntry>, RPCError>
where
F: FnMut(Vec<ChannelEntry>) -> bool, // Return false to stop receiving

This function opens a WebSocket connection and receives messages in real-time.
The callback is called for each batch of messages received. Return false from
the callback to stop receiving messages.
Important: Subscribing to a channel triggers channel creation on the server. Always subscribe before appending messages to ensure the channel exists.
Example: Real-time streaming from AI executor
use colonyos::core::{FunctionSpec, Process, RUNNING};
use std::io::Write; // required for stdout().flush() below
async fn stream_ai_response(prvkey: &str) -> Result<(), Box<dyn std::error::Error>> {
// 1. Submit process with channel
let mut spec = FunctionSpec::new("chat", "ai", "my_colony");
spec.channels = vec!["response".to_string()];
spec.args = vec!["What is the meaning of life?".to_string()];
let process = colonyos::submit(&spec, &prvkey).await?;
// 2. Wait for process to start running
colonyos::subscribe_process(&process, RUNNING, 60, &prvkey).await?;
// 3. Subscribe to channel and print tokens as they arrive
let all_entries = colonyos::subscribe_channel(
&process.processid,
"response",
0, // from beginning
120, // 2 minute timeout
&prvkey,
|entries| {
for entry in &entries {
// Print token without newline for streaming effect
print!("{}", entry.payload_as_string());
std::io::stdout().flush().ok();
// Stop if we receive an "end" message
if entry.msgtype == "end" {
return false;
}
}
true // continue receiving
}
).await?;
println!("\n\nReceived {} total messages", all_entries.len());
Ok(())
}

Example: Triggering channel creation
// When you only need to ensure the channel exists (for append operations),
// use a short timeout and stop immediately:
let _ = colonyos::subscribe_channel(
&process.processid,
"my-channel",
0, // afterseq
1, // 1 second timeout
&prvkey,
|_| false, // Stop immediately after first callback
).await;
// Now channel_append will work reliably
colonyos::channel_append(
&process.processid,
"my-channel",
1,
"Hello!",
"",
0,
&prvkey,
).await?;

Get statistics for a colony.
pub async fn get_statistics(
colonyname: &str,
prvkey: &str,
) -> Result<Statistics, RPCError>

Register a function.
pub async fn add_function(
function: &Function,
prvkey: &str,
) -> Result<Function, RPCError>

Get all functions in a colony.
pub async fn get_functions(
colonyname: &str,
prvkey: &str,
) -> Result<Vec<Function>, RPCError>

Get functions registered by a specific executor.
pub async fn get_functions_by_executor(
colonyname: &str,
executorname: &str,
prvkey: &str,
) -> Result<Vec<Function>, RPCError>

Remove a function.
pub async fn remove_function(
functionid: &str,
prvkey: &str,
) -> Result<(), RPCError>

Add a blueprint definition.
pub async fn add_blueprint_definition(
definition: &BlueprintDefinition,
prvkey: &str,
) -> Result<BlueprintDefinition, RPCError>

Get a blueprint definition.
pub async fn get_blueprint_definition(
colonyname: &str,
name: &str,
prvkey: &str,
) -> Result<BlueprintDefinition, RPCError>

Get all blueprint definitions.
pub async fn get_blueprint_definitions(
colonyname: &str,
prvkey: &str,
) -> Result<Vec<BlueprintDefinition>, RPCError>

Remove a blueprint definition.
pub async fn remove_blueprint_definition(
colonyname: &str,
name: &str,
prvkey: &str,
) -> Result<(), RPCError>

Add a blueprint.
pub async fn add_blueprint(
blueprint: &Blueprint,
prvkey: &str,
) -> Result<Blueprint, RPCError>

Get a blueprint.
pub async fn get_blueprint(
colonyname: &str,
name: &str,
prvkey: &str,
) -> Result<Blueprint, RPCError>

Get blueprints by kind and location.
pub async fn get_blueprints(
colonyname: &str,
kind: &str,
location: &str,
prvkey: &str,
) -> Result<Vec<Blueprint>, RPCError>

Update a blueprint.
pub async fn update_blueprint(
blueprint: &Blueprint,
force_generation: bool,
prvkey: &str,
) -> Result<Blueprint, RPCError>

Remove a blueprint.
pub async fn remove_blueprint(
colonyname: &str,
name: &str,
prvkey: &str,
) -> Result<(), RPCError>

Update the status of a blueprint.
pub async fn update_blueprint_status(
colonyname: &str,
name: &str,
status: HashMap<String, Value>,
prvkey: &str,
) -> Result<(), RPCError>

Trigger reconciliation for a blueprint.
pub async fn reconcile_blueprint(
colonyname: &str,
name: &str,
force: bool,
prvkey: &str,
) -> Result<Process, RPCError>

pub struct RPCError {
// Private fields
}
impl RPCError {
/// Returns true if this was a connection error
pub fn conn_err(&self) -> bool;
}
impl std::fmt::Display for RPCError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result;
}
impl std::error::Error for RPCError {}

match colonyos::assign(colonyname, 10, prvkey).await {
Ok(process) => {
// Handle process
}
Err(e) => {
if e.conn_err() {
// Connection error - maybe retry
eprintln!("Connection error: {}", e);
} else {
// Timeout or other error
// For assign, this is normal - just continue polling
}
}
}