From 2ae30e6a1de6b7df44a177ffb6221a3b302aec73 Mon Sep 17 00:00:00 2001 From: Michael Kupferstein Date: Tue, 19 May 2026 16:36:39 -0400 Subject: [PATCH 01/14] Enhance documentation for KoopDB architecture, challenges, installation, scope, technologies, and workflow --- .gitignore | 4 +- docs/api.md | 10 +++-- docs/architecture.md | 19 ++++++---- docs/challenges.md | 20 +++++----- docs/installation.md | 88 +++++++++++++++++++++++++++++++++++++++----- docs/scope.md | 12 +++--- docs/technologies.md | 21 ++++++----- docs/workflow.md | 82 ++++++++++++++++++++++++----------------- 8 files changed, 176 insertions(+), 80 deletions(-) diff --git a/.gitignore b/.gitignore index 2f323a94..cb576389 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,6 @@ CLAUDE.md settings.local.json create.py k8s-9/k8-logs/ -k8s/k8-logs/ \ No newline at end of file +k8s/k8-logs/ +venv/ +docs/superpowers \ No newline at end of file diff --git a/docs/api.md b/docs/api.md index 968a2a8e..865d8526 100644 --- a/docs/api.md +++ b/docs/api.md @@ -2,7 +2,7 @@ KoopDB exposes a subset of the Amazon S3 API. Authentication is intentionally out of scope for this project; all requests are accepted anonymously and any credentials supplied by an S3 client are ignored. Use the normal S3 API to interact. -> **Note on `ETag`:** ETags are not relevant for KoopDB — the gateway does not generate or return ETag headers, and the XML body of `CompleteMultipartUpload` is parsed for `` only; any `` values supplied by the client are ignored. +> **Note on `ETag`:** ETags are not meaningful in KoopDB — the gateway returns `ETag` headers populated with placeholder values (random or dummy), not real content hashes, so they should not be used for integrity checks. The XML body of `CompleteMultipartUpload` is parsed for `` only; any `` values supplied by the client are ignored. ## Base URL The API is available via the Query Processor services, typically exposed on ports `9001-9003` in a default Docker Compose setup. 
@@ -19,6 +19,7 @@ The API is available via the Query Processor services, typically exposed on port ### Bucket Operations | Method | Path | Description | S3 Operation | | :--- | :--- | :--- | :--- | +| `GET` | `/` | Lists all buckets. Returns a `ListAllMyBucketsResult` XML payload. | `ListBuckets` | | `PUT` | `/{bucket}` | Creates a new bucket. | `CreateBucket` | | `DELETE` | `/{bucket}` | Deletes an empty bucket. | `DeleteBucket` | | `GET` | `/{bucket}` | Lists objects in a bucket (v2). Supports `prefix` and `max-keys` query params. | `ListObjectsV2` | @@ -31,10 +32,11 @@ The API is available via the Query Processor services, typically exposed on port | :--- | :--- | :--- | :--- | | `PUT` | `/{bucket}/{key}` | Uploads an object. Max size defaults to 100MB. | `PutObject` | | `GET` | `/{bucket}/{key}` | Retrieves an object. Returns binary stream. | `GetObject` | +| `HEAD` | `/{bucket}/{key}` | Returns object size and `Last-Modified` without the body. | `HeadObject` | | `DELETE` | `/{bucket}/{key}` | Deletes an object. | `DeleteObject` | ### Multipart Upload Operations -| Method | Path | Queue Parameters | Description | S3 Operation | +| Method | Path | Query Parameters | Description | S3 Operation | | :--- | :--- | :--- | :--- | :--- | | `POST` | `/{bucket}/{key}` | `?uploads` | Initiates a new multipart upload. Returns an XML payload with `UploadId`. | `CreateMultipartUpload` | | `PUT` | `/{bucket}/{key}` | `?partNumber=N&uploadId=X` | Uploads a specific part for an upload ID. | `UploadPart` | @@ -74,6 +76,8 @@ The API returns standard S3 XML error responses when applicable. **Common Error Codes:** - `NoSuchBucket`: The specified bucket does not exist. - `NoSuchKey`: The specified key does not exist. -- `EntityTooLarge`: The object exceeded maximum allowed size. +- `NoSuchUpload`: The specified multipart `uploadId` does not exist. +- `InvalidPart`: A part referenced in `CompleteMultipartUpload` is missing or invalid. 
+- `InvalidRequest`: The request is malformed (e.g. missing `Content-Length` on `PUT`, or `POST` without `?uploads`/`?uploadId`). - `InternalError`: Unexpected server-side failure. - `NotImplemented`: The requested S3 operation is not supported by Koop. diff --git a/docs/architecture.md b/docs/architecture.md index 2da0ab97..8e108c72 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -20,16 +20,21 @@ The architecture consists of four primary services: - **Storage Engine:** Uses **RocksDB** (embedded key-value store) for the OpLog, object metadata (including each object's byte size, recorded on PUT so HEAD, GET, and ListObjectsV2 can report content length without re-reading shards and so multipart completion can validate part sizes), and bucket tables. - **Operation Log:** Maintains a per-partition, sequenced log of mutations (PUT, DELETE, bucket ops) used for repair. - **Repair Worker Pool:** Background workers (`RepairWorkerPool` in `StorageNodeServerV2`) detect missed sequence numbers when a node comes back online and replay them from peers to catch up. + - **Garbage Collector:** A `GarbageCollectorWorker` per node walks the OpLog forward from a persistent `gc_cursor`, using a gossip-derived watermark to reap stale (superseded) versions in RocksDB transactions. A companion `BlobDeletionWorker` physically removes the blob bytes for keys the GC pass has marked obsolete. + - **Multipart Chunk Storage:** Multipart parts are stored as ordinary shards under internal keys prefixed with `__mpu__:` on the same `/store/{partition}/{bucket}/` path; on `CompleteMultipartUpload` the QP emits a `MultipartCommitMessage` whose payload lists the chunks rather than a single blob. 3. **Kafka (Sequencer / Pub-Sub):** - Provides total ordering of mutating operations on a per-partition basis. 
- - Query Processors publish ordered commit messages (`PutMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage`, `MultipartCommitMessage`) to partition-keyed topics. + - Query Processors publish ordered commit messages (`FileCommitMessage`, `MultipartCommitMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage` — the full sealed list in [`Message`](../common-lib/src/main/java/com/github/koop/common/messages/Message.java)) to partition-keyed topics. - Storage nodes consume those messages and apply the corresponding atomic RocksDB writes. 4. **Coordination Cluster (Metadata & State):** - **Etcd (3-node quorum):** Stores cluster topology, erasure-set configuration, and partition→erasure-set mapping. Acts as the source of truth for routing config; Query Processors and Storage Workers watch the relevant keys for changes. - **Redis:** Used by Query Processors to track multipart upload session state (active sessions, uploaded parts, cached part sizes). A `MemoryCacheClient` is available as an in-process substitute for dev/test. +5. **Nginx Ingress (Kubernetes deployments only):** + - Acts as the public-facing entry point in the k8s deployments, fronting the three Query Processor replicas (`query-processor-1..3:8080`) as an `upstream`. Disables request buffering and `client_max_body_size` so large object PUTs stream straight through to the QPs. Configured in [`nginx-k8s.conf`](../nginx-k8s.conf); not present in the local `docker-compose.yml` topology, where clients hit Query Processors on ports `9001-9003` directly. + ### Data Flow #### 1. PUT Object @@ -37,19 +42,19 @@ The architecture consists of four primary services: The system uses a two-stage write: shards are streamed first, and only after a write quorum acknowledges the shard upload is an ordered commit message published. This is **not** XA-style two-phase commit — there is no prepare/abort vote — but it does separate durable shard placement from the ordered metadata commit. 1. 
**Client** sends a `PUT /{bucket}/{key}` request to any Query Processor. -2. **QP** receives the data stream and erasure-encodes it into `n` total shards (`m` data + `k = n − m` parity). The defaults depend on deployment: the `docker-compose.yml` `etcd-seeder` block configures `n=6, m=4, k=2, write_quorum=5` (tolerates 2 node failures), while the system-tests cluster and the recommended full deployment use `n=9, m=6, k=3, write_quorum=7` (tolerates 3 node failures). Both configurations are read from `erasure_set_configurations` in Etcd at startup. -3. **QP** looks up the partition's erasure set in Etcd and streams shards to the target storage nodes concurrently over HTTP (`PUT /store/{partition}/{storageKey}`). +2. **QP** receives the data stream and erasure-encodes it into `n` total shards (`m` data + `k = n − m` parity). The defaults depend on deployment: the `docker-compose.yml` `etcd-seeder` block (used by the local dev stack **and** the system-tests cluster, since [`DockerComposeS3E2EIT`](../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java) launches `../docker-compose.yml`) configures `n=6, m=4, k=2, write_quorum=5` (tolerates 2 node failures); the larger k8s deployments — [`docker-compose.k8s.yml`](../docker-compose.k8s.yml), [`docker-compose.k8s-local.yml`](../docker-compose.k8s-local.yml), and [`k8s-9/`](../k8s-9/) — use `n=9, m=6, k=3, write_quorum=7` (tolerates 3 node failures). Both configurations are read from `erasure_set_configurations` in Etcd at startup. +3. **QP** looks up the partition's erasure set in Etcd and streams shards to the target storage nodes concurrently over HTTP (`PUT /store/{partition}/{bucket}/{key}`). 4. **QP** waits for at least `write_quorum` shard upload ACKs from the storage nodes. -5. **QP** publishes an ordered `PutMessage` via Kafka to the partition's topic. +5. **QP** publishes an ordered `FileCommitMessage` via Kafka to the partition's topic. 6. 
**Storage Nodes** consume the sequenced commit message and atomically write OpLog + Metadata in RocksDB. 7. **QP** waits for `write_quorum` commit ACKs and then responds `200 OK` to the client. #### 2. GET Object 1. **Client** sends a `GET /{bucket}/{key}` request. -2. **QP** resolves the erasure set for the key and queries all `n` storage nodes for their shard. -3. **QP** reconciles version conflicts: the version that a read quorum agrees on is selected. If no version reaches quorum the request fails with `500 InternalError`. -4. **QP** reconstructs the original object from any `k` available shards via Reed-Solomon decoding. +2. **QP** resolves the erasure set for the key and queries all `n` storage nodes for shard metadata. +3. **QP** picks the **maximum reported version** across responding nodes. If that version's record is a tombstone the request returns `404 NoSuchKey`; if no shards respond at all the request also returns `404`. +4. **QP** reconstructs the object via Reed-Solomon decoding from the shards at that max version. If fewer than `m` shards are available at the chosen version it falls back to an older version via `reconstructFromOlderVersion`; only an outright reconstruction failure surfaces as `500 InternalError`. 5. **QP** streams the data back to the client. #### 3. DELETE / Bucket Ops diff --git a/docs/challenges.md b/docs/challenges.md index e43c5583..3676a482 100644 --- a/docs/challenges.md +++ b/docs/challenges.md @@ -41,17 +41,15 @@ This throughput increase aligns with the benefits of system scalability. ### Discovery Cache for Dead Storage Nodes -- **Challenge:** Without health awareness, every QP request paid the full per-node HTTP timeout (10 s connect + 30 s read) for each unreachable storage node. One dead node added up to 30 s to every GET/PUT; three dead nodes (still within fault tolerance) inflated sub-second requests to 90 s+. 
+- **Challenge:** Without health awareness, every QP request paid the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each unreachable storage node. One dead node added the full connect-timeout cost to every GET/PUT; three dead nodes (still within fault tolerance) compounded that delay, inflating sub-second requests into multi-second ones. - **Sub-challenge — stale health cache:** A cached "DOWN" marking can become wrong: the node may have recovered, or a transient network blip may have falsely marked a healthy node as down. A naive hard-exclusion cache would prevent recovery indefinitely (false negative) or shed load unnecessarily (false positive). - **Solution:** Each QP runs a [`NodeHealthTracker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) (state machine: `HEALTHY → SUSPECT → DOWN`) fed by both a periodic [`NodeHealthProbe`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) (5 s interval, `GET /health`) and outcomes of real data-path requests. Data-path operations **soft-exclude** DOWN nodes — they are skipped only while enough healthy peers remain to make quorum; otherwise the path falls back to contacting every node. Any successful response from a DOWN-marked node immediately promotes it back to `HEALTHY` (opportunistic recovery). -- **Benefit:** Dead-node latency drops from ~30 s to sub-second on the fast path, the quorum denominator is never reduced, and the cache self-heals against both false positives (probe re-detects recovery) and false negatives (data-path successes override stale DOWN markings). +- **Benefit:** Dead-node latency drops to sub-second on the fast path, the quorum denominator is never reduced, and the cache self-heals against both false positives (probe re-detects recovery) and false negatives (data-path successes override stale DOWN markings). 
## Write Consistency - **Challenge:** Ensuring shards are successfully written to a write quorum of nodes. -- **Solution:** The system employs a two-phase commit strategy. - -The shards are first uploaded to the nodes; once all are uploaded, a commit message is dispatched via Kafka. +- **Solution:** The system uses a **two-stage write** (not XA-style two-phase commit — there is no prepare/abort vote). Shards are first streamed to the storage nodes; once a write quorum of shard ACKs is received, a single ordered commit message is dispatched via Kafka and storage nodes apply the corresponding RocksDB write when they consume it. ## PUT/DELETE Ordering @@ -62,16 +60,16 @@ The shards are first uploaded to the nodes; once all are uploaded, a commit mess ## Read Consistency & Version Conflicts - **Challenge:** Different nodes may return conflicting versions of data during a read. -- **Solution:** The system returns the highest version agreed upon by a quorum. +- **Solution:** The QP selects the **maximum version** reported across responding storage nodes and reconstructs from the shards at that version. ### Handling Stale Data -- If a version lacks a quorum, it is not considered fully acknowledged or logically valid. -- In cases of missing quorum, the highest quorum version is returned instead. +- If the chosen max version has fewer than `m` shards available (so Reed-Solomon cannot reconstruct from it), the QP falls back to an older version via `reconstructFromOlderVersion` instead of failing. +- A latest-version tombstone short-circuits to `404 NoSuchKey`; an outright reconstruction failure surfaces as `500 InternalError`. ### Garbage Collection -- Consensus among storage nodes determines the latest version that is safe to delete, utilizing a gossip-based watermark. 
+- Storage nodes gossip about their per-partition cursors and active reads; the resulting **gossip-derived watermark** (not a formal consensus protocol) tells each node which versions are safe to physically reap. ## Data Version Overload @@ -86,5 +84,5 @@ The shards are first uploaded to the nodes; once all are uploaded, a commit mess ## Bucket Item Consistency -- **Challenge:** Storage nodes commit in order, but not always simultaneously, potentially affecting bucket item consistency across nodes. -- **Solution:** When streaming items in a bucket, an item is considered to be in the system if it is present on at least a write quorum of nodes. \ No newline at end of file +- **Challenge:** Storage nodes commit in order, but not always simultaneously, so the per-node view of a bucket can diverge briefly. +- **Solution:** Bucket listings are served per-partition from a single healthy storage node — [`fetchListFromAnyNode`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) walks the healthy node list and returns the first successful response. Because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, any node that has caught up gives the same answer; the eventual-consistency window during a divergence is bounded by Kafka consumer lag rather than reconciled per-item across replicas. \ No newline at end of file diff --git a/docs/installation.md b/docs/installation.md index f5c76ea1..7432b464 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -1,6 +1,11 @@ # Installation and Setup -KoopDB uses Docker Compose for orchestrating its services (Query Processors, Storage Nodes, Etcd, Redis). +KoopDB ships two deployment flavors: + +- **Docker Compose** — single-host local stack (default for development and the `system-tests` suite). +- **Kubernetes (k3s)** — multi-node cluster deployment, available in two sizings under [`k8s/`](../k8s/) and [`k8s-9/`](../k8s-9/). 
+ +Both orchestrate the same services (Query Processors, Storage Nodes, Etcd, Kafka, Redis), so the env-var and erasure-config sections below apply to either. ## Prerequisites @@ -41,39 +46,55 @@ KoopDB uses Docker Compose for orchestrating its services (Query Processors, Sto ```bash docker-compose ps ``` - You should see containers for `storage-node`, `query-processor`, `etcd`, and `redis-master` in the `Up` state. + You should see `storage-node-1..6`, `query-processor-1..3`, `etcd1/etcd2/etcd3`, `kafka`, and `redis-master` in the `Up` state (plus a short-lived `etcd-seeder` that exits after seeding). ## Service Environment Variables -If you run services outside Docker Compose, you must supply these environment variables. +If you run services outside Docker Compose, you must supply these environment variables. The lists below match what the code actually reads (`System.getenv` callsites) and what the in-tree compose / k8s manifests set. ### Storage Node | Variable | Description | | --- | --- | +| `APP_PORT` | HTTP listen port (default `8080`) | +| `STORAGE_DIR` | On-disk persistence directory (the k8s manifests mount `/storage`) | | `ETCD_URL` | Etcd endpoint (e.g. `http://etcd1:2379`) | -| `REDIS_URL` | Redis URL (e.g. `redis://redis-master:6379`) | | `KAFKA_BOOTSTRAP_SERVERS` | Kafka bootstrap servers (e.g. 
`kafka:9092`) | | `NODE_IP` | This node's identity / advertised hostname | +| `LOG_LEVEL` | log4j level (optional; both manifests set `DEBUG`) | ### Query Processor | Variable | Description | | --- | --- | -| `APP_PORT` | HTTP listen port (default `8080`) | | `ETCD_URL` | Etcd endpoint | -| `REDIS_URL` | Redis URL (multipart session state) | +| `REDIS_URL` | Redis URL for multipart session state (default `redis://localhost:6379`) | | `KAFKA_BOOTSTRAP_SERVERS` | Kafka bootstrap servers | -| `STORAGE_NODE_URL` | A storage node URL used during initial bootstrap | | `NODE_IP` | This QP's identity for logging/metrics | +| `LOG_LEVEL` | log4j level (optional; both manifests set `DEBUG`) | + +> The QP HTTP port is hardcoded to `8080`; `APP_PORT` and `STORAGE_NODE_URL` are still set in some compose files but no longer read by the QP and should be considered vestigial. ## Erasure Configuration -> **Note on cluster sizing:** This guide describes the `docker-compose.yml` setup, which runs a **6-node cluster** with `n = 6, m = 4, write_quorum = 5` (4 data + 2 parity shards, tolerates 2 node failures). The system-tests integration suite (`system-tests/src/test/java/koop/RealStorageNodesIT.java`) and the recommended full deployment use a **9-node cluster** with `n = 9, m = 6, write_quorum = 7` (6 data + 3 parity, tolerates 3 failures). The architecture and workflow documents describe the 9-node configuration as the default; the flow is identical, only the seeded numbers and the number of storage-node replicas differ. +The erasure-set parameters are seeded into Etcd at startup. 
Two sizings ship in-tree: + +| Deployment | Erasure config | Tolerance | +| --- | --- | --- | +| [`docker-compose.yml`](../docker-compose.yml) (local dev + system-tests) | n=6, m=4, write_quorum=5 (1 erasure set, 6 storage-node containers) | 2 node failures | +| [`docker-compose-local.yml`](../docker-compose-local.yml) | n=6, m=4, write_quorum=5 | 2 node failures | +| [`k8s/`](../k8s/) (small k3s deployment) | n=6, m=4, write_quorum=5 (2 erasure sets × 6 pods) | 2 node failures | +| [`docker-compose.k8s.yml`](../docker-compose.k8s.yml) / [`docker-compose.k8s-local.yml`](../docker-compose.k8s-local.yml) | n=9, m=6, write_quorum=7 | 3 node failures | +| [`k8s-9/`](../k8s-9/) (large k3s deployment) | n=9, m=6, write_quorum=7 (2 erasure sets × 9 pods) | 3 node failures | -The erasure-set parameters are seeded into Etcd at startup by the `etcd-seeder` service in `docker-compose.yml`. To change `n`, `m`, `write_quorum`, the participating storage nodes, or the partition spread, edit the `etcd-seeder` `command` block in `docker-compose.yml` and restart the cluster. The seeded keys are: +The flow is identical across all of them; only the seeded numbers and the number of storage-node replicas differ. The seeded etcd keys are: - `erasure_set_configurations` — list of erasure sets, each with `n` (total shards), `m` (data shards), `write_quorum`, and the list of `(ip, port)` machines. The parity-shard count is computed as `k = n − m`. - `partition_spread_configurations` — mapping of partitions to erasure sets. +**How to change the topology:** + +- *Docker Compose:* edit the `etcd-seeder` `command` block in the relevant `docker-compose*.yml` and restart the cluster. +- *Kubernetes:* edit the JSON files under [`k8s/etcd-data/`](../k8s/etcd-data/) (or [`k8s-9/etcd-data/`](../k8s-9/etcd-data/)). [`deploy.sh`](../k8s/deploy.sh) mounts them as the `etcd-seeder-config` ConfigMap consumed by [`05-etcd-seeder.yaml`](../k8s/05-etcd-seeder.yaml). 
+ ## Stopping the Cluster To stop the cluster and remove containers/networks: @@ -86,6 +107,55 @@ To remove volumes (persisted data) as well: docker-compose down -v ``` +## Kubernetes Deployment + +The [`k8s/`](../k8s/) (6-node erasure sets) and [`k8s-9/`](../k8s-9/) (9-node erasure sets) directories contain a self-contained k3s deployment: manifests + helper scripts. Both target a remote control-plane node configured in [`config.sh`](../k8s-9/config.sh) (`SERVER`, `DOCKERHUB_USER`, `SSH_PASS`) and apply everything into a `koopdb` namespace. + +### Manifest layout + +| File | Purpose | +| --- | --- | +| `00-namespace.yaml` | Creates the `koopdb` namespace | +| `01-etcd.yaml` | 3-replica etcd StatefulSet (`etcd-headless` + `etcd` services) | +| `02-kafka.yaml` | Kafka broker (KRaft mode) | +| `03-redis.yaml` | Redis StatefulSet (multipart session state) | +| `04-storage-nodes.yaml` | Two `storage-node-setN` StatefulSets with pod anti-affinity per erasure set | +| `05-etcd-seeder.yaml` | One-shot Job that writes `erasure_set_configurations` + `partition_spread_configurations` from the `etcd-seeder-config` ConfigMap | +| `06-query-processor.yaml` | 3-replica QP Deployment + `ClusterIP` Service | +| `07-nginx.yaml` | 2-replica nginx fronting the QP service, exposed as `NodePort` on `30080` | + +Nodes are pinned with the `koopdb/role` label: `cp` for control-plane (etcd, seeder) and `worker` for data-plane (storage nodes, QPs, nginx, kafka, redis). [`label-nodes.sh`](../k8s-9/label-nodes.sh) holds the IP-to-role mapping for the target cluster. 
+ +### Scripts + +| Script | What it does | +| --- | --- | +| [`build.sh`](../k8s-9/build.sh) | Builds `${DOCKERHUB_USER}/query-processor:latest` and `${DOCKERHUB_USER}/storage-node:latest` and pushes both to Docker Hub | +| [`label-nodes.sh`](../k8s-9/label-nodes.sh) | Labels k3s nodes `koopdb/role=cp|worker` | +| [`deploy.sh`](../k8s-9/deploy.sh) | SCPs the manifests, generates the `etcd-seeder-config` ConfigMap from `etcd-data/*.json`, patches the image references, and `kubectl apply`s the manifests in sorted order | +| [`status.sh`](../k8s-9/status.sh) | Lists pods / services / StatefulSets / Deployments in the namespace | +| [`logs.sh`](../k8s-9/logs.sh), [`all_logs.sh`](../k8s-9/all_logs.sh) | Tail pod logs | +| [`teardown.sh`](../k8s-9/teardown.sh) | `kubectl delete namespace koopdb` | + +### Typical workflow + +```bash +# From the repo root, against the cluster configured in k8s-9/config.sh +k8s-9/build.sh # build + push images +k8s-9/label-nodes.sh # one-time per fresh k3s cluster +k8s-9/deploy.sh # apply manifests +k8s-9/status.sh # check pods are Ready +``` + +### Accessing the cluster + +The nginx ingress exposes the QPs on `NodePort 30080`: + +```bash +curl http://:30080/health +aws --endpoint-url=http://:30080 s3 ls +``` + ## Accessing the API The Query Processors expose an S3-compatible API on ports `9001`, `9002`, and `9003`. You can point any S3 client to these endpoints. diff --git a/docs/scope.md b/docs/scope.md index 8328417c..b0cd970f 100644 --- a/docs/scope.md +++ b/docs/scope.md @@ -6,14 +6,14 @@ The KoopDB project focuses on building a **distributed, fault-tolerant object st ### Core Features - **S3 Compatibility:** Support for standard S3 tools (like AWS CLI and SDKs) for fundamental operations. -- **Distributed Architecture:** A headless cluster of Query Processors (gateways) and Storage Nodes. 
-- **Fault Tolerance:** Data is sharded using **Erasure Coding** (Reed-Solomon equivalent), allowing the system to survive node failures without data loss. -- **Metadata Management:** Centralized, consistent metadata storage using **Etcd** to manage partition maps +- **Distributed Architecture:** A horizontally scaled cluster of Query Processors (stateless gateways) and Storage Nodes, with no single coordinator on the data path. +- **Fault Tolerance:** Data is sharded using **Reed-Solomon erasure coding**, allowing the system to survive node failures without data loss. +- **Metadata Management:** Centralized, consistent metadata storage using **Etcd** to manage partition maps. - **Multipart Uploads:** Support for uploading large objects in parts, reassembled upon completion. -### supported Operations -- **Buckets:** Create, Delete, List, Head. -- **Objects:** Put, Get, Delete. +### Supported Operations +- **Buckets:** Create, Delete, List, Head, List Buckets. +- **Objects:** Put, Get, Head, Delete. - **Multipart:** Initiate, Upload Part, Complete, Abort. ## Out of Scope diff --git a/docs/technologies.md b/docs/technologies.md index bf64e91c..01a2ced7 100644 --- a/docs/technologies.md +++ b/docs/technologies.md @@ -14,23 +14,26 @@ KoopDB leverages a modern Java stack focused on high-performance concurrency and - Configured to use Virtual Threads for optimal throughput. ### Storage Engine -- **RocksDB (8.10.0):** An embeddable persistent key-value store for fast storage. - - Used by Storage Nodes to persist object metadata, data shards, and operation logs on disk. - - Efficient for high write throughput (LSM-Tree based). +- **RocksDB (8.10.0):** An embeddable persistent key-value store, used by Storage Nodes for **metadata only**. 
Column families: `op_log`, `metadata`, `buckets`, `uncommitted_writes`, `repair_queue`, `pending_deletes`, `gc_cursors` (see [`RocksDbStorageStrategy`](../storage-node/src/main/java/com/github/koop/storagenode/db/RocksDbStorageStrategy.java)). The RocksDB instance lives under `${STORAGE_DIR}/rocksdb`. +- **Filesystem (FileChannel):** The actual shard bytes are written as plain files on disk via `java.nio.channels.FileChannel`, under directories sibling to the RocksDB instance inside `${STORAGE_DIR}`. + +### Erasure Coding +- **Backblaze JavaReedSolomon:** Reed-Solomon encoder/decoder used by [`ErasureCoder`](../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java) on both the encode (PUT) and decode (GET) paths. Dependency `com.github.Backblaze:JavaReedSolomon` in [`common-lib/pom.xml`](../common-lib/pom.xml). ### Serialization & Logging - **Jackson (2.17.0):** JSON serialization/deserialization for API responses and internal messages. - **Log4j 2 (2.23.1):** Asynchronous logging framework. ### Distributed Coordination -- **Etcd (via Jetcd):** Distributed key-value store holding cluster topology, erasure-set configuration, and partition→erasure-set mapping. Run as a 3-node quorum. -- **Redis:** In-memory store used by Query Processors to track multipart upload session state (active sessions, uploaded part numbers, cached part sizes). Not used for ordering. -- **Kafka:** Per-partition sequencer and pub/sub bus for ordered commit messages (`PutMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage`, `MultipartCommitMessage`). Provides total ordering of mutations within a partition; "last write" is then determined by the sequence numbers persisted in each storage node's RocksDB metadata table, resolved at read time via read quorum. +- **Etcd (via Jetcd 0.8.6):** Distributed key-value store holding cluster topology, erasure-set configuration, and partition→erasure-set mapping. Run as a 3-node quorum. 
The QP and Storage Nodes register Etcd watches so config changes propagate without a restart. +- **Redis (via Jedis 5.1.0):** In-memory store used by Query Processors to track multipart upload session state (active sessions, uploaded part numbers, cached part sizes). Not used for ordering. +- **Kafka (kafka-clients 3.7.0):** Per-partition sequencer and pub/sub bus for ordered commit messages (`FileCommitMessage`, `MultipartCommitMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage`). Provides total ordering of mutations within a partition; "last write" is then determined by the sequence numbers persisted in each storage node's RocksDB metadata table. Reads pick the max version reported across storage nodes and reconstruct from ≥ `m` shards (falling back to an older version if not enough shards are present). ### Testing & Verification -- **JUnit 5:** Unit and integration testing. -- **AWS SDK for Java (2.x):** Used in system tests to verify S3 compatibility against the running cluster. -- **In-process integration cluster:** End-to-end tests (e.g. `RealStorageNodesIT`) start Etcd, Redis, Kafka, and storage nodes in-process rather than via Testcontainers. +- **JUnit 5 (5.10.2):** Unit and integration testing. +- **AWS SDK for Java v2 (`software.amazon.awssdk:s3`):** Used in system tests to drive the cluster through a real S3 client and verify S3-compatible behavior. +- **Testcontainers:** Used by [`DockerComposeS3E2EIT`](../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java) (via `ComposeContainer`) to bring up the actual `docker-compose.yml` stack for true end-to-end tests. 
+- **In-process integration cluster:** Suites like [`RealStorageNodesIT`](../system-tests/src/test/java/koop/RealStorageNodesIT.java) spin up 9 `StorageNodeServerV2` instances inside a single JVM and substitute the external dependencies with in-memory fakes — `MemoryFetcher` for Etcd and `MemoryPubSub` for Kafka — so quorum, version, and repair logic can be exercised without Docker. ## Infrastructure - **Docker & Docker Compose:** Containerization of all components for consistent development and deployment environments. diff --git a/docs/workflow.md b/docs/workflow.md index 16c8986c..9096dcdf 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -7,9 +7,12 @@ This document displays the **planned target-state** workflow diagrams for each s > - [Architecture Overview](architecture.md) — system component overview and redundancy model > **Erasure configuration shown in the diagrams:** The sequence diagrams below depict the -> recommended full-deployment configuration of `n = 9` total shards (`m = 6` data + `k = 3` -> parity, `write_quorum = 7`), which is what the system-tests cluster uses. The -> `docker-compose.yml` setup runs a smaller cluster (`n = 6, m = 4, k = 2, write_quorum = 5`) +> larger deployment configuration of `n = 9` total shards (`m = 6` data + `k = 3` +> parity, `write_quorum = 7`), which is what the [`k8s-9/`](../k8s-9/) deployment and the +> [`docker-compose.k8s*.yml`](../docker-compose.k8s.yml) files seed. The default local +> [`docker-compose.yml`](../docker-compose.yml) (used by the system-tests E2E suite via +> [`DockerComposeS3E2EIT`](../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java)) +> and the smaller [`k8s/`](../k8s/) deployment run `n = 6, m = 4, k = 2, write_quorum = 5` > over 6 storage nodes; the flow is identical, only the numbers change. 
Where this document > writes `≥ m` it means "at least the data-shard count" (reads need `m` shards to reconstruct); > where it writes `≥ write_quorum` it means the shard upload / commit ACK threshold from @@ -40,7 +43,7 @@ This document displays the **planned target-state** workflow diagrams for each s ## System Component Overview -The following diagram shows the high-level components involved in every workflow. Each request enters through the S3-compatible HTTP gateway ([`Main.java`](../query-processor/src/main/java/com/github/koop/queryprocessor/gateway/Main.java)), is processed by the [`StorageWorker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java), and is fanned out to the storage nodes in the target erasure set. Kafka (pub/sub) provides per-partition sequencing for write/delete and multipart commit ordering — commit messages are published to a partition-keyed topic so that all storage nodes in the erasure set apply mutations for a given partition in the same order. Redis (or an in-memory [`MemoryCacheClient`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/cache/MemoryCacheClient.java) in dev/test) is used by the Query Processor for multipart upload session tracking. Storage nodes persist shard data to disk and maintain operation metadata in RocksDB through [`StorageNodeServer`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeServer.java), [`StorageNodeV2`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java), and [`Database`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java). +The following diagram shows the high-level components involved in every workflow. 
Each request enters through the S3-compatible HTTP gateway ([`Main.java`](../query-processor/src/main/java/com/github/koop/queryprocessor/gateway/Main.java)), is processed by the [`StorageWorker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java), and is fanned out to the storage nodes in the target erasure set. Kafka (pub/sub) provides per-partition sequencing for write/delete and multipart commit ordering — commit messages are published to a partition-keyed topic so that all storage nodes in the erasure set apply mutations for a given partition in the same order. Redis (or an in-memory [`MemoryCacheClient`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/cache/MemoryCacheClient.java) in dev/test) is used by the Query Processor for multipart upload session tracking. Storage nodes persist shard bytes as files on disk and maintain operation metadata in RocksDB through [`StorageNodeServerV2`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeServerV2.java), [`StorageNodeV2`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java), and [`Database`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java). ![System Container Overview](diagrams/Containers.svg) @@ -63,7 +66,7 @@ sequenceDiagram C->>QP: PUT /{bucket}/{key} [data] QP->>QP: Erasure-encode → n shards (here 9) par Stream shards concurrently - QP->>SN: PUT /store/{partition}/{storageKey}?requestId=X [shard i] + QP->>SN: PUT /store/{partition}/{bucket}/{key}?requestId=X [shard i] SN-->>QP: 200 shard received end Note over QP: Wait for shard upload quorum (≥ write_quorum; here ≥7) @@ -98,7 +101,7 @@ flowchart TD The QP queries all `n` storage nodes for their shard. At least `m` shards (the erasure-coding data-shard count) must be available to reconstruct the object. The QP streams the reconstructed data back to the client. 
-See [`StorageWorker.get()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java:180) and [`StorageNode.retrieve()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNode.java:171). +See [`StorageWorker.get()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) and [`StorageNodeV2.retrieve()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java). ```mermaid sequenceDiagram @@ -110,7 +113,7 @@ sequenceDiagram C->>QP: GET /{bucket}/{key} QP->>QP: Hash key → erasure set + partition par Request shards concurrently - QP->>SN: GET /store/{partition}/{storageKey} + QP->>SN: GET /store/{partition}/{bucket}/{key} SN->>Disk: Read shard via "current" version pointer Disk-->>SN: shard bytes SN-->>QP: 200 + shard bytes @@ -126,15 +129,21 @@ sequenceDiagram ### Conflicting Versions -When nodes return different versions of the same key (e.g., a write is mid-commit), the QP returns the version that at least a read quorum of `m` nodes agree on (in the 9-node configuration, that is `6/9`). If no version reaches quorum, the operation fails immediately with `500 InternalError` — the system does **not** wait for stabilization. +When nodes report different versions of the same key (e.g. a write is mid-commit), the QP picks the **maximum reported version** across responders and tries to reconstruct from the shards at that version. If fewer than `m` shards are present at the max version, it falls back to an older version via `reconstructFromOlderVersion` rather than failing. A latest-version tombstone short-circuits to `404 NoSuchKey`; only an outright reconstruction failure surfaces as `500 InternalError`. 
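
The version-selection rule described above can be sketched in a few lines. This is an illustrative Python model, not the Java code in `StorageWorker`: `ShardReport` and `resolve_version` are hypothetical names introduced here, and the fallback is assumed to walk versions in descending order until one has at least `m` shards.

```python
from collections import Counter
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class ShardReport:
    """Hypothetical per-node metadata response for one key."""
    version: int
    tombstone: bool = False


def resolve_version(reports: List[ShardReport], m: int) -> Tuple[int, Optional[int]]:
    """Return (http_status, version_to_reconstruct) for a GET."""
    if not reports:
        return 404, None  # no node knows the key: NoSuchKey

    max_version = max(r.version for r in reports)
    # A tombstone at the latest version short-circuits to NoSuchKey.
    if any(r.tombstone for r in reports if r.version == max_version):
        return 404, None

    # Count available shards per version (assumption: tombstones carry no shard).
    shards_per_version = Counter(r.version for r in reports if not r.tombstone)

    # Try the max version first, then fall back to older versions.
    for version in sorted(shards_per_version, reverse=True):
        if shards_per_version[version] >= m:
            return 200, version

    return 500, None  # no version has enough shards: InternalError
```

With `m = 6`, three reports at version 2 and six at version 1 resolve to version 1, which is the fallback case; three reports at each version cannot be reconstructed and map to `500`.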
```mermaid flowchart TD - A[Collect shard responses] --> B{All nodes agree on version?} - B -- Yes --> C[Erasure-decode + return to client] - B -- No --> D{Does a read quorum ≥ m share the same version?} - D -- Yes --> E[Use quorum version → decode + return] - D -- No --> F[500 Internal Error\nNo quorum version available] + A[Collect shard metadata responses] --> B{Any shards responded?} + B -- No --> N[404 NoSuchKey] + B -- Yes --> C[Pick maxVersion across responders] + C --> T{Is maxVersion a tombstone?} + T -- Yes --> N + T -- No --> Q{≥ m shards at maxVersion?} + Q -- Yes --> D[Reconstruct at maxVersion → return] + Q -- No --> F[reconstructFromOlderVersion: try the next-lower version that has ≥ m shards] + F --> R{Reconstruction succeeded?} + R -- Yes --> D + R -- No --> X[500 InternalError] ``` --- @@ -175,7 +184,7 @@ sequenceDiagram **Route:** `PUT /{bucket}` -Bucket creation is sequenced through Kafka/pub/sub. Storage nodes store the bucket record in their RocksDB bucket table (via [`Database.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:81) and [`StorageNodeV2.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java:248)). A write quorum must acknowledge before the client is notified. +Bucket creation is sequenced through Kafka/pub/sub. Storage nodes store the bucket record in their RocksDB bucket table (via [`Database.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) and [`StorageNodeV2.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java)). A write quorum must acknowledge before the client is notified. ```mermaid sequenceDiagram @@ -205,7 +214,7 @@ sequenceDiagram **Route:** `DELETE /{bucket}` -Bucket deletion publishes a `DeleteBucketMessage` via pub/sub. 
Storage nodes write a tombstone to the RocksDB bucket table (via [`Database.deleteBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:91)). The bucket record is logically deleted; any remaining objects in the bucket are not immediately purged. +Bucket deletion publishes a `DeleteBucketMessage` via pub/sub. Storage nodes write a tombstone to the RocksDB bucket table (via [`Database.deleteBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java)). The bucket record is logically deleted; any remaining objects in the bucket are not immediately purged. ```mermaid sequenceDiagram @@ -256,9 +265,9 @@ sequenceDiagram **Route:** `GET /{bucket}?prefix=...&max-keys=...` -The QP streams metadata from all storage nodes using a prefix range scan on the RocksDB metadata table. Because objects in the same bucket may be spread across different erasure sets (based on key hashing), all nodes must be queried. Conflicting versions are resolved using the same read-quorum semantics as GET Object. +The QP fans out per partition, but for each partition it queries one healthy storage node at a time and uses the **first successful response** ([`StorageWorker.fetchListFromAnyNode`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)). No per-item quorum check is performed; because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, a caught-up node gives the same answer. The eventual-consistency window during a divergence is therefore bounded by Kafka consumer lag rather than reconciled per-item across replicas. Objects in the same bucket may be spread across different erasure sets (based on partitioning), so every owning partition is contacted. 
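
The first-successful-response listing described above might look roughly like the following. This is a Python sketch under stated assumptions, not the Java `fetchListFromAnyNode`: the `fetch` and `is_down` callables, the node names, and the rule "if no node is healthy, try them all" are hypothetical stand-ins for the real HTTP call, the `NodeHealthTracker` lookup, and the soft-exclusion fallback.

```python
from typing import Callable, Dict, List


def fetch_list_from_any_node(
    nodes: List[str],
    is_down: Callable[[str], bool],
    fetch: Callable[[str], List[str]],
) -> List[str]:
    """Return the listing from the first node that answers."""
    # Soft exclusion: prefer healthy nodes, but fall back to every node
    # if none are currently marked healthy (assumed behaviour).
    candidates = [n for n in nodes if not is_down(n)] or list(nodes)
    last_error = None
    for node in candidates:
        try:
            return fetch(node)  # first success wins, no per-item quorum
        except OSError as err:  # connection refused, timeout, ...
            last_error = err
    raise RuntimeError("no storage node returned a listing") from last_error


def list_bucket(
    partitions: Dict[int, List[str]],
    is_down: Callable[[str], bool],
    fetch: Callable[[str], List[str]],
) -> List[str]:
    """Merge one listing per partition into a single sorted key list."""
    merged: List[str] = []
    for _, nodes in sorted(partitions.items()):
        merged.extend(fetch_list_from_any_node(nodes, is_down, fetch))
    return sorted(merged)
```

The sketch makes the trade-off visible: only one node per partition is asked, so the answer is only as fresh as that node's position in the Kafka log.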
-See [`Database.listItemsInBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:111) and [`StorageNodeV2.listItemsInBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java:295). +See [`Database.listItemsInBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) and [`StorageNodeV2.listItemsInBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java). ```mermaid sequenceDiagram @@ -268,14 +277,13 @@ sequenceDiagram participant RDB as RocksDB (each SN) C->>QP: GET /{bucket}?prefix=animals/ - par Stream metadata from all nodes - QP->>SN: Stream metadata with prefix "animals/" + loop For each partition owning the bucket + QP->>SN: Try the next healthy node (skip DOWN nodes) SN->>RDB: Range scan metadata table (sorted by key) RDB-->>SN: Metadata entries - SN-->>QP: Stream of ObjectSummary records + SN-->>QP: ObjectSummary records (or error → try next node) end - QP->>QP: Merge + deduplicate results - Note over QP: Conflicting versions → apply read-quorum semantics + QP->>QP: Merge per-partition results QP-->>C: 200 OK [XML ListBucketResult] ``` @@ -287,7 +295,7 @@ sequenceDiagram Initiates a multipart upload session. The QP generates a unique `uploadId` and stores the session state in the cache (Redis in production, in-memory for dev/test). No data is written to storage nodes at this stage. -See [`MultipartUploadManager.initiateMultipartUpload()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java:35). +See [`MultipartUploadManager.initiateMultipartUpload()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). ```mermaid sequenceDiagram @@ -309,7 +317,7 @@ sequenceDiagram Each part is stored as an independent erasure-coded object on the storage nodes using a derived key. 
The cache is updated only **after** the storage nodes confirm the shard write, ensuring the client is not ACKed until the part is durably stored. The part's byte size is also cached for use during completion. -See [`MultipartUploadManager.uploadPart()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java:50). +See [`MultipartUploadManager.uploadPart()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). ```mermaid sequenceDiagram @@ -343,9 +351,9 @@ sequenceDiagram **Route:** `POST /{bucket}/{key}?uploadId=X` -The QP verifies all declared parts are present in the cache and that cached sizes are valid, then transitions the session to `COMPLETING` and publishes a [`MultipartCommitMessage`](../common-lib/src/main/java/com/github/koop/common/messages/Message.java:97) via pub/sub to the partition-keyed topic. The message carries the ordered list of part numbers. **Parts are not concatenated or re-uploaded** — they remain as individual erasure-coded shards in storage; reconstruction happens on read. The cache session and part metadata are cleaned up after successful publish. +The QP verifies all declared parts are present in the cache and that cached sizes are valid, then transitions the session to `COMPLETING` and publishes a [`MultipartCommitMessage`](../common-lib/src/main/java/com/github/koop/common/messages/Message.java) via pub/sub to the partition-keyed topic. The message carries the ordered list of part numbers. **Parts are not concatenated or re-uploaded** — they remain as individual erasure-coded shards in storage; reconstruction happens on read. The cache session and part metadata are cleaned up after successful publish. -See [`MultipartUploadManager.completeMultipartUpload()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java:104). 
+See [`MultipartUploadManager.completeMultipartUpload()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). ```mermaid sequenceDiagram @@ -406,7 +414,7 @@ sequenceDiagram QP->>Cache: Read mpu:parts:{uploadId} loop For each uploaded part (asynchronous, best-effort) QP->>SW: delete(requestId, bucket, partStorageKey) - SW->>SN: DELETE /store/{partition}/{storageKey} + SW->>SN: DELETE /store/{partition}/{bucket}/{key} SN-->>SW: ACK (errors ignored, cleanup continues) QP->>Cache: Delete part size entry for this part end @@ -470,7 +478,7 @@ sequenceDiagram ## 14. Node Discovery / Health Probing -Each Query Processor maintains a local [`NodeHealthTracker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) that classifies every known storage node as `HEALTHY`, `SUSPECT`, or `DOWN`. A background [`NodeHealthProbe`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) refreshes the cache every 5 s by issuing `GET /health` to every machine in the current `ErasureSetConfiguration`. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode` in [`StorageWorker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)) consult the tracker to skip nodes known to be down — eliminating the 30 s per-request timeout penalty for dead peers — while keeping the quorum denominator unchanged. +Each Query Processor maintains a local [`NodeHealthTracker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) that classifies every known storage node as `HEALTHY`, `SUSPECT`, or `DOWN`. A background [`NodeHealthProbe`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) refreshes the cache every 5 s by issuing `GET /health` to every machine in the current `ErasureSetConfiguration`. 
Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode` in [`StorageWorker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)) consult the tracker to skip nodes known to be down — avoiding the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each dead peer — while keeping the quorum denominator unchanged. Soft-exclusion semantics: if too few healthy nodes remain to make progress (e.g. fewer than `m` for reads or fewer than `writeQuorum` for writes), the path falls back to contacting every node so a recovered-but-still-marked-DOWN node has a chance to succeed. Any successful response to a DOWN-marked node immediately promotes it back to HEALTHY. @@ -512,12 +520,18 @@ The QP `/health` endpoint surfaces the live counts (`API Gateway is healthy! / S ## RocksDB Table Reference -The [`Database`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) facade and [`StorageNodeV2`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java) define three RocksDB tables written atomically on every PUT/DELETE/bucket operation in the planned flow. +The [`Database`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) facade is backed by [`RocksDbStorageStrategy`](../storage-node/src/main/java/com/github/koop/storagenode/db/RocksDbStorageStrategy.java), which declares **seven** RocksDB column families. The first three are written atomically on every PUT/DELETE/bucket operation; the remaining four support repair, GC, and asynchronous blob deletion. 
-| Table | Key | Value | Purpose | +| Column family | Key | Value | Purpose | |---|---|---|---| -| **OpLog** | Sequence Number | `(key, operation)` | Ordered log of all mutations; enables repair | -| **Metadata** | Object Key | `(partition, seq, location)` | Latest shard location per object key; supports regular, multipart, and tombstone versions | -| **Buckets** | Bucket Name | `(partition, seq, deleted)` | Bucket existence and tombstone tracking | +| **`op_log`** | Sequence Number | `(key, operation)` | Ordered per-partition log of all mutations; enables repair | +| **`metadata`** | Object Key | `(partition, seq, location)` | Latest shard location per object key; supports regular, multipart, and tombstone versions | +| **`buckets`** | Bucket Name | `(partition, seq, deleted)` | Bucket existence and tombstone tracking | +| **`uncommitted_writes`** | Request ID | Per-shard staging entry | Tracks shards that have arrived from the QP but whose Kafka commit has not yet been observed | +| **`repair_queue`** | Sequence Number | Pending repair task | Persistent queue consumed by [`RepairWorkerPool`](../storage-node/src/main/java/com/github/koop/storagenode/RepairWorkerPool.java) to catch up after a node restart | +| **`pending_deletes`** | Blob Key | Tombstone marker | Obsolete shard paths enqueued on DELETE / GC; drained asynchronously by [`BlobDeletionWorker`](../storage-node/src/main/java/com/github/koop/storagenode/gc/BlobDeletionWorker.java) | +| **`gc_cursors`** | Partition | Last-scanned seqNum | Persistent cursor used by [`GarbageCollectorWorker`](../storage-node/src/main/java/com/github/koop/storagenode/gc/GarbageCollectorWorker.java) so each GC pass examines only new oplog entries | -See [`Database.putItem()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:24), [`Database.deleteItem()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:45), and 
[`Database.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:81) for the atomic write implementations. +The actual shard bytes are **not** stored in RocksDB — they are written as files on the filesystem under `${STORAGE_DIR}` via `java.nio.channels.FileChannel`. The RocksDB instance itself lives at `${STORAGE_DIR}/rocksdb`. + +See [`Database.putItem()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java), [`Database.deleteItem()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java), and [`Database.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) for the atomic write implementations. From 0a28318b859f6f2955b63c1b8cf6c2ec838b7452 Mon Sep 17 00:00:00 2001 From: Michael Kupferstein Date: Tue, 19 May 2026 16:43:04 -0400 Subject: [PATCH 02/14] Update architecture and technologies docs --- docs/architecture.md | 2 +- docs/technologies.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/architecture.md b/docs/architecture.md index 8e108c72..e3f2dfcc 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -33,7 +33,7 @@ The architecture consists of four primary services: - **Redis:** Used by Query Processors to track multipart upload session state (active sessions, uploaded parts, cached part sizes). A `MemoryCacheClient` is available as an in-process substitute for dev/test. 5. **Nginx Ingress (Kubernetes deployments only):** - - Acts as the public-facing entry point in the k8s deployments, fronting the three Query Processor replicas (`query-processor-1..3:8080`) as an `upstream`. Disables request buffering and `client_max_body_size` so large object PUTs stream straight through to the QPs. Configured in [`nginx-k8s.conf`](../nginx-k8s.conf); not present in the local `docker-compose.yml` topology, where clients hit Query Processors on ports `9001-9003` directly. 
+ - Acts as the public-facing entry point in the k8s deployments, exposed as a `NodePort` on `30080`. Its `upstream` points at the Query Processor `ClusterIP` Service (`query-processor.koopdb.svc.cluster.local:8080`), letting kube-proxy load-balance across the 3 QP pods. Request buffering and `client_max_body_size` are disabled so large object PUTs stream straight through to the QPs. Configured via the inline ConfigMap inside [`k8s/07-nginx.yaml`](../k8s/07-nginx.yaml) (which additionally applies a `limit_req_zone ... 10r/s burst=20` rate limit on `location /`) and [`k8s-9/07-nginx.yaml`](../k8s-9/07-nginx.yaml) (no rate limit). Not present in the local `docker-compose.yml` topology, where clients hit Query Processors on ports `9001-9003` directly. ### Data Flow diff --git a/docs/technologies.md b/docs/technologies.md index 01a2ced7..f4252fc7 100644 --- a/docs/technologies.md +++ b/docs/technologies.md @@ -31,7 +31,7 @@ KoopDB leverages a modern Java stack focused on high-performance concurrency and ### Testing & Verification - **JUnit 5 (5.10.2):** Unit and integration testing. -- **AWS SDK for Java v2 (`software.amazon.awssdk:s3`):** Used in system tests to drive the cluster through a real S3 client and verify S3-compatible behavior. +- **AWS SDK for Java v2 (`software.amazon.awssdk:s3`):** Used in system tests to drive the cluster through a real S3 client and verify S3-compatible behavior. The async client (`S3AsyncClient`) plus [`s3-transfer-manager 2.41.34`](../system-tests/pom.xml) exercise the high-level `uploadFile` / `downloadFile` flows, which internally chunk large objects into multipart uploads. - **Testcontainers:** Used by [`DockerComposeS3E2EIT`](../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java) (via `ComposeContainer`) to bring up the actual `docker-compose.yml` stack for true end-to-end tests. 
- **In-process integration cluster:** Suites like [`RealStorageNodesIT`](../system-tests/src/test/java/koop/RealStorageNodesIT.java) spin up 9 `StorageNodeServerV2` instances inside a single JVM and substitute the external dependencies with in-memory fakes — `MemoryFetcher` for Etcd and `MemoryPubSub` for Kafka — so quorum, version, and repair logic can be exercised without Docker. From 37bf2e761ad60000efb1da4075a5b627c68bfae1 Mon Sep 17 00:00:00 2001 From: Michael Kupferstein Date: Tue, 19 May 2026 16:48:11 -0400 Subject: [PATCH 03/14] Refine erasure coding terminology for clarity --- docs/architecture.md | 2 +- query-processor/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/architecture.md b/docs/architecture.md index e3f2dfcc..8c4400c6 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -10,7 +10,7 @@ The architecture consists of four primary services: 1. **Query Processors (Gateway):** - **Stateless Front-End:** Accepts S3 API requests from clients. - - **Erasure Coding:** Encodes incoming data into `k` data + `(n-k)` parity shards and distributes them to storage nodes. Reconstructs data from any `k` available shards on retrieval. + - **Erasure Coding:** Encodes incoming data into `m` data + `k = n − m` parity shards and distributes them to storage nodes. Reconstructs data from any `m` available shards on retrieval. (Variable naming matches [`ErasureCoder`](../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java): `m` = data shards, `k` = parity shards, `n = m + k` = total shards.) - **Routing:** Maps each `(bucket, key)` to a partition and to an erasure set using configuration loaded from Etcd (`erasure_set_configurations`, `partition_spread_configurations`). - **Multipart Manager:** Coordinates multipart upload sessions, tracking parts and publishing the final commit message. 
- **Discovery Cache (`NodeHealthTracker` + `NodeHealthProbe`):** Each QP maintains a local cache of storage-node liveness. A background probe issues `GET /health` to every known storage node every 5 s (2 s timeout) and drives a `HEALTHY → SUSPECT → DOWN` state machine. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode`) consult the cache to **soft-exclude** DOWN nodes, eliminating the per-request 30 s timeout penalty for known-dead peers. The cache is advisory only — quorum math is unchanged, and the path falls back to the full node list when too few healthy nodes remain or when a request to a DOWN-marked node succeeds (opportunistic recovery). diff --git a/query-processor/README.md b/query-processor/README.md index 9c378df6..ab69d569 100644 --- a/query-processor/README.md +++ b/query-processor/README.md @@ -18,7 +18,7 @@ Built with **Java 21**, **Javalin**, and **Virtual Threads** for high concurrenc The gateway acts as the smart client for the storage layer. Its responsibilities: 1. **Routing:** Maps a `(bucket, key)` to a partition, then to an erasure set, using configuration from Etcd ([`ErasureRouting`](../common-lib/src/main/java/com/github/koop/common/erasure/ErasureRouting.java) and [`PartitionSpreadConfiguration`](../common-lib/src/main/java/com/github/koop/common/metadata/PartitionSpreadConfiguration.java)). -2. **Erasure Encoding/Decoding:** Splits incoming objects into `k` data + `(n-k)` parity shards on PUT, reconstructs from any `k` available shards on GET. +2. **Erasure Encoding/Decoding:** Splits incoming objects into `m` data + `k = n − m` parity shards on PUT, reconstructs from any `m` available shards on GET. 3. **Quorum Coordination:** Waits for shard write ACKs from a configured `write_quorum` of nodes, then publishes a commit message via Kafka. 4. **Multipart Session State:** Tracks active multipart sessions and their parts in Redis. 
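
The `m` / `k` / `n` naming and the `write_quorum` threshold used above can be checked with a little arithmetic. The Python sketch below is only a worked example: `ErasureConfig` and its methods are hypothetical names, and the two parameter sets are the ones quoted in the workflow document (`n = 9, m = 6, k = 3, write_quorum = 7` and `n = 6, m = 4, k = 2, write_quorum = 5`).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ErasureConfig:
    m: int             # data shards
    k: int             # parity shards
    write_quorum: int  # shard upload / commit ACK threshold

    @property
    def n(self) -> int:
        """Total shards per object: n = m + k."""
        return self.m + self.k

    def read_failures_tolerated(self) -> int:
        """Reads need any m shards, so n - m (= k) nodes may be missing."""
        return self.n - self.m

    def write_failures_tolerated(self) -> int:
        """Writes need write_quorum ACKs, so n - write_quorum nodes may be missing."""
        return self.n - self.write_quorum

    def storage_overhead(self) -> float:
        """Bytes stored per byte of user data: n / m."""
        return self.n / self.m


large = ErasureConfig(m=6, k=3, write_quorum=7)
small = ErasureConfig(m=4, k=2, write_quorum=5)
```

Under these numbers the larger configuration tolerates 3 missing nodes on reads and 2 on writes, the smaller one tolerates 2 and 1, and both store 1.5 bytes per byte of user data.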
From 0bb03360f95f0d57955a725bb1c259951ac3d3ac Mon Sep 17 00:00:00 2001 From: Michael Kupferstein Date: Tue, 19 May 2026 22:07:55 -0400 Subject: [PATCH 04/14] updated docs so it can be used on pages --- .../docs/01-index.md} | 18 +++---- docs/{scope.md => diagrams/docs/02-scope.md} | 0 .../docs/03-challenges.md} | 4 +- .../docs/04-architecture.md} | 12 ++--- .../docs/05-workflow.md} | 48 +++++++++---------- .../docs/06-technologies.md} | 10 ++-- .../docs/07-installation.md} | 30 ++++++------ docs/{api.md => diagrams/docs/08-api.md} | 0 .../docs/09-testing.md} | 0 9 files changed, 61 insertions(+), 61 deletions(-) rename docs/{Object Store.md => diagrams/docs/01-index.md} (53%) rename docs/{scope.md => diagrams/docs/02-scope.md} (100%) rename docs/{challenges.md => diagrams/docs/03-challenges.md} (80%) rename docs/{architecture.md => diagrams/docs/04-architecture.md} (84%) rename docs/{workflow.md => diagrams/docs/05-workflow.md} (76%) rename docs/{technologies.md => diagrams/docs/06-technologies.md} (71%) rename docs/{installation.md => diagrams/docs/07-installation.md} (73%) rename docs/{api.md => diagrams/docs/08-api.md} (100%) rename docs/{testing.md => diagrams/docs/09-testing.md} (100%) diff --git a/docs/Object Store.md b/docs/diagrams/docs/01-index.md similarity index 53% rename from docs/Object Store.md rename to docs/diagrams/docs/01-index.md index 3a90297f..2edd8b9f 100644 --- a/docs/Object Store.md +++ b/docs/diagrams/docs/01-index.md @@ -6,18 +6,18 @@ KoopDB is a distributed, S3-compatible object store. Objects are split into Reed-Solomon erasure-coded shards and spread across a cluster of storage nodes; mutations are ordered through Kafka and persisted in RocksDB on each node. A stateless Query Processor fronts the cluster with the S3 HTTP API. -For a top-level summary of features and the system topology, see the project [README](../README.md). +For a top-level summary of features and the system topology, see the project [README](../../../README.md). 
## Project Plan -- [Scope (scenarios covered)](scope.md) -- [Distributed System Challenges](challenges.md) -- [Workflow Diagrams](workflow.md) -- [Software Architecture](architecture.md) -- [Tools & Technologies](technologies.md) +- [Scope (scenarios covered)](02-scope.md) +- [Distributed System Challenges](03-challenges.md) +- [Workflow Diagrams](05-workflow.md) +- [Software Architecture](04-architecture.md) +- [Tools & Technologies](06-technologies.md) ## Installation and Usage Guide -- [Installation Instructions](installation.md) -- [API and Usage Documentation](api.md) -- [Query Processor (API Gateway) module README](../query-processor/README.md) +- [Installation Instructions](07-installation.md) +- [API and Usage Documentation](08-api.md) +- [Query Processor (API Gateway) module README](../../../query-processor/README.md) diff --git a/docs/scope.md b/docs/diagrams/docs/02-scope.md similarity index 100% rename from docs/scope.md rename to docs/diagrams/docs/02-scope.md diff --git a/docs/challenges.md b/docs/diagrams/docs/03-challenges.md similarity index 80% rename from docs/challenges.md rename to docs/diagrams/docs/03-challenges.md index 3676a482..d8f6ee65 100644 --- a/docs/challenges.md +++ b/docs/diagrams/docs/03-challenges.md @@ -43,7 +43,7 @@ This throughput increase aligns with the benefits of system scalability. - **Challenge:** Without health awareness, every QP request paid the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each unreachable storage node. One dead node added the full connect-timeout cost to every GET/PUT; three dead nodes (still within fault tolerance) compounded that delay, inflating sub-second requests into multi-second ones. - **Sub-challenge — stale health cache:** A cached "DOWN" marking can become wrong: the node may have recovered, or a transient network blip may have falsely marked a healthy node as down. 
A naive hard-exclusion cache would prevent recovery indefinitely (false negative) or shed load unnecessarily (false positive). -- **Solution:** Each QP runs a [`NodeHealthTracker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) (state machine: `HEALTHY → SUSPECT → DOWN`) fed by both a periodic [`NodeHealthProbe`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) (5 s interval, `GET /health`) and outcomes of real data-path requests. Data-path operations **soft-exclude** DOWN nodes — they are skipped only while enough healthy peers remain to make quorum; otherwise the path falls back to contacting every node. Any successful response from a DOWN-marked node immediately promotes it back to `HEALTHY` (opportunistic recovery). +- **Solution:** Each QP runs a [`NodeHealthTracker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) (state machine: `HEALTHY → SUSPECT → DOWN`) fed by both a periodic [`NodeHealthProbe`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) (5 s interval, `GET /health`) and outcomes of real data-path requests. Data-path operations **soft-exclude** DOWN nodes — they are skipped only while enough healthy peers remain to make quorum; otherwise the path falls back to contacting every node. Any successful response from a DOWN-marked node immediately promotes it back to `HEALTHY` (opportunistic recovery). - **Benefit:** Dead-node latency drops to sub-second on the fast path, the quorum denominator is never reduced, and the cache self-heals against both false positives (probe re-detects recovery) and false negatives (data-path successes override stale DOWN markings). ## Write Consistency @@ -85,4 +85,4 @@ This throughput increase aligns with the benefits of system scalability. 
## Bucket Item Consistency - **Challenge:** Storage nodes commit in order, but not always simultaneously, so the per-node view of a bucket can diverge briefly. -- **Solution:** Bucket listings are served per-partition from a single healthy storage node — [`fetchListFromAnyNode`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) walks the healthy node list and returns the first successful response. Because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, any node that has caught up gives the same answer; the eventual-consistency window during a divergence is bounded by Kafka consumer lag rather than reconciled per-item across replicas. \ No newline at end of file +- **Solution:** Bucket listings are served per-partition from a single healthy storage node — [`fetchListFromAnyNode`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) walks the healthy node list and returns the first successful response. Because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, any node that has caught up gives the same answer; the eventual-consistency window during a divergence is bounded by Kafka consumer lag rather than reconciled per-item across replicas. \ No newline at end of file diff --git a/docs/architecture.md b/docs/diagrams/docs/04-architecture.md similarity index 84% rename from docs/architecture.md rename to docs/diagrams/docs/04-architecture.md index 8c4400c6..e9ade1e9 100644 --- a/docs/architecture.md +++ b/docs/diagrams/docs/04-architecture.md @@ -10,7 +10,7 @@ The architecture consists of four primary services: 1. **Query Processors (Gateway):** - **Stateless Front-End:** Accepts S3 API requests from clients. - - **Erasure Coding:** Encodes incoming data into `m` data + `k = n − m` parity shards and distributes them to storage nodes. 
Reconstructs data from any `m` available shards on retrieval. (Variable naming matches [`ErasureCoder`](../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java): `m` = data shards, `k` = parity shards, `n = m + k` = total shards.) + - **Erasure Coding:** Encodes incoming data into `m` data + `k = n − m` parity shards and distributes them to storage nodes. Reconstructs data from any `m` available shards on retrieval. (Variable naming matches [`ErasureCoder`](../../../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java): `m` = data shards, `k` = parity shards, `n = m + k` = total shards.) - **Routing:** Maps each `(bucket, key)` to a partition and to an erasure set using configuration loaded from Etcd (`erasure_set_configurations`, `partition_spread_configurations`). - **Multipart Manager:** Coordinates multipart upload sessions, tracking parts and publishing the final commit message. - **Discovery Cache (`NodeHealthTracker` + `NodeHealthProbe`):** Each QP maintains a local cache of storage-node liveness. A background probe issues `GET /health` to every known storage node every 5 s (2 s timeout) and drives a `HEALTHY → SUSPECT → DOWN` state machine. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode`) consult the cache to **soft-exclude** DOWN nodes, eliminating the per-request 30 s timeout penalty for known-dead peers. The cache is advisory only — quorum math is unchanged, and the path falls back to the full node list when too few healthy nodes remain or when a request to a DOWN-marked node succeeds (opportunistic recovery). @@ -25,7 +25,7 @@ The architecture consists of four primary services: 3. **Kafka (Sequencer / Pub-Sub):** - Provides total ordering of mutating operations on a per-partition basis. 
- - Query Processors publish ordered commit messages (`FileCommitMessage`, `MultipartCommitMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage` — the full sealed list in [`Message`](../common-lib/src/main/java/com/github/koop/common/messages/Message.java)) to partition-keyed topics. + - Query Processors publish ordered commit messages (`FileCommitMessage`, `MultipartCommitMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage` — the full sealed list in [`Message`](../../../common-lib/src/main/java/com/github/koop/common/messages/Message.java)) to partition-keyed topics. - Storage nodes consume those messages and apply the corresponding atomic RocksDB writes. 4. **Coordination Cluster (Metadata & State):** @@ -33,7 +33,7 @@ The architecture consists of four primary services: - **Redis:** Used by Query Processors to track multipart upload session state (active sessions, uploaded parts, cached part sizes). A `MemoryCacheClient` is available as an in-process substitute for dev/test. 5. **Nginx Ingress (Kubernetes deployments only):** - - Acts as the public-facing entry point in the k8s deployments, exposed as a `NodePort` on `30080`. Its `upstream` points at the Query Processor `ClusterIP` Service (`query-processor.koopdb.svc.cluster.local:8080`), letting kube-proxy load-balance across the 3 QP pods. Request buffering and `client_max_body_size` are disabled so large object PUTs stream straight through to the QPs. Configured via the inline ConfigMap inside [`k8s/07-nginx.yaml`](../k8s/07-nginx.yaml) (which additionally applies a `limit_req_zone ... 10r/s burst=20` rate limit on `location /`) and [`k8s-9/07-nginx.yaml`](../k8s-9/07-nginx.yaml) (no rate limit). Not present in the local `docker-compose.yml` topology, where clients hit Query Processors on ports `9001-9003` directly. + - Acts as the public-facing entry point in the k8s deployments, exposed as a `NodePort` on `30080`. 
Its `upstream` points at the Query Processor `ClusterIP` Service (`query-processor.koopdb.svc.cluster.local:8080`), letting kube-proxy load-balance across the 3 QP pods. Request buffering and `client_max_body_size` are disabled so large object PUTs stream straight through to the QPs. Configured via the inline ConfigMap inside [`k8s/07-nginx.yaml`](../../../k8s/07-nginx.yaml) (which additionally applies a `limit_req_zone ... 10r/s burst=20` rate limit on `location /`) and [`k8s-9/07-nginx.yaml`](../../../k8s-9/07-nginx.yaml) (no rate limit). Not present in the local `docker-compose.yml` topology, where clients hit Query Processors on ports `9001-9003` directly. ### Data Flow @@ -42,7 +42,7 @@ The architecture consists of four primary services: The system uses a two-stage write: shards are streamed first, and only after a write quorum acknowledges the shard upload is an ordered commit message published. This is **not** XA-style two-phase commit — there is no prepare/abort vote — but it does separate durable shard placement from the ordered metadata commit. 1. **Client** sends a `PUT /{bucket}/{key}` request to any Query Processor. -2. **QP** receives the data stream and erasure-encodes it into `n` total shards (`m` data + `k = n − m` parity). The defaults depend on deployment: the `docker-compose.yml` `etcd-seeder` block (used by the local dev stack **and** the system-tests cluster, since [`DockerComposeS3E2EIT`](../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java) launches `../docker-compose.yml`) configures `n=6, m=4, k=2, write_quorum=5` (tolerates 2 node failures); the larger k8s deployments — [`docker-compose.k8s.yml`](../docker-compose.k8s.yml), [`docker-compose.k8s-local.yml`](../docker-compose.k8s-local.yml), and [`k8s-9/`](../k8s-9/) — use `n=9, m=6, k=3, write_quorum=7` (tolerates 3 node failures). Both configurations are read from `erasure_set_configurations` in Etcd at startup. +2. 
**QP** receives the data stream and erasure-encodes it into `n` total shards (`m` data + `k = n − m` parity). The defaults depend on deployment: the `docker-compose.yml` `etcd-seeder` block (used by the local dev stack **and** the system-tests cluster, since [`DockerComposeS3E2EIT`](../../../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java) launches `../docker-compose.yml`) configures `n=6, m=4, k=2, write_quorum=5` (tolerates 2 node failures); the larger k8s deployments — [`docker-compose.k8s.yml`](../../../docker-compose.k8s.yml), [`docker-compose.k8s-local.yml`](../../../docker-compose.k8s-local.yml), and [`k8s-9/`](../../../k8s-9/) — use `n=9, m=6, k=3, write_quorum=7` (tolerates 3 node failures). Both configurations are read from `erasure_set_configurations` in Etcd at startup. 3. **QP** looks up the partition's erasure set in Etcd and streams shards to the target storage nodes concurrently over HTTP (`PUT /store/{partition}/{bucket}/{key}`). 4. **QP** waits for at least `write_quorum` shard upload ACKs from the storage nodes. 5. **QP** publishes an ordered `FileCommitMessage` via Kafka to the partition's topic. @@ -68,8 +68,8 @@ DELETE, CreateBucket, and DeleteBucket follow the same Kafka-sequenced pattern a #### 5. Repair -When a storage node restarts and detects a gap in its sequence numbers, its `RepairWorkerPool` requests the missing operations from peer nodes and replays them, skipping any keys whose post-recovery state already supersedes the replayed op. See [`workflow.md`](workflow.md#12-storage-node-repair-flow) for the full flow. +When a storage node restarts and detects a gap in its sequence numbers, its `RepairWorkerPool` requests the missing operations from peer nodes and replays them, skipping any keys whose post-recovery state already supersedes the replayed op. See [`workflow.md`](05-workflow.md#12-storage-node-repair-flow) for the full flow. 
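The erasure parameters quoted in the write path (`n`, `m`, `k = n − m`, `write_quorum`) relate to each other by simple arithmetic. The sketch below only restates that arithmetic; the class `ErasureParams` and its method names are hypothetical and do not mirror `ErasureCoder`. The write-side figure is derived from the stated rule that a write waits for `write_quorum` shard ACKs, so treat it as an inference rather than a documented guarantee.

```java
// Hypothetical helper; restates the n / m / k / write_quorum arithmetic only.
final class ErasureParams {
    final int n;            // total shards
    final int m;            // data shards
    final int writeQuorum;  // shard-upload ACKs required before the commit is published

    ErasureParams(int n, int m, int writeQuorum) {
        if (m < 1 || m > n || writeQuorum < 1 || writeQuorum > n) {
            throw new IllegalArgumentException("expected 1 <= m <= n and 1 <= writeQuorum <= n");
        }
        this.n = n;
        this.m = m;
        this.writeQuorum = writeQuorum;
    }

    /** k = n - m parity shards; also the number of missing shards a read can absorb. */
    int parityShards() {
        return n - m;
    }

    /** Nodes that may fail to ACK while a write can still reach its quorum. */
    int maxDownForWrite() {
        return n - writeQuorum;
    }

    /** Raw bytes stored per logical byte, ignoring metadata. */
    double storageOverhead() {
        return (double) n / m;
    }
}
```

For example, `new ErasureParams(6, 4, 5)` gives 2 parity shards and a 1.5x storage overhead, and `new ErasureParams(9, 6, 7)` gives 3 parity shards at the same overhead.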
## Diagram Reference -For a visual representation of these components and their interactions, see the [C4 Container Diagram](diagrams/workspace.dsl) and the rendered SVG diagrams in the `docs/diagrams/` folder. +For a visual representation of these components and their interactions, see the [C4 Container Diagram](../workspace.dsl) and the rendered SVG diagrams in the `docs/diagrams/` folder. diff --git a/docs/workflow.md b/docs/diagrams/docs/05-workflow.md similarity index 76% rename from docs/workflow.md rename to docs/diagrams/docs/05-workflow.md index 9096dcdf..8c4ec423 100644 --- a/docs/workflow.md +++ b/docs/diagrams/docs/05-workflow.md @@ -3,16 +3,16 @@ This document displays the **planned target-state** workflow diagrams for each supported use case in the Koop distributed object store. Koop exposes an S3-compatible API; all operations flow through the **Query Processor (QP)** gateway before being fanned out to **Storage Nodes (SN)** via erasure-coded shards. > **Related documents:** -> - [Scope](scope.md) — supported use cases and error cases -> - [Architecture Overview](architecture.md) — system component overview and redundancy model +> - [Scope](02-scope.md) — supported use cases and error cases +> - [Architecture Overview](04-architecture.md) — system component overview and redundancy model > **Erasure configuration shown in the diagrams:** The sequence diagrams below depict the > larger deployment configuration of `n = 9` total shards (`m = 6` data + `k = 3` -> parity, `write_quorum = 7`), which is what the [`k8s-9/`](../k8s-9/) deployment and the -> [`docker-compose.k8s*.yml`](../docker-compose.k8s.yml) files seed. 
The default local -> [`docker-compose.yml`](../docker-compose.yml) (used by the system-tests E2E suite via -> [`DockerComposeS3E2EIT`](../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java)) -> and the smaller [`k8s/`](../k8s/) deployment run `n = 6, m = 4, k = 2, write_quorum = 5` +> parity, `write_quorum = 7`), which is what the [`k8s-9/`](../../../k8s-9/) deployment and the +> [`docker-compose.k8s*.yml`](../../../docker-compose.k8s.yml) files seed. The default local +> [`docker-compose.yml`](../../../docker-compose.yml) (used by the system-tests E2E suite via +> [`DockerComposeS3E2EIT`](../../../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java)) +> and the smaller [`k8s/`](../../../k8s/) deployment run `n = 6, m = 4, k = 2, write_quorum = 5` > over 6 storage nodes; the flow is identical, only the numbers change. Where this document > writes `≥ m` it means "at least the data-shard count" (reads need `m` shards to reconstruct); > where it writes `≥ write_quorum` it means the shard upload / commit ACK threshold from @@ -43,9 +43,9 @@ This document displays the **planned target-state** workflow diagrams for each s ## System Component Overview -The following diagram shows the high-level components involved in every workflow. Each request enters through the S3-compatible HTTP gateway ([`Main.java`](../query-processor/src/main/java/com/github/koop/queryprocessor/gateway/Main.java)), is processed by the [`StorageWorker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java), and is fanned out to the storage nodes in the target erasure set. Kafka (pub/sub) provides per-partition sequencing for write/delete and multipart commit ordering — commit messages are published to a partition-keyed topic so that all storage nodes in the erasure set apply mutations for a given partition in the same order. 
Redis (or an in-memory [`MemoryCacheClient`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/cache/MemoryCacheClient.java) in dev/test) is used by the Query Processor for multipart upload session tracking. Storage nodes persist shard bytes as files on disk and maintain operation metadata in RocksDB through [`StorageNodeServerV2`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeServerV2.java), [`StorageNodeV2`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java), and [`Database`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java). +The following diagram shows the high-level components involved in every workflow. Each request enters through the S3-compatible HTTP gateway ([`Main.java`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/gateway/Main.java)), is processed by the [`StorageWorker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java), and is fanned out to the storage nodes in the target erasure set. Kafka (pub/sub) provides per-partition sequencing for write/delete and multipart commit ordering — commit messages are published to a partition-keyed topic so that all storage nodes in the erasure set apply mutations for a given partition in the same order. Redis (or an in-memory [`MemoryCacheClient`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/cache/MemoryCacheClient.java) in dev/test) is used by the Query Processor for multipart upload session tracking. 
Storage nodes persist shard bytes as files on disk and maintain operation metadata in RocksDB through [`StorageNodeServerV2`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeServerV2.java), [`StorageNodeV2`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java), and [`Database`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java). -![System Container Overview](diagrams/Containers.svg) +![System Container Overview](../Containers.svg) --- @@ -101,7 +101,7 @@ flowchart TD The QP queries all `n` storage nodes for their shard. At least `m` shards (the erasure-coding data-shard count) must be available to reconstruct the object. The QP streams the reconstructed data back to the client. -See [`StorageWorker.get()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) and [`StorageNodeV2.retrieve()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java). +See [`StorageWorker.get()`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) and [`StorageNodeV2.retrieve()`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java). ```mermaid sequenceDiagram @@ -184,7 +184,7 @@ sequenceDiagram **Route:** `PUT /{bucket}` -Bucket creation is sequenced through Kafka/pub/sub. Storage nodes store the bucket record in their RocksDB bucket table (via [`Database.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) and [`StorageNodeV2.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java)). A write quorum must acknowledge before the client is notified. +Bucket creation is sequenced through Kafka/pub/sub. 
Storage nodes store the bucket record in their RocksDB bucket table (via [`Database.createBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) and [`StorageNodeV2.createBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java)). A write quorum must acknowledge before the client is notified. ```mermaid sequenceDiagram @@ -214,7 +214,7 @@ sequenceDiagram **Route:** `DELETE /{bucket}` -Bucket deletion publishes a `DeleteBucketMessage` via pub/sub. Storage nodes write a tombstone to the RocksDB bucket table (via [`Database.deleteBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java)). The bucket record is logically deleted; any remaining objects in the bucket are not immediately purged. +Bucket deletion publishes a `DeleteBucketMessage` via pub/sub. Storage nodes write a tombstone to the RocksDB bucket table (via [`Database.deleteBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java)). The bucket record is logically deleted; any remaining objects in the bucket are not immediately purged. ```mermaid sequenceDiagram @@ -265,9 +265,9 @@ sequenceDiagram **Route:** `GET /{bucket}?prefix=...&max-keys=...` -The QP fans out per partition, but for each partition it queries one healthy storage node at a time and uses the **first successful response** ([`StorageWorker.fetchListFromAnyNode`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)). No per-item quorum check is performed; because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, a caught-up node gives the same answer. The eventual-consistency window during a divergence is therefore bounded by Kafka consumer lag rather than reconciled per-item across replicas. 
Objects in the same bucket may be spread across different erasure sets (based on partitioning), so every owning partition is contacted. +The QP fans out per partition, but for each partition it queries one healthy storage node at a time and uses the **first successful response** ([`StorageWorker.fetchListFromAnyNode`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)). No per-item quorum check is performed; because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, a caught-up node gives the same answer. The eventual-consistency window during a divergence is therefore bounded by Kafka consumer lag rather than reconciled per-item across replicas. Objects in the same bucket may be spread across different erasure sets (based on partitioning), so every owning partition is contacted. -See [`Database.listItemsInBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) and [`StorageNodeV2.listItemsInBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java). +See [`Database.listItemsInBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) and [`StorageNodeV2.listItemsInBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java). ```mermaid sequenceDiagram @@ -295,7 +295,7 @@ sequenceDiagram Initiates a multipart upload session. The QP generates a unique `uploadId` and stores the session state in the cache (Redis in production, in-memory for dev/test). No data is written to storage nodes at this stage. -See [`MultipartUploadManager.initiateMultipartUpload()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). +See [`MultipartUploadManager.initiateMultipartUpload()`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). 
```mermaid sequenceDiagram @@ -317,7 +317,7 @@ sequenceDiagram Each part is stored as an independent erasure-coded object on the storage nodes using a derived key. The cache is updated only **after** the storage nodes confirm the shard write, ensuring the client is not ACKed until the part is durably stored. The part's byte size is also cached for use during completion. -See [`MultipartUploadManager.uploadPart()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). +See [`MultipartUploadManager.uploadPart()`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). ```mermaid sequenceDiagram @@ -351,9 +351,9 @@ sequenceDiagram **Route:** `POST /{bucket}/{key}?uploadId=X` -The QP verifies all declared parts are present in the cache and that cached sizes are valid, then transitions the session to `COMPLETING` and publishes a [`MultipartCommitMessage`](../common-lib/src/main/java/com/github/koop/common/messages/Message.java) via pub/sub to the partition-keyed topic. The message carries the ordered list of part numbers. **Parts are not concatenated or re-uploaded** — they remain as individual erasure-coded shards in storage; reconstruction happens on read. The cache session and part metadata are cleaned up after successful publish. +The QP verifies all declared parts are present in the cache and that cached sizes are valid, then transitions the session to `COMPLETING` and publishes a [`MultipartCommitMessage`](../../../common-lib/src/main/java/com/github/koop/common/messages/Message.java) via pub/sub to the partition-keyed topic. The message carries the ordered list of part numbers. **Parts are not concatenated or re-uploaded** — they remain as individual erasure-coded shards in storage; reconstruction happens on read. The cache session and part metadata are cleaned up after successful publish. 
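The pre-commit check described above (every declared part present in the cache, every cached size valid) might look roughly like this. It is a sketch under assumptions, not the `MultipartUploadManager` implementation: the class `MultipartCompletionCheck`, the `canComplete` method, the map-based cache view, and the rule that a valid size is strictly positive are all hypothetical.

```java
// Hypothetical sketch of the CompleteMultipartUpload pre-check.
final class MultipartCompletionCheck {
    private MultipartCompletionCheck() {}

    /**
     * Returns true only if every declared part number has a cached size and that
     * size is positive (assumption: "valid" means greater than zero).
     */
    static boolean canComplete(int[] declaredParts, java.util.Map<Integer, Long> cachedSizes) {
        if (declaredParts.length == 0) {
            return false; // nothing to commit
        }
        for (int part : declaredParts) {
            Long size = cachedSizes.get(part);
            if (size == null || size <= 0) {
                return false; // part never uploaded, or its cached size is unusable
            }
        }
        return true;
    }
}
```

Only when such a check passes would the session move to `COMPLETING` and the commit message be published. The parts themselves are not touched by the check.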
-See [`MultipartUploadManager.completeMultipartUpload()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). +See [`MultipartUploadManager.completeMultipartUpload()`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). ```mermaid sequenceDiagram @@ -478,7 +478,7 @@ sequenceDiagram ## 14. Node Discovery / Health Probing -Each Query Processor maintains a local [`NodeHealthTracker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) that classifies every known storage node as `HEALTHY`, `SUSPECT`, or `DOWN`. A background [`NodeHealthProbe`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) refreshes the cache every 5 s by issuing `GET /health` to every machine in the current `ErasureSetConfiguration`. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode` in [`StorageWorker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)) consult the tracker to skip nodes known to be down — avoiding the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each dead peer — while keeping the quorum denominator unchanged. +Each Query Processor maintains a local [`NodeHealthTracker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) that classifies every known storage node as `HEALTHY`, `SUSPECT`, or `DOWN`. A background [`NodeHealthProbe`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) refreshes the cache every 5 s by issuing `GET /health` to every machine in the current `ErasureSetConfiguration`. 
Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode` in [`StorageWorker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)) consult the tracker to skip nodes known to be down — avoiding the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each dead peer — while keeping the quorum denominator unchanged. Soft-exclusion semantics: if too few healthy nodes remain to make progress (e.g. fewer than `m` for reads or fewer than `writeQuorum` for writes), the path falls back to contacting every node so a recovered-but-still-marked-DOWN node has a chance to succeed. Any successful response to a DOWN-marked node immediately promotes it back to HEALTHY. @@ -520,7 +520,7 @@ The QP `/health` endpoint surfaces the live counts (`API Gateway is healthy! / S ## RocksDB Table Reference -The [`Database`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) facade is backed by [`RocksDbStorageStrategy`](../storage-node/src/main/java/com/github/koop/storagenode/db/RocksDbStorageStrategy.java), which declares **seven** RocksDB column families. The first three are written atomically on every PUT/DELETE/bucket operation; the remaining four support repair, GC, and asynchronous blob deletion. +The [`Database`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) facade is backed by [`RocksDbStorageStrategy`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/RocksDbStorageStrategy.java), which declares **seven** RocksDB column families. The first three are written atomically on every PUT/DELETE/bucket operation; the remaining four support repair, GC, and asynchronous blob deletion. 
| Column family | Key | Value | Purpose | |---|---|---|---| @@ -528,10 +528,10 @@ The [`Database`](../storage-node/src/main/java/com/github/koop/storagenode/db/Da | **`metadata`** | Object Key | `(partition, seq, location)` | Latest shard location per object key; supports regular, multipart, and tombstone versions | | **`buckets`** | Bucket Name | `(partition, seq, deleted)` | Bucket existence and tombstone tracking | | **`uncommitted_writes`** | Request ID | Per-shard staging entry | Tracks shards that have arrived from the QP but whose Kafka commit has not yet been observed | -| **`repair_queue`** | Sequence Number | Pending repair task | Persistent queue consumed by [`RepairWorkerPool`](../storage-node/src/main/java/com/github/koop/storagenode/RepairWorkerPool.java) to catch up after a node restart | -| **`pending_deletes`** | Blob Key | Tombstone marker | Obsolete shard paths enqueued on DELETE / GC; drained asynchronously by [`BlobDeletionWorker`](../storage-node/src/main/java/com/github/koop/storagenode/gc/BlobDeletionWorker.java) | -| **`gc_cursors`** | Partition | Last-scanned seqNum | Persistent cursor used by [`GarbageCollectorWorker`](../storage-node/src/main/java/com/github/koop/storagenode/gc/GarbageCollectorWorker.java) so each GC pass examines only new oplog entries | +| **`repair_queue`** | Sequence Number | Pending repair task | Persistent queue consumed by [`RepairWorkerPool`](../../../storage-node/src/main/java/com/github/koop/storagenode/RepairWorkerPool.java) to catch up after a node restart | +| **`pending_deletes`** | Blob Key | Tombstone marker | Obsolete shard paths enqueued on DELETE / GC; drained asynchronously by [`BlobDeletionWorker`](../../../storage-node/src/main/java/com/github/koop/storagenode/gc/BlobDeletionWorker.java) | +| **`gc_cursors`** | Partition | Last-scanned seqNum | Persistent cursor used by [`GarbageCollectorWorker`](../../../storage-node/src/main/java/com/github/koop/storagenode/gc/GarbageCollectorWorker.java) so each 
GC pass examines only new oplog entries | The actual shard bytes are **not** stored in RocksDB — they are written as files on the filesystem under `${STORAGE_DIR}` via `java.nio.channels.FileChannel`. The RocksDB instance itself lives at `${STORAGE_DIR}/rocksdb`. -See [`Database.putItem()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java), [`Database.deleteItem()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java), and [`Database.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) for the atomic write implementations. +See [`Database.putItem()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java), [`Database.deleteItem()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java), and [`Database.createBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) for the atomic write implementations. diff --git a/docs/technologies.md b/docs/diagrams/docs/06-technologies.md similarity index 71% rename from docs/technologies.md rename to docs/diagrams/docs/06-technologies.md index f4252fc7..6aec70de 100644 --- a/docs/technologies.md +++ b/docs/diagrams/docs/06-technologies.md @@ -14,11 +14,11 @@ KoopDB leverages a modern Java stack focused on high-performance concurrency and - Configured to use Virtual Threads for optimal throughput. ### Storage Engine -- **RocksDB (8.10.0):** An embeddable persistent key-value store, used by Storage Nodes for **metadata only**. Column families: `op_log`, `metadata`, `buckets`, `uncommitted_writes`, `repair_queue`, `pending_deletes`, `gc_cursors` (see [`RocksDbStorageStrategy`](../storage-node/src/main/java/com/github/koop/storagenode/db/RocksDbStorageStrategy.java)). The RocksDB instance lives under `${STORAGE_DIR}/rocksdb`. +- **RocksDB (8.10.0):** An embeddable persistent key-value store, used by Storage Nodes for **metadata only**. 
Column families: `op_log`, `metadata`, `buckets`, `uncommitted_writes`, `repair_queue`, `pending_deletes`, `gc_cursors` (see [`RocksDbStorageStrategy`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/RocksDbStorageStrategy.java)). The RocksDB instance lives under `${STORAGE_DIR}/rocksdb`. - **Filesystem (FileChannel):** The actual shard bytes are written as plain files on disk via `java.nio.channels.FileChannel`, under directories sibling to the RocksDB instance inside `${STORAGE_DIR}`. ### Erasure Coding -- **Backblaze JavaReedSolomon:** Reed-Solomon encoder/decoder used by [`ErasureCoder`](../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java) on both the encode (PUT) and decode (GET) paths. Dependency `com.github.Backblaze:JavaReedSolomon` in [`common-lib/pom.xml`](../common-lib/pom.xml). +- **Backblaze JavaReedSolomon:** Reed-Solomon encoder/decoder used by [`ErasureCoder`](../../../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java) on both the encode (PUT) and decode (GET) paths. Dependency `com.github.Backblaze:JavaReedSolomon` in [`common-lib/pom.xml`](../../../common-lib/pom.xml). ### Serialization & Logging - **Jackson (2.17.0):** JSON serialization/deserialization for API responses and internal messages. @@ -31,9 +31,9 @@ KoopDB leverages a modern Java stack focused on high-performance concurrency and ### Testing & Verification - **JUnit 5 (5.10.2):** Unit and integration testing. -- **AWS SDK for Java v2 (`software.amazon.awssdk:s3`):** Used in system tests to drive the cluster through a real S3 client and verify S3-compatible behavior. The async client (`S3AsyncClient`) plus [`s3-transfer-manager 2.41.34`](../system-tests/pom.xml) exercise the high-level `uploadFile` / `downloadFile` flows, which internally chunk large objects into multipart uploads. 
-- **Testcontainers:** Used by [`DockerComposeS3E2EIT`](../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java) (via `ComposeContainer`) to bring up the actual `docker-compose.yml` stack for true end-to-end tests. -- **In-process integration cluster:** Suites like [`RealStorageNodesIT`](../system-tests/src/test/java/koop/RealStorageNodesIT.java) spin up 9 `StorageNodeServerV2` instances inside a single JVM and substitute the external dependencies with in-memory fakes — `MemoryFetcher` for Etcd and `MemoryPubSub` for Kafka — so quorum, version, and repair logic can be exercised without Docker. +- **AWS SDK for Java v2 (`software.amazon.awssdk:s3`):** Used in system tests to drive the cluster through a real S3 client and verify S3-compatible behavior. The async client (`S3AsyncClient`) plus [`s3-transfer-manager 2.41.34`](../../../system-tests/pom.xml) exercise the high-level `uploadFile` / `downloadFile` flows, which internally chunk large objects into multipart uploads. +- **Testcontainers:** Used by [`DockerComposeS3E2EIT`](../../../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java) (via `ComposeContainer`) to bring up the actual `docker-compose.yml` stack for true end-to-end tests. +- **In-process integration cluster:** Suites like [`RealStorageNodesIT`](../../../system-tests/src/test/java/koop/RealStorageNodesIT.java) spin up 9 `StorageNodeServerV2` instances inside a single JVM and substitute the external dependencies with in-memory fakes — `MemoryFetcher` for Etcd and `MemoryPubSub` for Kafka — so quorum, version, and repair logic can be exercised without Docker. ## Infrastructure - **Docker & Docker Compose:** Containerization of all components for consistent development and deployment environments. 
diff --git a/docs/installation.md b/docs/diagrams/docs/07-installation.md similarity index 73% rename from docs/installation.md rename to docs/diagrams/docs/07-installation.md index 7432b464..80b9499d 100644 --- a/docs/installation.md +++ b/docs/diagrams/docs/07-installation.md @@ -3,7 +3,7 @@ KoopDB ships two deployment flavors: - **Docker Compose** — single-host local stack (default for development and the `system-tests` suite). -- **Kubernetes (k3s)** — multi-node cluster deployment, available in two sizings under [`k8s/`](../k8s/) and [`k8s-9/`](../k8s-9/). +- **Kubernetes (k3s)** — multi-node cluster deployment, available in two sizings under [`k8s/`](../../../k8s/) and [`k8s-9/`](../../../k8s-9/). Both orchestrate the same services (Query Processors, Storage Nodes, Etcd, Kafka, Redis), so the env-var and erasure-config sections below apply to either. @@ -79,11 +79,11 @@ The erasure-set parameters are seeded into Etcd at startup. Two sizings ship in- | Deployment | Erasure config | Tolerance | | --- | --- | --- | -| [`docker-compose.yml`](../docker-compose.yml) (local dev + system-tests) | n=6, m=4, write_quorum=5 (1 erasure set, 6 storage-node containers) | 2 node failures | -| [`docker-compose-local.yml`](../docker-compose-local.yml) | n=6, m=4, write_quorum=5 | 2 node failures | -| [`k8s/`](../k8s/) (small k3s deployment) | n=6, m=4, write_quorum=5 (2 erasure sets × 6 pods) | 2 node failures | -| [`docker-compose.k8s.yml`](../docker-compose.k8s.yml) / [`docker-compose.k8s-local.yml`](../docker-compose.k8s-local.yml) | n=9, m=6, write_quorum=7 | 3 node failures | -| [`k8s-9/`](../k8s-9/) (large k3s deployment) | n=9, m=6, write_quorum=7 (2 erasure sets × 9 pods) | 3 node failures | +| [`docker-compose.yml`](../../../docker-compose.yml) (local dev + system-tests) | n=6, m=4, write_quorum=5 (1 erasure set, 6 storage-node containers) | 2 node failures | +| [`docker-compose-local.yml`](../../../docker-compose-local.yml) | n=6, m=4, write_quorum=5 | 2 node 
failures | +| [`k8s/`](../../../k8s/) (small k3s deployment) | n=6, m=4, write_quorum=5 (2 erasure sets × 6 pods) | 2 node failures | +| [`docker-compose.k8s.yml`](../../../docker-compose.k8s.yml) / [`docker-compose.k8s-local.yml`](../../../docker-compose.k8s-local.yml) | n=9, m=6, write_quorum=7 | 3 node failures | +| [`k8s-9/`](../../../k8s-9/) (large k3s deployment) | n=9, m=6, write_quorum=7 (2 erasure sets × 9 pods) | 3 node failures | The flow is identical across all of them; only the seeded numbers and the number of storage-node replicas differ. The seeded etcd keys are: @@ -93,7 +93,7 @@ The flow is identical across all of them; only the seeded numbers and the number **How to change the topology:** - *Docker Compose:* edit the `etcd-seeder` `command` block in the relevant `docker-compose*.yml` and restart the cluster. -- *Kubernetes:* edit the JSON files under [`k8s/etcd-data/`](../k8s/etcd-data/) (or [`k8s-9/etcd-data/`](../k8s-9/etcd-data/)). [`deploy.sh`](../k8s/deploy.sh) mounts them as the `etcd-seeder-config` ConfigMap consumed by [`05-etcd-seeder.yaml`](../k8s/05-etcd-seeder.yaml). +- *Kubernetes:* edit the JSON files under [`k8s/etcd-data/`](../../../k8s/etcd-data/) (or [`k8s-9/etcd-data/`](../../../k8s-9/etcd-data/)). [`deploy.sh`](../../../k8s/deploy.sh) mounts them as the `etcd-seeder-config` ConfigMap consumed by [`05-etcd-seeder.yaml`](../../../k8s/05-etcd-seeder.yaml). ## Stopping the Cluster @@ -109,7 +109,7 @@ docker-compose down -v ## Kubernetes Deployment -The [`k8s/`](../k8s/) (6-node erasure sets) and [`k8s-9/`](../k8s-9/) (9-node erasure sets) directories contain a self-contained k3s deployment: manifests + helper scripts. Both target a remote control-plane node configured in [`config.sh`](../k8s-9/config.sh) (`SERVER`, `DOCKERHUB_USER`, `SSH_PASS`) and apply everything into a `koopdb` namespace. 
+The [`k8s/`](../../../k8s/) (6-node erasure sets) and [`k8s-9/`](../../../k8s-9/) (9-node erasure sets) directories contain a self-contained k3s deployment: manifests + helper scripts. Both target a remote control-plane node configured in [`config.sh`](../../../k8s-9/config.sh) (`SERVER`, `DOCKERHUB_USER`, `SSH_PASS`) and apply everything into a `koopdb` namespace. ### Manifest layout @@ -124,18 +124,18 @@ The [`k8s/`](../k8s/) (6-node erasure sets) and [`k8s-9/`](../k8s-9/) (9-node er | `06-query-processor.yaml` | 3-replica QP Deployment + `ClusterIP` Service | | `07-nginx.yaml` | 2-replica nginx fronting the QP service, exposed as `NodePort` on `30080` | -Nodes are pinned with the `koopdb/role` label: `cp` for control-plane (etcd, seeder) and `worker` for data-plane (storage nodes, QPs, nginx, kafka, redis). [`label-nodes.sh`](../k8s-9/label-nodes.sh) holds the IP-to-role mapping for the target cluster. +Nodes are pinned with the `koopdb/role` label: `cp` for control-plane (etcd, seeder) and `worker` for data-plane (storage nodes, QPs, nginx, kafka, redis). [`label-nodes.sh`](../../../k8s-9/label-nodes.sh) holds the IP-to-role mapping for the target cluster. 
### Scripts | Script | What it does | | --- | --- | -| [`build.sh`](../k8s-9/build.sh) | Builds `${DOCKERHUB_USER}/query-processor:latest` and `${DOCKERHUB_USER}/storage-node:latest` and pushes both to Docker Hub | -| [`label-nodes.sh`](../k8s-9/label-nodes.sh) | Labels k3s nodes `koopdb/role=cp|worker` | -| [`deploy.sh`](../k8s-9/deploy.sh) | SCPs the manifests, generates the `etcd-seeder-config` ConfigMap from `etcd-data/*.json`, patches the image references, and `kubectl apply`s the manifests in sorted order | -| [`status.sh`](../k8s-9/status.sh) | Lists pods / services / StatefulSets / Deployments in the namespace | -| [`logs.sh`](../k8s-9/logs.sh), [`all_logs.sh`](../k8s-9/all_logs.sh) | Tail pod logs | -| [`teardown.sh`](../k8s-9/teardown.sh) | `kubectl delete namespace koopdb` | +| [`build.sh`](../../../k8s-9/build.sh) | Builds `${DOCKERHUB_USER}/query-processor:latest` and `${DOCKERHUB_USER}/storage-node:latest` and pushes both to Docker Hub | +| [`label-nodes.sh`](../../../k8s-9/label-nodes.sh) | Labels k3s nodes `koopdb/role=cp|worker` | +| [`deploy.sh`](../../../k8s-9/deploy.sh) | SCPs the manifests, generates the `etcd-seeder-config` ConfigMap from `etcd-data/*.json`, patches the image references, and `kubectl apply`s the manifests in sorted order | +| [`status.sh`](../../../k8s-9/status.sh) | Lists pods / services / StatefulSets / Deployments in the namespace | +| [`logs.sh`](../../../k8s-9/logs.sh), [`all_logs.sh`](../../../k8s-9/all_logs.sh) | Tail pod logs | +| [`teardown.sh`](../../../k8s-9/teardown.sh) | `kubectl delete namespace koopdb` | ### Typical workflow diff --git a/docs/api.md b/docs/diagrams/docs/08-api.md similarity index 100% rename from docs/api.md rename to docs/diagrams/docs/08-api.md diff --git a/docs/testing.md b/docs/diagrams/docs/09-testing.md similarity index 100% rename from docs/testing.md rename to docs/diagrams/docs/09-testing.md From c2d132fa0ee0a280d4257dac7aab4c3acc6ec845 Mon Sep 17 00:00:00 2001 From: Michael Kupferstein 
<116312352+MichaelKupferstein@users.noreply.github.com> Date: Tue, 19 May 2026 22:11:08 -0400 Subject: [PATCH 05/14] Add 'final-docs' branch to docs deployment workflow Updated the GitHub Actions workflow to include the 'final-docs' branch for documentation deployment. --- .github/workflows/docs.yml | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 4d022370..6fdf0664 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -2,7 +2,8 @@ name: Build and Deploy Architecture Docs on: push: - branches: ["main"] + # 1. ADD YOUR NEW BRANCH HERE + branches: ["main", "final-docs"] permissions: contents: read @@ -13,7 +14,6 @@ concurrency: group: "pages" cancel-in-progress: true -# Fixes the Node.js warning by opting into the new Node 24 runtime env: FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true @@ -27,18 +27,19 @@ jobs: steps: - name: Check out repository uses: actions/checkout@v4 + with: + # 2. ADD THIS: Fetches all branches so the site generator can see them both + fetch-depth: 0 - name: Generate Site via Docker run: | - # 1. Create the output directory beforehand to prevent the FileNotFoundException mkdir -p build/site - # 2. Run the site generator as the current GitHub runner user docker run --rm -u $(id -u):$(id -g) -v ${{ github.workspace }}:/app -w /app \ ghcr.io/avisi-cloud/structurizr-site-generatr \ generate-site -w docs/diagrams/workspace.dsl \ --default-branch main \ - --branches main + --branches main,final-docs # 3. 
ADD YOUR NEW BRANCH HERE - name: Setup GitHub Pages uses: actions/configure-pages@v4 From b1703d40744ad5ebdef290453eb873b41c16d50a Mon Sep 17 00:00:00 2001 From: Michael Kupferstein Date: Tue, 19 May 2026 22:25:40 -0400 Subject: [PATCH 06/14] added docs to dsl --- docs/diagrams/workspace.dsl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/diagrams/workspace.dsl b/docs/diagrams/workspace.dsl index 385bc0b3..4b8f0455 100644 --- a/docs/diagrams/workspace.dsl +++ b/docs/diagrams/workspace.dsl @@ -1,5 +1,7 @@ workspace "Distributed Object Store" "An S3-compatible distributed object store using erasure encoding for redundancy." { + !docs docs + model { # ── External Actors ────────────────────────────────────────────── From 79e2fcd5166a4f879e885f91b2851d34011d3b32 Mon Sep 17 00:00:00 2001 From: Yonatan Ginsburg <38598309+ygins@users.noreply.github.com> Date: Wed, 20 May 2026 11:07:54 -0400 Subject: [PATCH 07/14] Improve clarity and detail in challenges documentation Enhanced explanations of scalability, durability, fault tolerance, and consistency challenges. Added details on Kafka's role in durability and fault tolerance. --- docs/diagrams/docs/03-challenges.md | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/docs/diagrams/docs/03-challenges.md b/docs/diagrams/docs/03-challenges.md index d8f6ee65..c455143b 100644 --- a/docs/diagrams/docs/03-challenges.md +++ b/docs/diagrams/docs/03-challenges.md @@ -4,7 +4,7 @@ Storing all data on a single node is problematic; utilizing additional machines provides more storage capacity. -Fixed partitions, or virtual nodes, allow data to be split across nodes when adding capacity, avoiding the need to repartition everything. +Fixed partitions, or virtual nodes, allow data to be split across nodes when adding capacity, avoiding the need to repartition everything when adding nodes. 
## Durability @@ -12,6 +12,10 @@ Relying on a single node risks complete data loss if that node experiences a fai The system addresses this durability concern through erasure coding. +Kafka ensures durable message delivery to storage nodes. + +Storage nodes store metadata in persistent RocksDB databases, which persist across failures. + ## Throughput Separating writers and readers from the storage nodes allows the system to scale its IO capabilities. @@ -25,8 +29,9 @@ This throughput increase aligns with the benefits of system scalability. ## Fault Tolerance - **Challenge:** Distributing data across nodes means the system must handle individual node failures. -- **Solution:** Erasure coding allows the system to tolerate a specified number (`k`) of node failures. +- **Solution:** Erasure coding plus asynchronous repair allows the system to tolerate a specified number (`k`) of node failures. - **Benefit:** Erasure coding saves space compared to standard redundant data replication. +- The control plane (etcd, Redis, Kafka) runs as fault-tolerant clusters. ## Recovery & Retries @@ -54,13 +59,13 @@ This throughput increase aligns with the benefits of system scalability. ## PUT/DELETE Ordering - **Challenge:** Concurrent writes, such as two PUTs to the same key, can lead to inconsistent states if storage nodes apply them in different orders. -- **Solution:** Kafka sequencing assigns a sequence number for each operation within a data partition. +- **Solution:** Kafka sequencing (same topic and partition) assigns a sequence number for each operation within a data partition. - **Benefit:** Regardless of arrival time, nodes know how requests compare to one another, allowing the system to logically resolve the latest commit. ## Read Consistency & Version Conflicts - **Challenge:** Different nodes may return conflicting versions of data during a read. -- **Solution:** The QP selects the **maximum version** reported across responding storage nodes and reconstructs from the shards at that version.
-- **Solution:** The QP selects the **maximum version** reported across responding storage nodes and reconstructs from the shards at that version. +- **Solution:** The QP selects the **maximum readable version** reported across responding storage nodes and reconstructs from the shards at that version. ### Handling Stale Data @@ -85,4 +90,4 @@ This throughput increase aligns with the benefits of system scalability. ## Bucket Item Consistency - **Challenge:** Storage nodes commit in order, but not always simultaneously, so the per-node view of a bucket can diverge briefly. -- **Solution:** Bucket listings are served per-partition from a single healthy storage node — [`fetchListFromAnyNode`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) walks the healthy node list and returns the first successful response. Because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, any node that has caught up gives the same answer; the eventual-consistency window during a divergence is bounded by Kafka consumer lag rather than reconciled per-item across replicas. \ No newline at end of file +- **Solution:** Bucket listings are served per-partition from a single healthy storage node — [`fetchListFromAnyNode`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) walks the healthy node list and returns the first successful response. Because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, any node that has caught up gives the same answer; the eventual-consistency window during a divergence is bounded by Kafka consumer lag rather than reconciled per-item across replicas. 
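
The listing behaviour described above (walk the healthy node list, return the first successful response) can be sketched roughly as follows. This is a minimal illustration, not the project's `fetchListFromAnyNode`: the class `FirstHealthyListing`, the method `fetchFromAnyNode`, and the convention that a thrown runtime exception or an empty `Optional` means "node unavailable" are all assumptions made for the example.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper, not the project's StorageWorker: it only illustrates
// "walk the healthy nodes and return the first successful response".
final class FirstHealthyListing {

    /**
     * Tries each node in order and returns the first non-empty result.
     * A fetch that throws or returns Optional.empty() counts as a failure,
     * and the walk moves on to the next node.
     */
    static <T> Optional<T> fetchFromAnyNode(List<String> healthyNodes,
                                            Function<String, Optional<T>> fetch) {
        for (String node : healthyNodes) {
            try {
                Optional<T> result = fetch.apply(node);
                if (result.isPresent()) {
                    return result;
                }
            } catch (RuntimeException e) {
                // Assumed failure mode: any runtime error means "node unavailable".
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("sn-1", "sn-2", "sn-3");
        // sn-1 is simulated as unreachable; sn-2 answers with a listing.
        Optional<List<String>> listing = fetchFromAnyNode(nodes, node -> {
            if (node.equals("sn-1")) {
                throw new IllegalStateException("unreachable");
            }
            return Optional.of(List.of("a.txt", "b.txt"));
        });
        System.out.println(listing.orElse(List.of()));
    }
}
```

Because every node converges on the same per-partition log, the sketch does not compare answers across nodes; it simply stops at the first one that responds.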
From 279bbc371d5e573fe0f91127c3686df8e2e28ec7 Mon Sep 17 00:00:00 2001 From: Eitan Leitner <123138188+EitanLeit23@users.noreply.github.com> Date: Wed, 20 May 2026 11:08:11 -0400 Subject: [PATCH 08/14] Refine sequence diagrams and update participant labels --- docs/diagrams/docs/05-workflow.md | 60 +++++++++++++++++++------------ 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/docs/diagrams/docs/05-workflow.md b/docs/diagrams/docs/05-workflow.md index 8c4ec423..46596ea7 100644 --- a/docs/diagrams/docs/05-workflow.md +++ b/docs/diagrams/docs/05-workflow.md @@ -60,24 +60,32 @@ sequenceDiagram participant C as S3 Client participant QP as Query Processor participant Kafka as Kafka / PubSub (Sequencer) - participant SN as Storage Nodes (×n, here n=9) + participant SN as Storage Nodes (×9) participant RDB as RocksDB (each SN) C->>QP: PUT /{bucket}/{key} [data] - QP->>QP: Erasure-encode → n shards (here 9) + QP->>QP: Erasure-encode → 9 shards + par Stream shards concurrently - QP->>SN: PUT /store/{partition}/{bucket}/{key}?requestId=X [shard i] + QP->>SN: PUT /store/{partition}/{key}?id=X [shard i] + and SN-->>QP: 200 shard received end - Note over QP: Wait for shard upload quorum (≥ write_quorum; here ≥7) - QP->>Kafka: Publish ordered PUT commit message - Kafka->>SN: Deliver sequenced PUT for this partition + + Note over QP: Wait for write_quorum (≥ 7) + + QP->>Kafka: Publish ordered PUT commit + Kafka->>SN: Deliver sequenced PUT + par Apply commit on nodes - SN->>RDB: Atomic write OpLog + Metadata (incl. 
object size) + SN->>RDB: Atomic write OpLog + Metadata + and RDB-->>SN: OK + and SN-->>QP: 200 ACK end - Note over QP: Wait for write-quorum commit ACKs (≥ write_quorum; here ≥7) + + Note over QP: Wait for write_quorum (≥ 7) QP-->>C: 200 OK ``` @@ -159,23 +167,27 @@ sequenceDiagram participant C as S3 Client participant QP as Query Processor participant Kafka as Kafka / PubSub (Sequencer) - participant SN as Storage Nodes (×n, here n=9) + participant SN as Storage Nodes (×n, n=9) participant RDB as RocksDB (each SN) C->>QP: DELETE /{bucket}/{key} QP->>Kafka: Publish ordered DELETE commit message Kafka->>SN: Deliver sequenced DELETE for this partition - par Apply tombstone commit on nodes + + par Apply tombstone commit SN->>RDB: Atomic write tombstone in Metadata + OpLog + and RDB-->>SN: OK + and SN-->>QP: 200 ACK end - Note over QP: Wait for write_quorum commit ACKs (here ≥7) + + Note over QP: Wait for write_quorum (≥ 7) QP-->>C: 204 No Content - Note over SN: Physical shard file deletion is decoupled: the commit marks a - Note over SN: tombstone in metadata + enqueues shards in the pending_deletes - Note over SN: column family; the BlobDeletionWorker drains it asynchronously. + Note over SN: Asynchronous cleanup: Tombstone marks intent; + Note over SN: BlobDeletionWorker drains pending_deletes + Note over SN: column family. 
``` --- @@ -329,20 +341,24 @@ sequenceDiagram C->>QP: PUT /{bucket}/{key}?partNumber=N&uploadId=X [part data] QP->>Cache: Check mpu:session:{uploadId} exists + status=ACTIVE + alt Session not found or not ACTIVE QP-->>C: 404 Not Found / 409 Conflict end + QP->>Cache: Check mpu:parts:{uploadId} for partNumber N + alt Part already uploaded - QP-->>C: 409 Conflict (Part already uploaded) + QP-->>C: 409 Conflict + else Part not uploaded + QP->>SW: put(requestId, bucket, partStorageKey, length, data) + SW->>SN: Stream erasure-coded shards concurrently + SN-->>SW: ACKs (≥ 7) + SW-->>QP: Success + QP->>Cache: Add partNumber N → mpu:parts:{uploadId} + QP->>Cache: Store part size → mpu:partsize:{uploadId}:{N} + QP-->>C: 200 OK end - QP->>SW: put(requestId, bucket, partStorageKey, length, data) - SW->>SN: Stream erasure-coded shards concurrently - SN-->>SW: ACKs (≥ write_quorum required; here ≥7) - SW-->>QP: Success - QP->>Cache: Add partNumber N → mpu:parts:{uploadId} - QP->>Cache: Store part size → mpu:partsize:{uploadId}:{N} - QP-->>C: 200 OK ``` --- From 412597b172735bb19781af135157f91f4e8b73fc Mon Sep 17 00:00:00 2001 From: Yonatan Ginsburg <38598309+ygins@users.noreply.github.com> Date: Wed, 20 May 2026 11:16:55 -0400 Subject: [PATCH 09/14] Fix duplicate solution entry in challenges documentation Removed duplicate solution explanation for the discovery cache challenge. --- docs/diagrams/docs/03-challenges.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/diagrams/docs/03-challenges.md b/docs/diagrams/docs/03-challenges.md index c455143b..c96b23a3 100644 --- a/docs/diagrams/docs/03-challenges.md +++ b/docs/diagrams/docs/03-challenges.md @@ -48,7 +48,7 @@ This throughput increase aligns with the benefits of system scalability. - **Challenge:** Without health awareness, every QP request paid the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each unreachable storage node. 
One dead node added the full connect-timeout cost to every GET/PUT; three dead nodes (still within fault tolerance) compounded that delay, inflating sub-second requests into multi-second ones. - **Sub-challenge — stale health cache:** A cached "DOWN" marking can become wrong: the node may have recovered, or a transient network blip may have falsely marked a healthy node as down. A naive hard-exclusion cache would prevent recovery indefinitely (false negative) or shed load unnecessarily (false positive). -- **Solution:** Each QP runs a [`NodeHealthTracker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) (state machine: `HEALTHY → SUSPECT → DOWN`) fed by both a periodic [`NodeHealthProbe`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) (5 s interval, `GET /health`) and outcomes of real data-path requests. Data-path operations **soft-exclude** DOWN nodes — they are skipped only while enough healthy peers remain to make quorum; otherwise the path falls back to contacting every node. Any successful response from a DOWN-marked node immediately promotes it back to `HEALTHY` (opportunistic recovery). +- **Solution:** Each QP runs a [`NodeHealthTracker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) (state machine: `HEALTHY → SUSPECT → DOWN`) fed by both a periodic [`NodeHealthProbe`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) (5 s interval, `GET /health`) and outcomes of real data-path requests. - **Benefit:** Dead-node latency drops to sub-second on the fast path, the quorum denominator is never reduced, and the cache self-heals against both false positives (probe re-detects recovery) and false negatives (data-path successes override stale DOWN markings). 
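
As a rough illustration of the `HEALTHY → SUSPECT → DOWN` state machine described above, the sketch below tracks one state per node. It is not the real `NodeHealthTracker`: the class and method names are hypothetical, and the threshold (first failure marks a node `SUSPECT`, the next consecutive failure marks it `DOWN`) is an assumption, since the text does not state the actual thresholds.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: names and the "two consecutive failures mean DOWN"
// threshold are assumptions, not the project's NodeHealthTracker.
final class HealthTrackerSketch {

    enum State { HEALTHY, SUSPECT, DOWN }

    private final Map<String, State> states = new ConcurrentHashMap<>();

    /** Unknown nodes are treated as HEALTHY until evidence says otherwise. */
    State stateOf(String node) {
        return states.getOrDefault(node, State.HEALTHY);
    }

    /** Called for a failed probe or a failed data-path request. */
    void recordFailure(String node) {
        // Absent (implicitly HEALTHY) or HEALTHY becomes SUSPECT;
        // SUSPECT becomes DOWN; DOWN stays DOWN.
        states.merge(node, State.SUSPECT,
                (current, ignored) -> current == State.HEALTHY ? State.SUSPECT : State.DOWN);
    }

    /** Any success, from the probe or the data path, restores the node at once. */
    void recordSuccess(String node) {
        states.put(node, State.HEALTHY);
    }

    public static void main(String[] args) {
        HealthTrackerSketch tracker = new HealthTrackerSketch();
        tracker.recordFailure("sn-2");
        tracker.recordFailure("sn-2");
        System.out.println(tracker.stateOf("sn-2"));
        tracker.recordSuccess("sn-2");
        System.out.println(tracker.stateOf("sn-2"));
    }
}
```

Feeding both probe results and data-path outcomes through the same two methods is what lets a stale `DOWN` marking be corrected from either source.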
## Write Consistency From 500eb3d26ab6a60da2713c2c1eeebb05f44f68df Mon Sep 17 00:00:00 2001 From: Yonatan Ginsburg <38598309+ygins@users.noreply.github.com> Date: Wed, 20 May 2026 11:17:26 -0400 Subject: [PATCH 10/14] Refine Discovery Cache explanation in architecture docs Updated the description of the Discovery Cache to clarify its function and removed redundant information. --- docs/diagrams/docs/04-architecture.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/diagrams/docs/04-architecture.md b/docs/diagrams/docs/04-architecture.md index e9ade1e9..65f26243 100644 --- a/docs/diagrams/docs/04-architecture.md +++ b/docs/diagrams/docs/04-architecture.md @@ -13,7 +13,7 @@ The architecture consists of four primary services: - **Erasure Coding:** Encodes incoming data into `m` data + `k = n − m` parity shards and distributes them to storage nodes. Reconstructs data from any `m` available shards on retrieval. (Variable naming matches [`ErasureCoder`](../../../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java): `m` = data shards, `k` = parity shards, `n = m + k` = total shards.) - **Routing:** Maps each `(bucket, key)` to a partition and to an erasure set using configuration loaded from Etcd (`erasure_set_configurations`, `partition_spread_configurations`). - **Multipart Manager:** Coordinates multipart upload sessions, tracking parts and publishing the final commit message. - - **Discovery Cache (`NodeHealthTracker` + `NodeHealthProbe`):** Each QP maintains a local cache of storage-node liveness. A background probe issues `GET /health` to every known storage node every 5 s (2 s timeout) and drives a `HEALTHY → SUSPECT → DOWN` state machine. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode`) consult the cache to **soft-exclude** DOWN nodes, eliminating the per-request 30 s timeout penalty for known-dead peers. 
The cache is advisory only — quorum math is unchanged, and the path falls back to the full node list when too few healthy nodes remain or when a request to a DOWN-marked node succeeds (opportunistic recovery). + - **Discovery Cache (`NodeHealthTracker` + `NodeHealthProbe`):** Each QP maintains a local cache of storage-node liveness. A background probe issues `GET /health` to every known storage node every 5 s (2 s timeout) and drives a `HEALTHY → SUSPECT → DOWN` state machine. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode`) consult the cache. 2. **Storage Nodes:** - **Stateful Backend:** Receives erasure-coded shards over HTTP and persists them to disk. From d9c40715c71e3b48b80d9f5fd1ab26131884a968 Mon Sep 17 00:00:00 2001 From: Yonatan Ginsburg <38598309+ygins@users.noreply.github.com> Date: Wed, 20 May 2026 11:20:22 -0400 Subject: [PATCH 11/14] Revise storage node repair process details Updated the repair process description for storage nodes to include Kafka message handling and RocksDb queue usage. --- docs/diagrams/docs/04-architecture.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/diagrams/docs/04-architecture.md b/docs/diagrams/docs/04-architecture.md index 65f26243..3834bd15 100644 --- a/docs/diagrams/docs/04-architecture.md +++ b/docs/diagrams/docs/04-architecture.md @@ -68,7 +68,7 @@ DELETE, CreateBucket, and DeleteBucket follow the same Kafka-sequenced pattern a #### 5. Repair -When a storage node restarts and detects a gap in its sequence numbers, its `RepairWorkerPool` requests the missing operations from peer nodes and replays them, skipping any keys whose post-recovery state already supersedes the replayed op. See [`workflow.md`](05-workflow.md#12-storage-node-repair-flow) for the full flow. 
+When a storage node restarts and catches up on Kafka messages that it missed, it compacts messages using the latest version of each key to prevent redundant repairs, and then repairs any missing blobs in the background with a persistent RocksDb queue. ## Diagram Reference From b4cd7b65db5bc0dfdf4260f7bc380da6f1f39810 Mon Sep 17 00:00:00 2001 From: Eitan Leitner <123138188+EitanLeit23@users.noreply.github.com> Date: Wed, 20 May 2026 11:24:49 -0400 Subject: [PATCH 12/14] Update workflow diagram for multipart upload part --- docs/diagrams/docs/05-workflow.md | 58 +++++++++++++++++++++---------- 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/docs/diagrams/docs/05-workflow.md b/docs/diagrams/docs/05-workflow.md index 46596ea7..7f91ac37 100644 --- a/docs/diagrams/docs/05-workflow.md +++ b/docs/diagrams/docs/05-workflow.md @@ -335,29 +335,51 @@ See [`MultipartUploadManager.uploadPart()`](../../../query-processor/src/main/ja sequenceDiagram participant C as S3 Client participant QP as Query Processor - participant Cache as Cache (Redis / MemoryCache) - participant SW as StorageWorker - participant SN as Storage Nodes (×n, here n=9) + participant Cache as Cache (Redis) + participant SW as Storage Worker + participant SN as Storage Nodes - C->>QP: PUT /{bucket}/{key}?partNumber=N&uploadId=X [part data] - QP->>Cache: Check mpu:session:{uploadId} exists + status=ACTIVE + C->>QP: PUT /{bucket}/{key}?partNumber=N&uploadId=X - alt Session not found or not ACTIVE - QP-->>C: 404 Not Found / 409 Conflict + QP->>Cache: Check mpu:session:{uploadId} is ACTIVE + alt Session missing/aborted + QP-->>C: 404/409 Error end + + Note over QP,Cache: 1. 
Atomically reserve part number + QP->>Cache: setAddIfAbsent(mpu:parts:{uploadId}, partNumber) - QP->>Cache: Check mpu:parts:{uploadId} for partNumber N - - alt Part already uploaded + alt Part already reserved QP-->>C: 409 Conflict - else Part not uploaded - QP->>SW: put(requestId, bucket, partStorageKey, length, data) - SW->>SN: Stream erasure-coded shards concurrently - SN-->>SW: ACKs (≥ 7) - SW-->>QP: Success - QP->>Cache: Add partNumber N → mpu:parts:{uploadId} - QP->>Cache: Store part size → mpu:partsize:{uploadId}:{N} - QP-->>C: 200 OK + else Reservation Success + + QP->>SW: put(partStorageKey, length, data) + + Note over SW,SN: SW handles erasure-coding, concurrent
<br/>streaming, and distributed commit. + SW->>SN: Stream erasure-coded shards + SN-->>SW: Quorum ACKs received + + alt Storage Failure + SW-->>QP: False / Exception + QP->>Cache: setRemove(mpu:parts:{uploadId}, partNumber) + QP-->>C: 500/503 Storage Failure + else Storage Success + SW-->>QP: True + + Note over QP,Cache: 2. Verify session wasn't aborted during I/O + QP->>Cache: Check session still ACTIVE + + alt Session aborted during upload + QP->>SW: delete(partStorageKey) + SW->>SN: Delete orphaned shards + QP->>Cache: setRemove(mpu:parts:{uploadId}, partNumber) + QP-->>C: 409 Conflict + else Session Still Active + Note over QP,Cache: 3. Finalize part metadata + QP->>Cache: Store part size (mpu:partsize:{uploadId}:{N}) + QP-->>C: 200 OK + end + end end ``` From 78e8c027a4383cc3cab9c85b33a755bb5dadbed3 Mon Sep 17 00:00:00 2001 From: Michael Kupferstein Date: Wed, 20 May 2026 11:28:31 -0400 Subject: [PATCH 13/14] Refine node health tracking and exclusion logic in documentation --- docs/diagrams/docs/03-challenges.md | 2 +- docs/diagrams/docs/04-architecture.md | 2 +- docs/diagrams/docs/05-workflow.md | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/diagrams/docs/03-challenges.md b/docs/diagrams/docs/03-challenges.md index c455143b..306d25f0 100644 --- a/docs/diagrams/docs/03-challenges.md +++ b/docs/diagrams/docs/03-challenges.md @@ -48,7 +48,7 @@ This throughput increase aligns with the benefits of system scalability. - **Challenge:** Without health awareness, every QP request paid the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each unreachable storage node. One dead node added the full connect-timeout cost to every GET/PUT; three dead nodes (still within fault tolerance) compounded that delay, inflating sub-second requests into multi-second ones.
- **Sub-challenge — stale health cache:** A cached "DOWN" marking can become wrong: the node may have recovered, or a transient network blip may have falsely marked a healthy node as down. A naive hard-exclusion cache would prevent recovery indefinitely (false negative) or shed load unnecessarily (false positive). -- **Solution:** Each QP runs a [`NodeHealthTracker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) (state machine: `HEALTHY → SUSPECT → DOWN`) fed by both a periodic [`NodeHealthProbe`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) (5 s interval, `GET /health`) and outcomes of real data-path requests. Data-path operations **soft-exclude** DOWN nodes — they are skipped only while enough healthy peers remain to make quorum; otherwise the path falls back to contacting every node. Any successful response from a DOWN-marked node immediately promotes it back to `HEALTHY` (opportunistic recovery). +- **Solution:** Each QP runs a [`NodeHealthTracker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) (state machine: `HEALTHY → SUSPECT → DOWN`) fed by both a periodic [`NodeHealthProbe`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) (5 s interval, `GET /health`) and outcomes of real data-path requests. Data-path operations **skip** DOWN nodes — the exclusion is unconditional, but the cache is advisory only: it never reduces quorum requirements. If too few healthy nodes remain to meet quorum, the operation fails fast rather than contacting nodes the tracker has written off. A DOWN-marked node returns to service only when the background probe re-detects it, or when a later data-path request to it succeeds — any successful response immediately promotes it back to `HEALTHY` (opportunistic recovery). 
- **Benefit:** Dead-node latency drops to sub-second on the fast path, the quorum denominator is never reduced, and the cache self-heals against both false positives (probe re-detects recovery) and false negatives (data-path successes override stale DOWN markings). ## Write Consistency diff --git a/docs/diagrams/docs/04-architecture.md b/docs/diagrams/docs/04-architecture.md index e9ade1e9..70c1de58 100644 --- a/docs/diagrams/docs/04-architecture.md +++ b/docs/diagrams/docs/04-architecture.md @@ -13,7 +13,7 @@ The architecture consists of four primary services: - **Erasure Coding:** Encodes incoming data into `m` data + `k = n − m` parity shards and distributes them to storage nodes. Reconstructs data from any `m` available shards on retrieval. (Variable naming matches [`ErasureCoder`](../../../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java): `m` = data shards, `k` = parity shards, `n = m + k` = total shards.) - **Routing:** Maps each `(bucket, key)` to a partition and to an erasure set using configuration loaded from Etcd (`erasure_set_configurations`, `partition_spread_configurations`). - **Multipart Manager:** Coordinates multipart upload sessions, tracking parts and publishing the final commit message. - - **Discovery Cache (`NodeHealthTracker` + `NodeHealthProbe`):** Each QP maintains a local cache of storage-node liveness. A background probe issues `GET /health` to every known storage node every 5 s (2 s timeout) and drives a `HEALTHY → SUSPECT → DOWN` state machine. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode`) consult the cache to **soft-exclude** DOWN nodes, eliminating the per-request 30 s timeout penalty for known-dead peers. The cache is advisory only — quorum math is unchanged, and the path falls back to the full node list when too few healthy nodes remain or when a request to a DOWN-marked node succeeds (opportunistic recovery). 
+ - **Discovery Cache (`NodeHealthTracker` + `NodeHealthProbe`):** Each QP maintains a local cache of storage-node liveness. A background probe issues `GET /health` to every known storage node every 5 s (2 s timeout) and drives a `HEALTHY → SUSPECT → DOWN` state machine. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode`) consult the cache to **skip** DOWN nodes, eliminating the per-request connect-timeout penalty (today: a 10 s connect timeout on the data-path `HttpClient`) for known-dead peers. The cache is advisory only — quorum math is unchanged: DOWN nodes are skipped unconditionally, and if too few healthy nodes remain to meet quorum the operation fails fast rather than contacting written-off nodes. A DOWN-marked node is restored only by the background probe re-detecting it or by a later data-path request to it succeeding (opportunistic recovery). 2. **Storage Nodes:** - **Stateful Backend:** Receives erasure-coded shards over HTTP and persists them to disk. diff --git a/docs/diagrams/docs/05-workflow.md b/docs/diagrams/docs/05-workflow.md index 46596ea7..c1673e24 100644 --- a/docs/diagrams/docs/05-workflow.md +++ b/docs/diagrams/docs/05-workflow.md @@ -496,7 +496,7 @@ sequenceDiagram Each Query Processor maintains a local [`NodeHealthTracker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) that classifies every known storage node as `HEALTHY`, `SUSPECT`, or `DOWN`. A background [`NodeHealthProbe`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) refreshes the cache every 5 s by issuing `GET /health` to every machine in the current `ErasureSetConfiguration`. 
Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode` in [`StorageWorker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)) consult the tracker to skip nodes known to be down — avoiding the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each dead peer — while keeping the quorum denominator unchanged. -Soft-exclusion semantics: if too few healthy nodes remain to make progress (e.g. fewer than `m` for reads or fewer than `writeQuorum` for writes), the path falls back to contacting every node so a recovered-but-still-marked-DOWN node has a chance to succeed. Any successful response to a DOWN-marked node immediately promotes it back to HEALTHY. +Exclusion semantics: DOWN nodes are skipped unconditionally, but the tracker is advisory only — it never reduces quorum requirements. If too few healthy nodes remain to make progress (e.g. fewer than `m` for reads or fewer than `writeQuorum` for writes), the operation fails fast rather than contacting nodes the tracker has written off. A DOWN-marked node returns to service only when the background probe re-detects it, or when a later data-path request to it succeeds — any successful response immediately promotes it back to HEALTHY (opportunistic recovery). ```mermaid flowchart LR @@ -517,8 +517,8 @@ flowchart LR Probe -- HTTP GET /health
2s timeout --> SN2 Probe -- HTTP GET /health<br/>2s timeout --> SN3 - Worker -- shard / bucket /<br/>list HTTP calls<br/>(DOWN nodes skipped<br/>if enough healthy) --> SN1 - Worker -- skipped while DOWN<br/>(fallback contacts<br/>everyone) --> SN2 + Worker -- shard / bucket /<br/>list HTTP calls<br/>(healthy nodes only) --> SN1 + Worker -- skipped while DOWN<br/>(no fallback —
fail fast if quorum lost) --> SN2 Worker --> SN3 ``` From 7681a7fc0437ff27a228bb4563897e0d6319a28b Mon Sep 17 00:00:00 2001 From: Yonatan Ginsburg <38598309+ygins@users.noreply.github.com> Date: Wed, 20 May 2026 11:59:42 -0400 Subject: [PATCH 14/14] Enhance documentation with new challenges and solutions Added sections on asynchronous repair concurrency, garbage collection, clock drift, hung reads, physical disk consistency, and multipart upload consistency. --- docs/diagrams/docs/03-challenges.md | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/docs/diagrams/docs/03-challenges.md b/docs/diagrams/docs/03-challenges.md index 306d25f0..f1bcfaf4 100644 --- a/docs/diagrams/docs/03-challenges.md +++ b/docs/diagrams/docs/03-challenges.md @@ -39,6 +39,11 @@ This throughput increase aligns with the benefits of system scalability. - If an internal write or delete operation fails, the system retries the operation internally. - If the gateway goes down, the client handles the retry from the S3 side. +### Asynchronous Repair Concurrency & Resilience +- **Challenge:** Node repairs must survive node restarts, avoid duplicating effort for the same object, and avoid interfering with ongoing live client writes. +- **Solution:** Repairs are enqueued into a persistent `RocksDbRepairQueue`. This queue automatically deduplicates overlapping repair requests for the same blob, keeping only the highest sequence offset. A pool of virtual threads (`RepairWorkerPool`) polls this queue but explicitly defers any repairs for keys that are actively being written to (checked via a `WriteTracker`). +- **Benefit:** If a storage node crashes, no distinct recovery phase is needed—the poller simply resumes processing the persistent queue on startup. Concurrent writes are protected from being overwritten by stale background repairs. + ### Offline Nodes - The system is fault-tolerant and continues to operate even if nodes are missing. 
@@ -72,20 +77,36 @@ This throughput increase aligns with the benefits of system scalability. - If the chosen max version has fewer than `m` shards available (so Reed-Solomon cannot reconstruct from it), the QP falls back to an older version via `reconstructFromOlderVersion` instead of failing. - A latest-version tombstone short-circuits to `404 NoSuchKey`; an outright reconstruction failure surfaces as `500 InternalError`. -### Garbage Collection +## Garbage Collection & Watermarks - Storage nodes gossip about their per-partition cursors and active reads; the resulting **gossip-derived watermark** (not a formal consensus protocol) tells each node which versions are safe to physically reap. +### Clock Drift & Stale Peers in Gossip +- **Challenge:** Wall-clock drift between nodes makes absolute timestamps unreliable for staleness checks. Furthermore, if a node goes offline, its last gossiped state could permanently stall the global watermark. +- **Solution:** The system uses **receiver-side clock timestamping**. When a node receives a gossip message, it records the update against its *own* clock, ignoring the sender's absolute timestamp. If a peer hasn't been heard from within a strict staleness window, it is excluded from the global minimum calculation. + +### Hung Reads (Watermark Pinning) +- **Challenge:** If a client disconnects unexpectedly or a server-side bug bypasses a close procedure during a GET request, the active read handle might never close. This permanently pins the partition's minimum active sequence number, blocking the garbage collector from reclaiming old versions. +- **Solution:** The `ActiveReadTracker` enforces **TTL leases** for active reads. A background pruner sweeps the tracker and forcibly evicts any read handle older than a configured maximum lease duration, decrementing the reference count and allowing the GC watermark to safely advance. 
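The TTL-lease idea described for hung reads can be illustrated with a minimal Java sketch. It is not KoopDB's actual `ActiveReadTracker`: the class name `LeasedReadTracker`, its methods, and the injectable clock are all hypothetical, and it tracks one lease per read handle rather than a reference count. It only shows how a background sweep can release a pin that a hung read would otherwise hold forever.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical sketch of TTL-leased read tracking; not the real ActiveReadTracker. */
final class LeasedReadTracker {
    /** One open read: the sequence number it pins and when its lease began. */
    private record Lease(long sequence, long openedAtMillis) {}

    private final Map<Long, Lease> leases = new ConcurrentHashMap<>();
    private final AtomicLong nextHandle = new AtomicLong();
    private final LongSupplier clockMillis; // injectable so expiry can be tested
    private final long maxLeaseMillis;      // assumed configuration value

    LeasedReadTracker(LongSupplier clockMillis, long maxLeaseMillis) {
        this.clockMillis = clockMillis;
        this.maxLeaseMillis = maxLeaseMillis;
    }

    /** Registers a read that pins the given sequence; returns a handle for close(). */
    long open(long sequence) {
        long handle = nextHandle.incrementAndGet();
        leases.put(handle, new Lease(sequence, clockMillis.getAsLong()));
        return handle;
    }

    /** Normal path: the GET finished and releases its pin. Safe to call twice. */
    void close(long handle) {
        leases.remove(handle);
    }

    /** Background sweep: evicts leases older than the maximum and returns the count. */
    int pruneExpired() {
        long now = clockMillis.getAsLong();
        int evicted = 0;
        for (Map.Entry<Long, Lease> e : leases.entrySet()) {
            boolean expired = now - e.getValue().openedAtMillis() > maxLeaseMillis;
            // remove(key, value) only succeeds if the lease was not closed concurrently
            if (expired && leases.remove(e.getKey(), e.getValue())) {
                evicted++;
            }
        }
        return evicted;
    }

    /** Lowest sequence still pinned by a live read; empty when nothing is pinned. */
    OptionalLong minActiveSequence() {
        return leases.values().stream().mapToLong(Lease::sequence).min();
    }
}
```

In a sketch like this, a scheduled task would call `pruneExpired()` periodically, and the garbage collector would treat `minActiveSequence()` as the floor below which versions may be reaped. The trade-off is that a legitimately slow read longer than the lease loses its pin, so the lease duration has to exceed the longest expected GET.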
+ ## Data Version Overload - **Challenge:** Versioning can result in stale object versions consuming too much space over time. - **Solution:** Nodes gossip to determine the lowest version level still actively in use. - **Cleanup:** Any node possessing a version lower than the active baseline deletes that data. -## Multi-Stage Operation Consistency +## Physical Disk Consistency (Crash Resilience) + +- **Challenge:** Deleting a physical blob from the filesystem and removing its metadata from the database are separate operations. If a storage node crashes exactly between committing the metadata deletion and unlinking the physical file, the untracked file will permanently consume disk space (a storage leak). +- **Solution:** The system decouples physical deletion from metadata cleanup. Deletions are logged to a durable `pending_deletes` queue in RocksDB in the exact same transaction as the metadata removal. A background `BlobDeletionWorker` asynchronously drains this queue and issues the `Files.deleteIfExists` calls. +- **Benefit:** If the node crashes, the pending-deletion entry survives in RocksDB and is re-processed on startup, guaranteeing that no untracked blobs accumulate on disk. + +## Multi-Stage Operation Consistency (Multipart Uploads) -- **Challenge:** Maintaining consensus for multi-stage operations, such as multipart uploads. -- **Solution:** A Redis cache is utilized to store the ongoing status of multipart uploads. +- **Challenge:** Multi-part uploads introduce severe concurrency and synchronization risks (e.g., race conditions between concurrent part uploads, completion calls, and aborts) and pose a massive I/O penalty if large files must be physically assembled upon completion. +- **Solution (Concurrency Control):** The system enforces strict state machine transitions (`ACTIVE` → `COMPLETING` / `ABORTING`) using a Redis cache. 
Part uploads require an **atomic reservation** (`setAddIfAbsent`) in the cache's part set before any data is streamed to storage nodes. If an upload is concurrently aborted or completing, the reservation fails instantly, and any orphaned shards are rolled back. +- **Solution (Read-Time Reconstruction):** To eliminate the I/O bottleneck of assembling files on write, the `completeMultipartUpload` process never materializes the full object. Instead, it dispatches a **manifest** (via the two-phase `CommitCoordinator` and Kafka) listing the individual part keys. The erasure-coded shards remain distributed; the system dynamically fetches, reconstructs, and concatenates the parts on the fly during a client GET request. +- **Solution (Namespace Isolation):** To prevent multipart chunk keys from colliding with user-defined keys on the storage nodes, individual part shards are written utilizing a protected, non-user-controllable namespace format (`__mpu__:{bucket}:{key}:{uploadId}:{partNumber}`). ## Bucket Item Consistency