Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
<<<<<<< HEAD
||||||| (empty tree)
=======

>>>>>>> e2a9125 (Initial commit of Agent Substrate)
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,21 @@ Agent Substrate is designed to be **framework and agent harness agnostic**. Beca
* **Claude Code & CodeX:** Support for high-density, stateful coding environments that preserve terminal and filesystem state across sessions.
* **Model Context Protocol (MCP):** Deploy secure, sandboxed MCP servers as Substrate Actors to provide durable tools for any LLM.

## Ecosystem & Examples

* **[Agent Executor](https://github.com/google/ax):** A distributed agent runtime that demonstrates building a secure, hyper-scalable agent harness on Agent Substrate (see the [announcement blog](https://cloud.google.com/blog/products/ai-machine-learning/agent-executor-googles-distributed-agent-runtime) and [integration guide](https://github.com/google/ax/blob/main/manifests/README.md)).

## Status and compatibility

Agent Substrate is currently in VERY early development. It is not ready for
production use, and the APIs are almost guaranteed to change. We are not
making any guarantees about backward compatibility at this stage, and
everything in this project may be changed.

### Supported Kubernetes Releases

Currently we aim to support the [latest stable release](https://kubernetes.io/releases/) of Kubernetes, and the previous minor release.

## Community

For announcements, technical discussions, and community support, please join
Expand Down
47 changes: 47 additions & 0 deletions cmd/kubectl-ate/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ gcloud beta container clusters update CLUSTER_NAME \
--location=LOCATION
```

**Local (kind):**

The kind overlay installed by `hack/install-ate-kind.sh --deploy-ate-system` already provisions an in-cluster OpenTelemetry Collector and a Jaeger all-in-one in the `otel-system` namespace. No additional setup is required.

Port-forward the Jaeger UI and invoke any command with `--trace`:
```bash
kubectl port-forward -n otel-system svc/jaeger 16686:16686 &
kubectl ate get actor my-counter-1 --trace
# open http://localhost:16686 and search for the most recent trace
```

## Global Flags
These flags can be appended to any command:

Expand Down Expand Up @@ -73,6 +84,30 @@ kubectl ate get actor <actor-id> -o yaml
kubectl ate get workers
```

> **Note:** Actors and workers are not Kubernetes CRDs — they live in the Substrate control plane (valkey/redis), not `etcd`. `kubectl get actor` and `kubectl get worker` will not return anything; only `kubectl ate get …` queries the control plane. `kubectl get actortemplate` and `kubectl get workerpool` *do* work, because those are CRDs.

#### `kubectl ate get actor` output columns

| Column | Meaning |
|---|---|
| `NAMESPACE` | The namespace of the `ActorTemplate` the actor was created from. |
| `TEMPLATE` | The `ActorTemplate` name. |
| `ID` | Actor ID. User-provided for application actors; UUID for the golden actor that each template materialises during `ResumeGoldenActor`. |
| `STATUS` | One of `STATUS_RESUMING`, `STATUS_RUNNING`, `STATUS_SUSPENDING`, `STATUS_SUSPENDED`. |
| `ATEOM POD` | The worker pod (namespace/name) currently hosting the actor. Empty while suspended. |
| `ATEOM IP` | The pod IP of that worker. Empty while suspended. |
| `VERSION` | Monotonic integer that increments on every state transition (resume / suspend / checkpoint). Useful for distinguishing snapshots. |

#### `kubectl ate get worker` output columns

| Column | Meaning |
|---|---|
| `NAMESPACE` | The `WorkerPool` namespace. |
| `POOL` | The `WorkerPool` name. |
| `POD` | The worker pod name. |
| `STATUS` | `FREE` (idle, ready to receive an actor) or `ASSIGNED` (currently hosting an actor). |
| `ASSIGNED ACTOR` | If `STATUS=ASSIGNED`, the actor reference `<namespace>/<template>/<actor-id>`. |

### Actor Lifecycle
Manage the execution state of your workloads.
*(Note: Actors are identified by a user-provided ID, which must be a valid DNS-1123 label)*
Expand All @@ -91,6 +126,18 @@ kubectl ate suspend actor my-actor
kubectl ate delete actor my-actor
```

### Logs

`kubectl ate logs` requires a resource-type subcommand; running `kubectl ate logs <id>` on its own prints help. The only supported resource type is `actors`:

```bash
# Stream logs for an actor (follows by default; aggregated across worker
# reassignments so the same actor is queryable as it teleports between pods).
kubectl ate logs actors my-actor
```

Logs are streamable only while the actor is bound to a worker (i.e., `STATUS_RUNNING`). For history across worker migrations, route through a centralized log backend (Cloud Logging, Loki, etc.); see `docs/observability.md`.

### Administration & Setup
Commands for bootstrapping the Substrate control plane and debugging local environments.

Expand Down
14 changes: 7 additions & 7 deletions cmd/servers/atenet/app/router/extproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *ExtProcServer) Process(stream extprocv3.ExternalProcessor_ProcessServer
switch reqType := req.Request.(type) {
case *extprocv3.ProcessingRequest_RequestHeaders:
start := time.Now()
hResponse, rqm, err := s.handleRequestHeaders(stream.Context(), reqType.RequestHeaders)
hResponse, rqm, target, err := s.handleRequestHeaders(stream.Context(), reqType.RequestHeaders)
if err != nil {
slog.ErrorContext(stream.Context(), "Error during ext_proc RequestHeaders processing", slog.String("err", err.Error()))
var reqErr *reqError
Expand All @@ -95,7 +95,7 @@ func (s *ExtProcServer) Process(stream extprocv3.ExternalProcessor_ProcessServer
s.recorder.AddRouterRequest(start, time.Since(start), "Error", "-", rqm)
} else {
resp.Response = &extprocv3.ProcessingResponse_RequestHeaders{RequestHeaders: hResponse}
s.recorder.AddRouterRequest(start, time.Since(start), "Route ok", "-", rqm)
s.recorder.AddRouterRequest(start, time.Since(start), "Route ok", target, rqm)
}

default:
Expand All @@ -118,14 +118,14 @@ func (s *ExtProcServer) Process(stream extprocv3.ExternalProcessor_ProcessServer
func (s *ExtProcServer) handleRequestHeaders(
ctx context.Context,
reqHeaders *extprocv3.HttpHeaders,
) (*extprocv3.HeadersResponse, *requestMetadata, error) {
) (*extprocv3.HeadersResponse, *requestMetadata, string, error) {
metadata := newRequestMetadata(reqHeaders.Headers.GetHeaders())
slog.InfoContext(ctx, "Request", slog.String("metadata", metadata.String()))

actorID, err := parseActorID(metadata.host)
if err != nil {
// Host is invalid, respond with 404.
return nil, metadata, notFoundErr
return nil, metadata, "", notFoundErr
}

slog.InfoContext(ctx, "ResumeActor", slog.String("actorID", actorID))
Expand All @@ -137,12 +137,12 @@ func (s *ExtProcServer) handleRequestHeaders(
slog.Any("err", err))

if err != nil {
return nil, metadata, fmt.Errorf("error resuming actor %s: %w", actorID, err)
return nil, metadata, "", fmt.Errorf("error resuming actor %s: %w", actorID, err)
}

workerIP := actor.GetAteomPodIp()
if ip := net.ParseIP(workerIP); ip == nil {
return nil, metadata, fmt.Errorf("actor %q did not have a valid IP %q", actorID, workerIP)
return nil, metadata, "", fmt.Errorf("actor %q did not have a valid IP %q", actorID, workerIP)
}

// TODO(bowei) -- handle more than port 80 on the actor.
Expand All @@ -158,5 +158,5 @@ func (s *ExtProcServer) handleRequestHeaders(
Response: &extprocv3.CommonResponse{
HeaderMutation: mutation,
},
}, metadata, nil
}, metadata, targetAddr, nil
}
5 changes: 4 additions & 1 deletion cmd/servers/atenet/app/router/extproc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestExtProcHeadersEvaluation(t *testing.T) {
},
}

res, metadata, err := s.handleRequestHeaders(context.Background(), reqHeaders)
res, metadata, target, err := s.handleRequestHeaders(context.Background(), reqHeaders)
if tc.expectErr {
if err == nil {
t.Fatalf("expected error but got nil")
Expand All @@ -129,6 +129,9 @@ func TestExtProcHeadersEvaluation(t *testing.T) {
if err != nil {
t.Fatalf("ext_proc processing error: %v", err)
}
if target != tc.expectedTarget {
t.Errorf("expected target %q, got %q", tc.expectedTarget, target)
}

mutation := res.Response.GetHeaderMutation()
if len(mutation.GetSetHeaders()) != 1 {
Expand Down
9 changes: 8 additions & 1 deletion cmd/servers/ateom-gvisor/ateom-gvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"os"
"runtime"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -539,7 +540,13 @@ func restoreLink(ctx context.Context, link netlink.Link, info *SaveLinkInfo) err
return fmt.Errorf("while restoring addr %d onto link: %w", i, err)
}
}
for i, saveRoute := range info.Routes {
// Link-scope routes must be installed before gateway routes so the
// kernel can resolve each gateway's nexthop (fib_check_nh_v4_gw).
routes := append([]SaveRoute(nil), info.Routes...)
sort.SliceStable(routes, func(i, j int) bool {
return routes[i].Gateway == nil && routes[j].Gateway != nil
})
for i, saveRoute := range routes {
route := &netlink.Route{
LinkIndex: link.Attrs().Index,
Scope: netlink.Scope(saveRoute.Scope),
Expand Down
5 changes: 0 additions & 5 deletions internal/ategcs/ategcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ func (s *s3Client) GetObject(ctx context.Context, bucket, object string) (io.Rea
}

func (s *s3Client) PutObject(ctx context.Context, bucket, object string, reader io.Reader) error {
// Try creating the bucket first (ignore if it already exists)
_, _ = s.client.CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(bucket),
})

_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(object),
Expand Down
14 changes: 14 additions & 0 deletions manifests/ate-install/kind/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,18 @@ patches:
env:
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: http://opentelemetry-collector.otel-system.svc:4317
- patch: |-
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: atelet
namespace: ate-system
spec:
template:
spec:
containers:
- name: atelet
env:
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: http://opentelemetry-collector.otel-system.svc:4317

42 changes: 42 additions & 0 deletions manifests/ate-install/kind/rustfs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,45 @@ spec:
- name: data
persistentVolumeClaim:
claimName: rustfs-data
---
apiVersion: batch/v1
kind: Job
metadata:
name: rustfs-bucket-init
namespace: ate-system
spec:
backoffLimit: 10
template:
spec:
restartPolicy: OnFailure
containers:
- name: create-bucket
image: amazon/aws-cli:2.17.0@sha256:643507c10ada7964ca6157b3d799f030b90577643da9955d319a77399ed80d73
env:
- name: AWS_ACCESS_KEY_ID
value: rustfsadmin
- name: AWS_SECRET_ACCESS_KEY
value: rustfsadmin
- name: AWS_REGION
value: us-east-1
- name: AWS_ENDPOINT_URL
value: http://rustfs.ate-system.svc:9000
command:
- /bin/sh
- -c
- |
set -e
for i in $(seq 1 60); do
if aws s3api head-bucket --bucket ate-snapshots 2>/dev/null; then
echo "bucket ate-snapshots already exists"
exit 0
fi
if aws s3api create-bucket --bucket ate-snapshots 2>/dev/null; then
echo "bucket ate-snapshots created"
exit 0
fi
echo "waiting for rustfs to become available... ($i/60)"
sleep 2
done
echo "timed out waiting for rustfs"
exit 1