diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index 4d022370..6fdf0664 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -2,7 +2,8 @@ name: Build and Deploy Architecture Docs on: push: - branches: ["main"] + # 1. ADD YOUR NEW BRANCH HERE + branches: ["main", "final-docs"] permissions: contents: read @@ -13,7 +14,6 @@ concurrency: group: "pages" cancel-in-progress: true -# Fixes the Node.js warning by opting into the new Node 24 runtime env: FORCE_JAVASCRIPT_ACTIONS_TO_NODE24: true @@ -27,18 +27,19 @@ jobs: steps: - name: Check out repository uses: actions/checkout@v4 + with: + # 2. ADD THIS: Fetches all branches so the site generator can see them both + fetch-depth: 0 - name: Generate Site via Docker run: | - # 1. Create the output directory beforehand to prevent the FileNotFoundException mkdir -p build/site - # 2. Run the site generator as the current GitHub runner user docker run --rm -u $(id -u):$(id -g) -v ${{ github.workspace }}:/app -w /app \ ghcr.io/avisi-cloud/structurizr-site-generatr \ generate-site -w docs/diagrams/workspace.dsl \ --default-branch main \ - --branches main + --branches main,final-docs # 3. ADD YOUR NEW BRANCH HERE - name: Setup GitHub Pages uses: actions/configure-pages@v4 diff --git a/.gitignore b/.gitignore index 2f323a94..cb576389 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,6 @@ CLAUDE.md settings.local.json create.py k8s-9/k8-logs/ -k8s/k8-logs/ \ No newline at end of file +k8s/k8-logs/ +venv/ +docs/superpowers \ No newline at end of file diff --git a/docs/challenges.md b/docs/challenges.md deleted file mode 100644 index e43c5583..00000000 --- a/docs/challenges.md +++ /dev/null @@ -1,90 +0,0 @@ -# Distributed System Rationale - -## Scalability - -Storing all data on a single node is problematic; utilizing additional machines provides more storage capacity. 
- -Fixed partitions, or virtual nodes, allow data to be split across nodes when adding capacity, avoiding the need to repartition everything. - -## Durability - -Relying on a single node risks complete data loss if that node experiences a failure. - -The system addresses this durability concern through erasure coding. - -## Throughput - -Separating writers and readers from the storage nodes allows the system to scale its IO capabilities. - -Utilizing multiple nodes significantly increases overall throughput. - -This throughput increase aligns with the benefits of system scalability. - -# Distributed System Challenges & Solutions - -## Fault Tolerance - -- **Challenge:** Distributing data across nodes means the system must handle individual node failures. -- **Solution:** Erasure coding allows the system to tolerate a specified number (`k`) of node failures. -- **Benefit:** Erasure coding saves space compared to standard redundant data replication. - -## Recovery & Retries - -- When a node returns online, it initiates a repair procedure utilizing durable message delivery via Kafka. -- If an internal write or delete operation fails, the system retries the operation internally. -- If the gateway goes down, the client handles the retry from the S3 side. - -### Offline Nodes - -- The system is fault-tolerant and continues to operate even if nodes are missing. -- Returning nodes can retrieve missed operations from Kafka or other nodes. - -### Discovery Cache for Dead Storage Nodes - -- **Challenge:** Without health awareness, every QP request paid the full per-node HTTP timeout (10 s connect + 30 s read) for each unreachable storage node. One dead node added up to 30 s to every GET/PUT; three dead nodes (still within fault tolerance) inflated sub-second requests to 90 s+. -- **Sub-challenge — stale health cache:** A cached "DOWN" marking can become wrong: the node may have recovered, or a transient network blip may have falsely marked a healthy node as down. 
A naive hard-exclusion cache would prevent recovery indefinitely (false negative) or shed load unnecessarily (false positive). -- **Solution:** Each QP runs a [`NodeHealthTracker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) (state machine: `HEALTHY → SUSPECT → DOWN`) fed by both a periodic [`NodeHealthProbe`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) (5 s interval, `GET /health`) and outcomes of real data-path requests. Data-path operations **soft-exclude** DOWN nodes — they are skipped only while enough healthy peers remain to make quorum; otherwise the path falls back to contacting every node. Any successful response from a DOWN-marked node immediately promotes it back to `HEALTHY` (opportunistic recovery). -- **Benefit:** Dead-node latency drops from ~30 s to sub-second on the fast path, the quorum denominator is never reduced, and the cache self-heals against both false positives (probe re-detects recovery) and false negatives (data-path successes override stale DOWN markings). - -## Write Consistency - -- **Challenge:** Ensuring shards are successfully written to a write quorum of nodes. -- **Solution:** The system employs a two-phase commit strategy. - -The shards are first uploaded to the nodes; once all are uploaded, a commit message is dispatched via Kafka. - -## PUT/DELETE Ordering - -- **Challenge:** Concurrent writes, such as two PUTs to the same key, can lead to inconsistent states if storage nodes apply them in different orders. -- **Solution:** Kafka sequencing assigns a sequence number for each operation within a data partition. -- **Benefit:** Regardless of arrival time, nodes know how requests compare to one another, allowing the system to logically resolve the latest commit. - -## Read Consistency & Version Conflicts - -- **Challenge:** Different nodes may return conflicting versions of data during a read. 
-- **Solution:** The system returns the highest version agreed upon by a quorum. - -### Handling Stale Data - -- If a version lacks a quorum, it is not considered fully acknowledged or logically valid. -- In cases of missing quorum, the highest quorum version is returned instead. - -### Garbage Collection - -- Consensus among storage nodes determines the latest version that is safe to delete, utilizing a gossip-based watermark. - -## Data Version Overload - -- **Challenge:** Versioning can result in stale object versions consuming too much space over time. -- **Solution:** Nodes gossip to determine the lowest version level still actively in use. -- **Cleanup:** Any node possessing a version lower than the active baseline deletes that data. - -## Multi-Stage Operation Consistency - -- **Challenge:** Maintaining consensus for multi-stage operations, such as multipart uploads. -- **Solution:** A Redis cache is utilized to store the ongoing status of multipart uploads. - -## Bucket Item Consistency - -- **Challenge:** Storage nodes commit in order, but not always simultaneously, potentially affecting bucket item consistency across nodes. -- **Solution:** When streaming items in a bucket, an item is considered to be in the system if it is present on at least a write quorum of nodes. \ No newline at end of file diff --git a/docs/Object Store.md b/docs/diagrams/docs/01-index.md similarity index 53% rename from docs/Object Store.md rename to docs/diagrams/docs/01-index.md index 3a90297f..2edd8b9f 100644 --- a/docs/Object Store.md +++ b/docs/diagrams/docs/01-index.md @@ -6,18 +6,18 @@ KoopDB is a distributed, S3-compatible object store. Objects are split into Reed-Solomon erasure-coded shards and spread across a cluster of storage nodes; mutations are ordered through Kafka and persisted in RocksDB on each node. A stateless Query Processor fronts the cluster with the S3 HTTP API. 
-For a top-level summary of features and the system topology, see the project [README](../README.md). +For a top-level summary of features and the system topology, see the project [README](../../../README.md). ## Project Plan -- [Scope (scenarios covered)](scope.md) -- [Distributed System Challenges](challenges.md) -- [Workflow Diagrams](workflow.md) -- [Software Architecture](architecture.md) -- [Tools & Technologies](technologies.md) +- [Scope (scenarios covered)](02-scope.md) +- [Distributed System Challenges](03-challenges.md) +- [Workflow Diagrams](05-workflow.md) +- [Software Architecture](04-architecture.md) +- [Tools & Technologies](06-technologies.md) ## Installation and Usage Guide -- [Installation Instructions](installation.md) -- [API and Usage Documentation](api.md) -- [Query Processor (API Gateway) module README](../query-processor/README.md) +- [Installation Instructions](07-installation.md) +- [API and Usage Documentation](08-api.md) +- [Query Processor (API Gateway) module README](../../../query-processor/README.md) diff --git a/docs/scope.md b/docs/diagrams/docs/02-scope.md similarity index 76% rename from docs/scope.md rename to docs/diagrams/docs/02-scope.md index 8328417c..b0cd970f 100644 --- a/docs/scope.md +++ b/docs/diagrams/docs/02-scope.md @@ -6,14 +6,14 @@ The KoopDB project focuses on building a **distributed, fault-tolerant object st ### Core Features - **S3 Compatibility:** Support for standard S3 tools (like AWS CLI and SDKs) for fundamental operations. -- **Distributed Architecture:** A headless cluster of Query Processors (gateways) and Storage Nodes. -- **Fault Tolerance:** Data is sharded using **Erasure Coding** (Reed-Solomon equivalent), allowing the system to survive node failures without data loss. 
-- **Metadata Management:** Centralized, consistent metadata storage using **Etcd** to manage partition maps +- **Distributed Architecture:** A horizontally scaled cluster of Query Processors (stateless gateways) and Storage Nodes, with no single coordinator on the data path. +- **Fault Tolerance:** Data is sharded using **Reed-Solomon erasure coding**, allowing the system to survive node failures without data loss. +- **Metadata Management:** Centralized, consistent metadata storage using **Etcd** to manage partition maps. - **Multipart Uploads:** Support for uploading large objects in parts, reassembled upon completion. -### supported Operations -- **Buckets:** Create, Delete, List, Head. -- **Objects:** Put, Get, Delete. +### Supported Operations +- **Buckets:** Create, Delete, List, Head, List Buckets. +- **Objects:** Put, Get, Head, Delete. - **Multipart:** Initiate, Upload Part, Complete, Abort. ## Out of Scope diff --git a/docs/diagrams/docs/03-challenges.md b/docs/diagrams/docs/03-challenges.md new file mode 100644 index 00000000..f1bcfaf4 --- /dev/null +++ b/docs/diagrams/docs/03-challenges.md @@ -0,0 +1,114 @@ +# Distributed System Rationale + +## Scalability + +Storing all data on a single node is problematic; utilizing additional machines provides more storage capacity. + +Fixed partitions, or virtual nodes, allow data to be split across nodes when adding capacity, avoiding the need to repartition everything. + +## Durability + +Relying on a single node risks complete data loss if that node experiences a failure. + +The system addresses this durability concern through erasure coding. + +Kafka ensures durable message delivery to storage nodes. + +Storage nodes store metadata in persistent RocksDB databases, which survive node failures. + +## Throughput + +Separating writers and readers from the storage nodes allows the system to scale its IO capabilities. + +Utilizing multiple nodes significantly increases overall throughput. 
+ +This throughput increase aligns with the benefits of system scalability. + +# Distributed System Challenges & Solutions + +## Fault Tolerance + +- **Challenge:** Distributing data across nodes means the system must handle individual node failures. +- **Solution:** Erasure coding plus asynchronous repair allow the system to tolerate a specified number (`k`) of node failures. +- **Benefit:** Erasure coding saves space compared to standard redundant data replication. +- The control-plane services (Etcd, Redis, Kafka) run as clusters, which are themselves fault tolerant. + +## Recovery & Retries + +- When a node returns online, it initiates a repair procedure utilizing durable message delivery via Kafka. +- If an internal write or delete operation fails, the system retries the operation internally. +- If the gateway goes down, the client handles the retry from the S3 side. + +### Asynchronous Repair Concurrency & Resilience +- **Challenge:** Node repairs must survive node restarts, avoid duplicating effort for the same object, and avoid interfering with ongoing live client writes. +- **Solution:** Repairs are enqueued into a persistent `RocksDbRepairQueue`. This queue automatically deduplicates overlapping repair requests for the same blob, keeping only the highest sequence offset. A pool of virtual threads (`RepairWorkerPool`) polls this queue but explicitly defers any repairs for keys that are actively being written to (checked via a `WriteTracker`). +- **Benefit:** If a storage node crashes, no distinct recovery phase is needed; the poller simply resumes processing the persistent queue on startup. Concurrent writes are protected from being overwritten by stale background repairs. + +### Offline Nodes + +- The system is fault-tolerant and continues to operate even if nodes are missing. +- Returning nodes can retrieve missed operations from Kafka or other nodes. 
+ +### Discovery Cache for Dead Storage Nodes + +- **Challenge:** Without health awareness, every QP request paid the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each unreachable storage node. One dead node added the full connect-timeout cost to every GET/PUT; three dead nodes (still within fault tolerance) compounded that delay, inflating sub-second requests into multi-second ones. +- **Sub-challenge — stale health cache:** A cached "DOWN" marking can become wrong: the node may have recovered, or a transient network blip may have falsely marked a healthy node as down. A naive hard-exclusion cache would prevent recovery indefinitely (false negative) or shed load unnecessarily (false positive). +- **Solution:** Each QP runs a [`NodeHealthTracker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) (state machine: `HEALTHY → SUSPECT → DOWN`) fed by both a periodic [`NodeHealthProbe`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) (5 s interval, `GET /health`) and outcomes of real data-path requests. Data-path operations **skip** DOWN nodes — the exclusion is unconditional, but the cache is advisory only: it never reduces quorum requirements. If too few healthy nodes remain to meet quorum, the operation fails fast rather than contacting nodes the tracker has written off. A DOWN-marked node returns to service only when the background probe re-detects it, or when a later data-path request to it succeeds — any successful response immediately promotes it back to `HEALTHY` (opportunistic recovery). +- **Benefit:** Dead-node latency drops to sub-second on the fast path, the quorum denominator is never reduced, and the cache self-heals against both false positives (probe re-detects recovery) and false negatives (data-path successes override stale DOWN markings). 
+ +## Write Consistency + +- **Challenge:** Ensuring shards are successfully written to a write quorum of nodes. +- **Solution:** The system uses a **two-stage write** (not XA-style two-phase commit — there is no prepare/abort vote). Shards are first streamed to the storage nodes; once a write quorum of shard ACKs is received, a single ordered commit message is dispatched via Kafka and storage nodes apply the corresponding RocksDB write when they consume it. + +## PUT/DELETE Ordering + +- **Challenge:** Concurrent writes, such as two PUTs to the same key, can lead to inconsistent states if storage nodes apply them in different orders. +- **Solution:** Kafka sequencing (same topic and partition) assigns a sequence number for each operation within a data partition. +- **Benefit:** Regardless of arrival time, nodes know how requests compare to one another, allowing the system to logically resolve the latest commit. + +## Read Consistency & Version Conflicts + +- **Challenge:** Different nodes may return conflicting versions of data during a read. +- **Solution:** The QP selects the **maximum readable version** reported across responding storage nodes and reconstructs from the shards at that version. + +### Handling Stale Data + +- If the chosen max version has fewer than `m` shards available (so Reed-Solomon cannot reconstruct from it), the QP falls back to an older version via `reconstructFromOlderVersion` instead of failing. +- A latest-version tombstone short-circuits to `404 NoSuchKey`; an outright reconstruction failure surfaces as `500 InternalError`. + +## Garbage Collection & Watermarks + +- Storage nodes gossip about their per-partition cursors and active reads; the resulting **gossip-derived watermark** (not a formal consensus protocol) tells each node which versions are safe to physically reap. + +### Clock Drift & Stale Peers in Gossip +- **Challenge:** Wall-clock drift between nodes makes absolute timestamps unreliable for staleness checks. 
Furthermore, if a node goes offline, its last gossiped state could permanently stall the global watermark. +- **Solution:** The system uses **receiver-side clock timestamping**. When a node receives a gossip message, it records the update against its *own* clock, ignoring the sender's absolute timestamp. If a peer hasn't been heard from within a strict staleness window, it is excluded from the global minimum calculation. + +### Hung Reads (Watermark Pinning) +- **Challenge:** If a client disconnects unexpectedly or a server-side bug bypasses a close procedure during a GET request, the active read handle might never close. This permanently pins the partition's minimum active sequence number, blocking the garbage collector from reclaiming old versions. +- **Solution:** The `ActiveReadTracker` enforces **TTL leases** for active reads. A background pruner sweeps the tracker and forcibly evicts any read handle older than a configured maximum lease duration, decrementing the reference count and allowing the GC watermark to safely advance. + +## Data Version Overload + +- **Challenge:** Versioning can result in stale object versions consuming too much space over time. +- **Solution:** Nodes gossip to determine the lowest version level still actively in use. +- **Cleanup:** Any node possessing a version lower than the active baseline deletes that data. + +## Physical Disk Consistency (Crash Resilience) + +- **Challenge:** Deleting a physical blob from the filesystem and removing its metadata from the database are separate operations. If a storage node crashes exactly between committing the metadata deletion and unlinking the physical file, the untracked file will permanently consume disk space (a storage leak). +- **Solution:** The system decouples physical deletion from metadata cleanup. Deletions are logged to a durable `pending_deletes` queue in RocksDB in the exact same transaction as the metadata removal. 
A background `BlobDeletionWorker` asynchronously drains this queue and issues the `Files.deleteIfExists` calls. +- **Benefit:** If the node crashes, the pending-deletion entry survives in RocksDB and is re-processed on startup, guaranteeing that no untracked blobs accumulate on disk. + +## Multi-Stage Operation Consistency (Multipart Uploads) + +- **Challenge:** Multipart uploads introduce concurrency and synchronization risks (e.g., race conditions between concurrent part uploads, completion calls, and aborts) and incur a large I/O penalty if large objects must be physically assembled upon completion. +- **Solution (Concurrency Control):** The system enforces strict state machine transitions (`ACTIVE` → `COMPLETING` / `ABORTING`) using a Redis cache. Part uploads require an **atomic reservation** (`setAddIfAbsent`) in the cache's part set before any data is streamed to storage nodes. If an upload is concurrently aborted or completing, the reservation fails instantly, and any orphaned shards are rolled back. +- **Solution (Read-Time Reconstruction):** To eliminate the I/O bottleneck of assembling files on write, the `completeMultipartUpload` process never materializes the full object. Instead, it dispatches a **manifest** (via the two-stage `CommitCoordinator` and Kafka) listing the individual part keys. The erasure-coded shards remain distributed; the system dynamically fetches, reconstructs, and concatenates the parts on the fly during a client GET request. +- **Solution (Namespace Isolation):** To prevent multipart chunk keys from colliding with user-defined keys on the storage nodes, individual part shards are written under a protected, non-user-controllable namespace format (`__mpu__:{bucket}:{key}:{uploadId}:{partNumber}`). + +## Bucket Item Consistency + +- **Challenge:** Storage nodes commit in order, but not always simultaneously, so the per-node view of a bucket can diverge briefly. 
+- **Solution:** Bucket listings are served per-partition from a single healthy storage node — [`fetchListFromAnyNode`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) walks the healthy node list and returns the first successful response. Because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, any node that has caught up gives the same answer; the eventual-consistency window during a divergence is bounded by Kafka consumer lag rather than reconciled per-item across replicas. diff --git a/docs/architecture.md b/docs/diagrams/docs/04-architecture.md similarity index 51% rename from docs/architecture.md rename to docs/diagrams/docs/04-architecture.md index 2da0ab97..210c7292 100644 --- a/docs/architecture.md +++ b/docs/diagrams/docs/04-architecture.md @@ -10,26 +10,31 @@ The architecture consists of four primary services: 1. **Query Processors (Gateway):** - **Stateless Front-End:** Accepts S3 API requests from clients. - - **Erasure Coding:** Encodes incoming data into `k` data + `(n-k)` parity shards and distributes them to storage nodes. Reconstructs data from any `k` available shards on retrieval. + - **Erasure Coding:** Encodes incoming data into `m` data + `k = n − m` parity shards and distributes them to storage nodes. Reconstructs data from any `m` available shards on retrieval. (Variable naming matches [`ErasureCoder`](../../../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java): `m` = data shards, `k` = parity shards, `n = m + k` = total shards.) - **Routing:** Maps each `(bucket, key)` to a partition and to an erasure set using configuration loaded from Etcd (`erasure_set_configurations`, `partition_spread_configurations`). - **Multipart Manager:** Coordinates multipart upload sessions, tracking parts and publishing the final commit message. 
- - **Discovery Cache (`NodeHealthTracker` + `NodeHealthProbe`):** Each QP maintains a local cache of storage-node liveness. A background probe issues `GET /health` to every known storage node every 5 s (2 s timeout) and drives a `HEALTHY → SUSPECT → DOWN` state machine. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode`) consult the cache to **soft-exclude** DOWN nodes, eliminating the per-request 30 s timeout penalty for known-dead peers. The cache is advisory only — quorum math is unchanged, and the path falls back to the full node list when too few healthy nodes remain or when a request to a DOWN-marked node succeeds (opportunistic recovery). + - **Discovery Cache (`NodeHealthTracker` + `NodeHealthProbe`):** Each QP maintains a local cache of storage-node liveness. A background probe issues `GET /health` to every known storage node every 5 s (2 s timeout) and drives a `HEALTHY → SUSPECT → DOWN` state machine. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode`) consult the cache to **skip** DOWN nodes, eliminating the per-request connect-timeout penalty (today: a 10 s connect timeout on the data-path `HttpClient`) for known-dead peers. The cache is advisory only — quorum math is unchanged: DOWN nodes are skipped unconditionally, and if too few healthy nodes remain to meet quorum the operation fails fast rather than contacting written-off nodes. A DOWN-marked node is restored only by the background probe re-detecting it or by a later data-path request to it succeeding (opportunistic recovery). 2. **Storage Nodes:** - **Stateful Backend:** Receives erasure-coded shards over HTTP and persists them to disk. 
- **Storage Engine:** Uses **RocksDB** (embedded key-value store) for the OpLog, object metadata (including each object's byte size, recorded on PUT so HEAD, GET, and ListObjectsV2 can report content length without re-reading shards and so multipart completion can validate part sizes), and bucket tables. - **Operation Log:** Maintains a per-partition, sequenced log of mutations (PUT, DELETE, bucket ops) used for repair. - **Repair Worker Pool:** Background workers (`RepairWorkerPool` in `StorageNodeServerV2`) detect missed sequence numbers when a node comes back online and replay them from peers to catch up. + - **Garbage Collector:** A `GarbageCollectorWorker` per node walks the OpLog forward from a persistent `gc_cursor`, using a gossip-derived watermark to reap stale (superseded) versions in RocksDB transactions. A companion `BlobDeletionWorker` physically removes the blob bytes for keys the GC pass has marked obsolete. + - **Multipart Chunk Storage:** Multipart parts are stored as ordinary shards under internal keys prefixed with `__mpu__:` on the same `/store/{partition}/{bucket}/` path; on `CompleteMultipartUpload` the QP emits a `MultipartCommitMessage` whose payload lists the chunks rather than a single blob. 3. **Kafka (Sequencer / Pub-Sub):** - Provides total ordering of mutating operations on a per-partition basis. - - Query Processors publish ordered commit messages (`PutMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage`, `MultipartCommitMessage`) to partition-keyed topics. + - Query Processors publish ordered commit messages (`FileCommitMessage`, `MultipartCommitMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage` — the full sealed list in [`Message`](../../../common-lib/src/main/java/com/github/koop/common/messages/Message.java)) to partition-keyed topics. - Storage nodes consume those messages and apply the corresponding atomic RocksDB writes. 4. 
**Coordination Cluster (Metadata & State):** - **Etcd (3-node quorum):** Stores cluster topology, erasure-set configuration, and partition→erasure-set mapping. Acts as the source of truth for routing config; Query Processors and Storage Workers watch the relevant keys for changes. - **Redis:** Used by Query Processors to track multipart upload session state (active sessions, uploaded parts, cached part sizes). A `MemoryCacheClient` is available as an in-process substitute for dev/test. +5. **Nginx Ingress (Kubernetes deployments only):** + - Acts as the public-facing entry point in the k8s deployments, exposed as a `NodePort` on `30080`. Its `upstream` points at the Query Processor `ClusterIP` Service (`query-processor.koopdb.svc.cluster.local:8080`), letting kube-proxy load-balance across the 3 QP pods. Request buffering and `client_max_body_size` are disabled so large object PUTs stream straight through to the QPs. Configured via the inline ConfigMap inside [`k8s/07-nginx.yaml`](../../../k8s/07-nginx.yaml) (which additionally applies a `limit_req_zone ... 10r/s burst=20` rate limit on `location /`) and [`k8s-9/07-nginx.yaml`](../../../k8s-9/07-nginx.yaml) (no rate limit). Not present in the local `docker-compose.yml` topology, where clients hit Query Processors on ports `9001-9003` directly. + ### Data Flow #### 1. PUT Object @@ -37,19 +42,19 @@ The architecture consists of four primary services: The system uses a two-stage write: shards are streamed first, and only after a write quorum acknowledges the shard upload is an ordered commit message published. This is **not** XA-style two-phase commit — there is no prepare/abort vote — but it does separate durable shard placement from the ordered metadata commit. 1. **Client** sends a `PUT /{bucket}/{key}` request to any Query Processor. -2. **QP** receives the data stream and erasure-encodes it into `n` total shards (`m` data + `k = n − m` parity). 
The defaults depend on deployment: the `docker-compose.yml` `etcd-seeder` block configures `n=6, m=4, k=2, write_quorum=5` (tolerates 2 node failures), while the system-tests cluster and the recommended full deployment use `n=9, m=6, k=3, write_quorum=7` (tolerates 3 node failures). Both configurations are read from `erasure_set_configurations` in Etcd at startup. -3. **QP** looks up the partition's erasure set in Etcd and streams shards to the target storage nodes concurrently over HTTP (`PUT /store/{partition}/{storageKey}`). +2. **QP** receives the data stream and erasure-encodes it into `n` total shards (`m` data + `k = n − m` parity). The defaults depend on deployment: the `docker-compose.yml` `etcd-seeder` block (used by the local dev stack **and** the system-tests cluster, since [`DockerComposeS3E2EIT`](../../../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java) launches `../docker-compose.yml`) configures `n=6, m=4, k=2, write_quorum=5` (tolerates 2 node failures); the larger k8s deployments — [`docker-compose.k8s.yml`](../../../docker-compose.k8s.yml), [`docker-compose.k8s-local.yml`](../../../docker-compose.k8s-local.yml), and [`k8s-9/`](../../../k8s-9/) — use `n=9, m=6, k=3, write_quorum=7` (tolerates 3 node failures). Both configurations are read from `erasure_set_configurations` in Etcd at startup. +3. **QP** looks up the partition's erasure set in Etcd and streams shards to the target storage nodes concurrently over HTTP (`PUT /store/{partition}/{bucket}/{key}`). 4. **QP** waits for at least `write_quorum` shard upload ACKs from the storage nodes. -5. **QP** publishes an ordered `PutMessage` via Kafka to the partition's topic. +5. **QP** publishes an ordered `FileCommitMessage` via Kafka to the partition's topic. 6. **Storage Nodes** consume the sequenced commit message and atomically write OpLog + Metadata in RocksDB. 7. **QP** waits for `write_quorum` commit ACKs and then responds `200 OK` to the client. #### 2. GET Object 1. 
**Client** sends a `GET /{bucket}/{key}` request. -2. **QP** resolves the erasure set for the key and queries all `n` storage nodes for their shard. -3. **QP** reconciles version conflicts: the version that a read quorum agrees on is selected. If no version reaches quorum the request fails with `500 InternalError`. -4. **QP** reconstructs the original object from any `k` available shards via Reed-Solomon decoding. +2. **QP** resolves the erasure set for the key and queries all `n` storage nodes for shard metadata. +3. **QP** picks the **maximum reported version** across responding nodes. If that version's record is a tombstone the request returns `404 NoSuchKey`; if no shards respond at all the request also returns `404`. +4. **QP** reconstructs the object via Reed-Solomon decoding from the shards at that max version. If fewer than `m` shards are available at the chosen version it falls back to an older version via `reconstructFromOlderVersion`; only an outright reconstruction failure surfaces as `500 InternalError`. 5. **QP** streams the data back to the client. #### 3. DELETE / Bucket Ops @@ -63,8 +68,8 @@ DELETE, CreateBucket, and DeleteBucket follow the same Kafka-sequenced pattern a #### 5. Repair -When a storage node restarts and detects a gap in its sequence numbers, its `RepairWorkerPool` requests the missing operations from peer nodes and replays them, skipping any keys whose post-recovery state already supersedes the replayed op. See [`workflow.md`](workflow.md#12-storage-node-repair-flow) for the full flow. +When a storage node restarts and catches up on the Kafka messages it missed, it compacts them down to the latest version of each key to avoid redundant repairs, then repairs any missing blobs in the background from a persistent RocksDB-backed queue. 
## Diagram Reference -For a visual representation of these components and their interactions, see the [C4 Container Diagram](diagrams/workspace.dsl) and the rendered SVG diagrams in the `docs/diagrams/` folder. +For a visual representation of these components and their interactions, see the [C4 Container Diagram](../workspace.dsl) and the rendered SVG diagrams in the `docs/diagrams/` folder. diff --git a/docs/workflow.md b/docs/diagrams/docs/05-workflow.md similarity index 56% rename from docs/workflow.md rename to docs/diagrams/docs/05-workflow.md index 16c8986c..9045248d 100644 --- a/docs/workflow.md +++ b/docs/diagrams/docs/05-workflow.md @@ -3,13 +3,16 @@ This document displays the **planned target-state** workflow diagrams for each supported use case in the Koop distributed object store. Koop exposes an S3-compatible API; all operations flow through the **Query Processor (QP)** gateway before being fanned out to **Storage Nodes (SN)** via erasure-coded shards. > **Related documents:** -> - [Scope](scope.md) — supported use cases and error cases -> - [Architecture Overview](architecture.md) — system component overview and redundancy model +> - [Scope](02-scope.md) — supported use cases and error cases +> - [Architecture Overview](04-architecture.md) — system component overview and redundancy model > **Erasure configuration shown in the diagrams:** The sequence diagrams below depict the -> recommended full-deployment configuration of `n = 9` total shards (`m = 6` data + `k = 3` -> parity, `write_quorum = 7`), which is what the system-tests cluster uses. The -> `docker-compose.yml` setup runs a smaller cluster (`n = 6, m = 4, k = 2, write_quorum = 5`) +> larger deployment configuration of `n = 9` total shards (`m = 6` data + `k = 3` +> parity, `write_quorum = 7`), which is what the [`k8s-9/`](../../../k8s-9/) deployment and the +> [`docker-compose.k8s*.yml`](../../../docker-compose.k8s.yml) files seed. 
The default local +> [`docker-compose.yml`](../../../docker-compose.yml) (used by the system-tests E2E suite via +> [`DockerComposeS3E2EIT`](../../../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java)) +> and the smaller [`k8s/`](../../../k8s/) deployment run `n = 6, m = 4, k = 2, write_quorum = 5` > over 6 storage nodes; the flow is identical, only the numbers change. Where this document > writes `≥ m` it means "at least the data-shard count" (reads need `m` shards to reconstruct); > where it writes `≥ write_quorum` it means the shard upload / commit ACK threshold from @@ -40,9 +43,9 @@ This document displays the **planned target-state** workflow diagrams for each s ## System Component Overview -The following diagram shows the high-level components involved in every workflow. Each request enters through the S3-compatible HTTP gateway ([`Main.java`](../query-processor/src/main/java/com/github/koop/queryprocessor/gateway/Main.java)), is processed by the [`StorageWorker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java), and is fanned out to the storage nodes in the target erasure set. Kafka (pub/sub) provides per-partition sequencing for write/delete and multipart commit ordering — commit messages are published to a partition-keyed topic so that all storage nodes in the erasure set apply mutations for a given partition in the same order. Redis (or an in-memory [`MemoryCacheClient`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/cache/MemoryCacheClient.java) in dev/test) is used by the Query Processor for multipart upload session tracking. 
Storage nodes persist shard data to disk and maintain operation metadata in RocksDB through [`StorageNodeServer`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeServer.java), [`StorageNodeV2`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java), and [`Database`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java). +The following diagram shows the high-level components involved in every workflow. Each request enters through the S3-compatible HTTP gateway ([`Main.java`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/gateway/Main.java)), is processed by the [`StorageWorker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java), and is fanned out to the storage nodes in the target erasure set. Kafka (pub/sub) provides per-partition sequencing for write/delete and multipart commit ordering — commit messages are published to a partition-keyed topic so that all storage nodes in the erasure set apply mutations for a given partition in the same order. Redis (or an in-memory [`MemoryCacheClient`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/cache/MemoryCacheClient.java) in dev/test) is used by the Query Processor for multipart upload session tracking. Storage nodes persist shard bytes as files on disk and maintain operation metadata in RocksDB through [`StorageNodeServerV2`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeServerV2.java), [`StorageNodeV2`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java), and [`Database`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java). 
-![System Container Overview](diagrams/Containers.svg) +![System Container Overview](../Containers.svg) --- @@ -57,24 +60,32 @@ sequenceDiagram participant C as S3 Client participant QP as Query Processor participant Kafka as Kafka / PubSub (Sequencer) - participant SN as Storage Nodes (×n, here n=9) + participant SN as Storage Nodes (×9) participant RDB as RocksDB (each SN) C->>QP: PUT /{bucket}/{key} [data] - QP->>QP: Erasure-encode → n shards (here 9) + QP->>QP: Erasure-encode → 9 shards + par Stream shards concurrently - QP->>SN: PUT /store/{partition}/{storageKey}?requestId=X [shard i] + QP->>SN: PUT /store/{partition}/{key}?id=X [shard i] + and SN-->>QP: 200 shard received end - Note over QP: Wait for shard upload quorum (≥ write_quorum; here ≥7) - QP->>Kafka: Publish ordered PUT commit message - Kafka->>SN: Deliver sequenced PUT for this partition + + Note over QP: Wait for write_quorum (≥ 7) + + QP->>Kafka: Publish ordered PUT commit + Kafka->>SN: Deliver sequenced PUT + par Apply commit on nodes - SN->>RDB: Atomic write OpLog + Metadata (incl. object size) + SN->>RDB: Atomic write OpLog + Metadata + and RDB-->>SN: OK + and SN-->>QP: 200 ACK end - Note over QP: Wait for write-quorum commit ACKs (≥ write_quorum; here ≥7) + + Note over QP: Wait for write_quorum (≥ 7) QP-->>C: 200 OK ``` @@ -98,7 +109,7 @@ flowchart TD The QP queries all `n` storage nodes for their shard. At least `m` shards (the erasure-coding data-shard count) must be available to reconstruct the object. The QP streams the reconstructed data back to the client. -See [`StorageWorker.get()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java:180) and [`StorageNode.retrieve()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNode.java:171). 
+See [`StorageWorker.get()`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java) and [`StorageNodeV2.retrieve()`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java). ```mermaid sequenceDiagram @@ -110,7 +121,7 @@ sequenceDiagram C->>QP: GET /{bucket}/{key} QP->>QP: Hash key → erasure set + partition par Request shards concurrently - QP->>SN: GET /store/{partition}/{storageKey} + QP->>SN: GET /store/{partition}/{bucket}/{key} SN->>Disk: Read shard via "current" version pointer Disk-->>SN: shard bytes SN-->>QP: 200 + shard bytes @@ -126,15 +137,21 @@ sequenceDiagram ### Conflicting Versions -When nodes return different versions of the same key (e.g., a write is mid-commit), the QP returns the version that at least a read quorum of `m` nodes agree on (in the 9-node configuration, that is `6/9`). If no version reaches quorum, the operation fails immediately with `500 InternalError` — the system does **not** wait for stabilization. +When nodes report different versions of the same key (e.g. a write is mid-commit), the QP picks the **maximum reported version** across responders and tries to reconstruct from the shards at that version. If fewer than `m` shards are present at the max version, it falls back to an older version via `reconstructFromOlderVersion` rather than failing. A latest-version tombstone short-circuits to `404 NoSuchKey`; only an outright reconstruction failure surfaces as `500 InternalError`. 
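The version-selection rules described above can be sketched in a few lines. This is an illustrative Python model rather than the Java code in `StorageWorker`: `ShardReport` and `choose_version` are hypothetical names, the status codes are returned as plain integers, and ignoring tombstones when counting shards for older versions is an assumption the source does not spell out.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple


@dataclass(frozen=True)
class ShardReport:
    """Hypothetical per-node metadata response."""
    node: str
    version: int
    tombstone: bool = False


def choose_version(reports: Iterable[ShardReport], m: int) -> Tuple[int, Optional[int]]:
    """Return (http_status, version_to_reconstruct)."""
    reports = list(reports)
    if not reports:
        return 404, None  # no node responded
    max_version = max(r.version for r in reports)
    if any(r.tombstone for r in reports if r.version == max_version):
        return 404, None  # latest record is a delete marker
    # Count data-bearing shards per version (assumption: tombstones carry no shard).
    counts = {}
    for r in reports:
        if not r.tombstone:
            counts[r.version] = counts.get(r.version, 0) + 1
    # Try the max version first, then fall back to older versions.
    for version in sorted(counts, reverse=True):
        if counts[version] >= m:
            return 200, version
    return 500, None  # nothing can be reconstructed


reports = [ShardReport("sn1", 2), ShardReport("sn2", 2)] + [
    ShardReport(f"sn{i}", 1) for i in range(3, 7)
]
print(choose_version(reports, m=4))
```

In the usage above, only two nodes hold version 2 while four hold version 1, so with `m = 4` the sketch falls back to version 1 instead of failing.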
```mermaid flowchart TD - A[Collect shard responses] --> B{All nodes agree on version?} - B -- Yes --> C[Erasure-decode + return to client] - B -- No --> D{Does a read quorum ≥ m share the same version?} - D -- Yes --> E[Use quorum version → decode + return] - D -- No --> F[500 Internal Error\nNo quorum version available] + A[Collect shard metadata responses] --> B{Any shards responded?} + B -- No --> N[404 NoSuchKey] + B -- Yes --> C[Pick maxVersion across responders] + C --> T{Is maxVersion a tombstone?} + T -- Yes --> N + T -- No --> Q{≥ m shards at maxVersion?} + Q -- Yes --> D[Reconstruct at maxVersion → return] + Q -- No --> F[reconstructFromOlderVersion: try the next-lower version that has ≥ m shards] + F --> R{Reconstruction succeeded?} + R -- Yes --> D + R -- No --> X[500 InternalError] ``` --- @@ -150,23 +167,27 @@ sequenceDiagram participant C as S3 Client participant QP as Query Processor participant Kafka as Kafka / PubSub (Sequencer) - participant SN as Storage Nodes (×n, here n=9) + participant SN as Storage Nodes (×n, n=9) participant RDB as RocksDB (each SN) C->>QP: DELETE /{bucket}/{key} QP->>Kafka: Publish ordered DELETE commit message Kafka->>SN: Deliver sequenced DELETE for this partition - par Apply tombstone commit on nodes + + par Apply tombstone commit SN->>RDB: Atomic write tombstone in Metadata + OpLog + and RDB-->>SN: OK + and SN-->>QP: 200 ACK end - Note over QP: Wait for write_quorum commit ACKs (here ≥7) + + Note over QP: Wait for write_quorum (≥ 7) QP-->>C: 204 No Content - Note over SN: Physical shard file deletion is decoupled: the commit marks a - Note over SN: tombstone in metadata + enqueues shards in the pending_deletes - Note over SN: column family; the BlobDeletionWorker drains it asynchronously. + Note over SN: Asynchronous cleanup: Tombstone marks intent; + Note over SN: BlobDeletionWorker drains pending_deletes + Note over SN: column family. 
``` --- @@ -175,7 +196,7 @@ sequenceDiagram **Route:** `PUT /{bucket}` -Bucket creation is sequenced through Kafka/pub/sub. Storage nodes store the bucket record in their RocksDB bucket table (via [`Database.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:81) and [`StorageNodeV2.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java:248)). A write quorum must acknowledge before the client is notified. +Bucket creation is sequenced through Kafka/pub/sub. Storage nodes store the bucket record in their RocksDB bucket table (via [`Database.createBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) and [`StorageNodeV2.createBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java)). A write quorum must acknowledge before the client is notified. ```mermaid sequenceDiagram @@ -205,7 +226,7 @@ sequenceDiagram **Route:** `DELETE /{bucket}` -Bucket deletion publishes a `DeleteBucketMessage` via pub/sub. Storage nodes write a tombstone to the RocksDB bucket table (via [`Database.deleteBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:91)). The bucket record is logically deleted; any remaining objects in the bucket are not immediately purged. +Bucket deletion publishes a `DeleteBucketMessage` via pub/sub. Storage nodes write a tombstone to the RocksDB bucket table (via [`Database.deleteBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java)). The bucket record is logically deleted; any remaining objects in the bucket are not immediately purged. ```mermaid sequenceDiagram @@ -256,9 +277,9 @@ sequenceDiagram **Route:** `GET /{bucket}?prefix=...&max-keys=...` -The QP streams metadata from all storage nodes using a prefix range scan on the RocksDB metadata table. 
Because objects in the same bucket may be spread across different erasure sets (based on key hashing), all nodes must be queried. Conflicting versions are resolved using the same read-quorum semantics as GET Object. +The QP fans out per partition, but for each partition it queries one healthy storage node at a time and uses the **first successful response** ([`StorageWorker.fetchListFromAnyNode`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)). No per-item quorum check is performed; because every mutation is Kafka-sequenced and each storage node converges on the same per-partition log, a caught-up node gives the same answer. The eventual-consistency window during a divergence is therefore bounded by Kafka consumer lag rather than reconciled per-item across replicas. Objects in the same bucket may be spread across different erasure sets (based on partitioning), so every owning partition is contacted. -See [`Database.listItemsInBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:111) and [`StorageNodeV2.listItemsInBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java:295). +See [`Database.listItemsInBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) and [`StorageNodeV2.listItemsInBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java). 
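The "first successful response per partition" behaviour can be sketched as below. This is a hedged Python illustration, not the Java `fetchListFromAnyNode` method: the `is_down` and `fetch` callables, the `OSError`-based failure signal, and the final sort are assumptions chosen to keep the example self-contained.

```python
from typing import Callable, Dict, List


def list_partition(nodes: List[str],
                   is_down: Callable[[str], bool],
                   fetch: Callable[[str], List[str]]) -> List[str]:
    """Ask nodes one at a time and return the first successful listing."""
    for node in nodes:
        if is_down(node):
            continue  # skip nodes the health tracker marks DOWN
        try:
            return fetch(node)
        except OSError:
            continue  # request failed: try the next node
    raise RuntimeError("no healthy node answered for this partition")


def list_bucket(partitions: Dict[int, List[str]],
                is_down: Callable[[str], bool],
                fetch: Callable[[str], List[str]]) -> List[str]:
    """Merge one listing per partition; no per-item quorum check is performed."""
    merged: List[str] = []
    for nodes in partitions.values():
        merged.extend(list_partition(nodes, is_down, fetch))
    return sorted(merged)  # assumption: results are returned in key order


def fake_fetch(node: str) -> List[str]:
    """Stand-in for the HTTP call; sn1 simulates a failing node."""
    if node == "sn1":
        raise ConnectionError("simulated failure")
    return {"sn2": ["animals/cat"], "sn4": ["animals/dog"]}[node]


partitions = {0: ["sn0", "sn1", "sn2"], 1: ["sn4"]}
print(list_bucket(partitions, lambda n: n == "sn0", fake_fetch))
```

Because each partition is answered by a single node, a lagging node can return a slightly stale listing, which is the Kafka-consumer-lag window the paragraph above describes.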
```mermaid sequenceDiagram @@ -268,14 +289,13 @@ sequenceDiagram participant RDB as RocksDB (each SN) C->>QP: GET /{bucket}?prefix=animals/ - par Stream metadata from all nodes - QP->>SN: Stream metadata with prefix "animals/" + loop For each partition owning the bucket + QP->>SN: Try the next healthy node (skip DOWN nodes) SN->>RDB: Range scan metadata table (sorted by key) RDB-->>SN: Metadata entries - SN-->>QP: Stream of ObjectSummary records + SN-->>QP: ObjectSummary records (or error → try next node) end - QP->>QP: Merge + deduplicate results - Note over QP: Conflicting versions → apply read-quorum semantics + QP->>QP: Merge per-partition results QP-->>C: 200 OK [XML ListBucketResult] ``` @@ -287,7 +307,7 @@ sequenceDiagram Initiates a multipart upload session. The QP generates a unique `uploadId` and stores the session state in the cache (Redis in production, in-memory for dev/test). No data is written to storage nodes at this stage. -See [`MultipartUploadManager.initiateMultipartUpload()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java:35). +See [`MultipartUploadManager.initiateMultipartUpload()`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). ```mermaid sequenceDiagram @@ -309,32 +329,58 @@ sequenceDiagram Each part is stored as an independent erasure-coded object on the storage nodes using a derived key. The cache is updated only **after** the storage nodes confirm the shard write, ensuring the client is not ACKed until the part is durably stored. The part's byte size is also cached for use during completion. -See [`MultipartUploadManager.uploadPart()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java:50). +See [`MultipartUploadManager.uploadPart()`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). 
```mermaid sequenceDiagram participant C as S3 Client participant QP as Query Processor - participant Cache as Cache (Redis / MemoryCache) - participant SW as StorageWorker - participant SN as Storage Nodes (×n, here n=9) - - C->>QP: PUT /{bucket}/{key}?partNumber=N&uploadId=X [part data] - QP->>Cache: Check mpu:session:{uploadId} exists + status=ACTIVE - alt Session not found or not ACTIVE - QP-->>C: 404 Not Found / 409 Conflict + participant Cache as Cache (Redis) + participant SW as Storage Worker + participant SN as Storage Nodes + + C->>QP: PUT /{bucket}/{key}?partNumber=N&uploadId=X + + QP->>Cache: Check mpu:session:{uploadId} is ACTIVE + alt Session missing/aborted + QP-->>C: 404/409 Error end - QP->>Cache: Check mpu:parts:{uploadId} for partNumber N - alt Part already uploaded - QP-->>C: 409 Conflict (Part already uploaded) + + Note over QP,Cache: 1. Atomically reserve part number + QP->>Cache: setAddIfAbsent(mpu:parts:{uploadId}, partNumber) + + alt Part already reserved + QP-->>C: 409 Conflict + else Reservation Success + + QP->>SW: put(partStorageKey, length, data) + + Note over SW,SN: SW handles erasure-coding, concurrent<br/>streaming, and distributed commit. + SW->>SN: Stream erasure-coded shards + SN-->>SW: Quorum ACKs received + + alt Storage Failure + SW-->>QP: False / Exception + QP->>Cache: setRemove(mpu:parts:{uploadId}, partNumber) + QP-->>C: 500/503 Storage Failure + else Storage Success + SW-->>QP: True + + Note over QP,Cache: 2. Verify session wasn't aborted during I/O + QP->>Cache: Check session still ACTIVE + + alt Session aborted during upload + QP->>SW: delete(partStorageKey) + SW->>SN: Delete orphaned shards + QP->>Cache: setRemove(mpu:parts:{uploadId}, partNumber) + QP-->>C: 409 Conflict + else Session Still Active + Note over QP,Cache: 3. Finalize part metadata + QP->>Cache: Store part size (mpu:partsize:{uploadId}:{N}) + QP-->>C: 200 OK + end + end end - QP->>SW: put(requestId, bucket, partStorageKey, length, data) - SW->>SN: Stream erasure-coded shards concurrently - SN-->>SW: ACKs (≥ write_quorum required; here ≥7) - SW-->>QP: Success - QP->>Cache: Add partNumber N → mpu:parts:{uploadId} - QP->>Cache: Store part size → mpu:partsize:{uploadId}:{N} - QP-->>C: 200 OK ``` --- @@ -343,9 +389,9 @@ sequenceDiagram **Route:** `POST /{bucket}/{key}?uploadId=X` -The QP verifies all declared parts are present in the cache and that cached sizes are valid, then transitions the session to `COMPLETING` and publishes a [`MultipartCommitMessage`](../common-lib/src/main/java/com/github/koop/common/messages/Message.java:97) via pub/sub to the partition-keyed topic. The message carries the ordered list of part numbers. **Parts are not concatenated or re-uploaded** — they remain as individual erasure-coded shards in storage; reconstruction happens on read. The cache session and part metadata are cleaned up after successful publish.
+The QP verifies all declared parts are present in the cache and that cached sizes are valid, then transitions the session to `COMPLETING` and publishes a [`MultipartCommitMessage`](../../../common-lib/src/main/java/com/github/koop/common/messages/Message.java) via pub/sub to the partition-keyed topic. The message carries the ordered list of part numbers. **Parts are not concatenated or re-uploaded** — they remain as individual erasure-coded shards in storage; reconstruction happens on read. The cache session and part metadata are cleaned up after successful publish. -See [`MultipartUploadManager.completeMultipartUpload()`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java:104). +See [`MultipartUploadManager.completeMultipartUpload()`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/MultipartUploadManager.java). ```mermaid sequenceDiagram @@ -406,7 +452,7 @@ sequenceDiagram QP->>Cache: Read mpu:parts:{uploadId} loop For each uploaded part (asynchronous, best-effort) QP->>SW: delete(requestId, bucket, partStorageKey) - SW->>SN: DELETE /store/{partition}/{storageKey} + SW->>SN: DELETE /store/{partition}/{bucket}/{key} SN-->>SW: ACK (errors ignored, cleanup continues) QP->>Cache: Delete part size entry for this part end @@ -470,9 +516,9 @@ sequenceDiagram ## 14. Node Discovery / Health Probing -Each Query Processor maintains a local [`NodeHealthTracker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) that classifies every known storage node as `HEALTHY`, `SUSPECT`, or `DOWN`. A background [`NodeHealthProbe`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) refreshes the cache every 5 s by issuing `GET /health` to every machine in the current `ErasureSetConfiguration`. 
Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode` in [`StorageWorker`](../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)) consult the tracker to skip nodes known to be down — eliminating the 30 s per-request timeout penalty for dead peers — while keeping the quorum denominator unchanged. +Each Query Processor maintains a local [`NodeHealthTracker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthTracker.java) that classifies every known storage node as `HEALTHY`, `SUSPECT`, or `DOWN`. A background [`NodeHealthProbe`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/NodeHealthProbe.java) refreshes the cache every 5 s by issuing `GET /health` to every machine in the current `ErasureSetConfiguration`. Data-path operations (`put`, `fetchShards`, `bucketExists`, `fetchListFromAnyNode` in [`StorageWorker`](../../../query-processor/src/main/java/com/github/koop/queryprocessor/processor/StorageWorker.java)) consult the tracker to skip nodes known to be down — avoiding the full per-request HTTP timeout (today: a 10 s connect timeout on the data-path `HttpClient`) for each dead peer — while keeping the quorum denominator unchanged. -Soft-exclusion semantics: if too few healthy nodes remain to make progress (e.g. fewer than `m` for reads or fewer than `writeQuorum` for writes), the path falls back to contacting every node so a recovered-but-still-marked-DOWN node has a chance to succeed. Any successful response to a DOWN-marked node immediately promotes it back to HEALTHY. +Exclusion semantics: DOWN nodes are skipped unconditionally, but the tracker is advisory only — it never reduces quorum requirements. If too few healthy nodes remain to make progress (e.g. fewer than `m` for reads or fewer than `writeQuorum` for writes), the operation fails fast rather than contacting nodes the tracker has written off. 
A DOWN-marked node returns to service only when the background probe re-detects it, or when a later data-path request to it succeeds — any successful response immediately promotes it back to HEALTHY (opportunistic recovery). ```mermaid flowchart LR @@ -493,8 +539,8 @@ flowchart LR Probe -- HTTP GET /health<br/>2s timeout --> SN2 Probe -- HTTP GET /health<br/>2s timeout --> SN3 - Worker -- shard / bucket /<br/>list HTTP calls<br/>(DOWN nodes skipped<br/>if enough healthy) --> SN1 - Worker -- skipped while DOWN<br/>(fallback contacts<br/>everyone) --> SN2 + Worker -- shard / bucket /<br/>list HTTP calls<br/>(healthy nodes only) --> SN1 + Worker -- skipped while DOWN<br/>(no fallback —<br/>fail fast if quorum lost) --> SN2 Worker --> SN3 ``` @@ -512,12 +558,18 @@ The QP `/health` endpoint surfaces the live counts (`API Gateway is healthy! / S ## RocksDB Table Reference -The [`Database`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) facade and [`StorageNodeV2`](../storage-node/src/main/java/com/github/koop/storagenode/StorageNodeV2.java) define three RocksDB tables written atomically on every PUT/DELETE/bucket operation in the planned flow. +The [`Database`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) facade is backed by [`RocksDbStorageStrategy`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/RocksDbStorageStrategy.java), which declares **seven** RocksDB column families. The first three are written atomically on every PUT/DELETE/bucket operation; the remaining four support repair, GC, and asynchronous blob deletion. -| Table | Key | Value | Purpose | +| Column family | Key | Value | Purpose | |---|---|---|---| -| **OpLog** | Sequence Number | `(key, operation)` | Ordered log of all mutations; enables repair | -| **Metadata** | Object Key | `(partition, seq, location)` | Latest shard location per object key; supports regular, multipart, and tombstone versions | -| **Buckets** | Bucket Name | `(partition, seq, deleted)` | Bucket existence and tombstone tracking | +| **`op_log`** | Sequence Number | `(key, operation)` | Ordered per-partition log of all mutations; enables repair | +| **`metadata`** | Object Key | `(partition, seq, location)` | Latest shard location per object key; supports regular, multipart, and tombstone versions | +| **`buckets`** | Bucket Name | `(partition, seq, deleted)` | Bucket existence and tombstone tracking | +| **`uncommitted_writes`** | Request ID | Per-shard staging entry | Tracks shards that have arrived from the QP but whose Kafka commit has not yet been observed | +| **`repair_queue`** | Sequence Number | Pending repair task |
Persistent queue consumed by [`RepairWorkerPool`](../../../storage-node/src/main/java/com/github/koop/storagenode/RepairWorkerPool.java) to catch up after a node restart | +| **`pending_deletes`** | Blob Key | Tombstone marker | Obsolete shard paths enqueued on DELETE / GC; drained asynchronously by [`BlobDeletionWorker`](../../../storage-node/src/main/java/com/github/koop/storagenode/gc/BlobDeletionWorker.java) | +| **`gc_cursors`** | Partition | Last-scanned seqNum | Persistent cursor used by [`GarbageCollectorWorker`](../../../storage-node/src/main/java/com/github/koop/storagenode/gc/GarbageCollectorWorker.java) so each GC pass examines only new oplog entries | + +The actual shard bytes are **not** stored in RocksDB — they are written as files on the filesystem under `${STORAGE_DIR}` via `java.nio.channels.FileChannel`. The RocksDB instance itself lives at `${STORAGE_DIR}/rocksdb`. -See [`Database.putItem()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:24), [`Database.deleteItem()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:45), and [`Database.createBucket()`](../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java:81) for the atomic write implementations. +See [`Database.putItem()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java), [`Database.deleteItem()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java), and [`Database.createBucket()`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/Database.java) for the atomic write implementations. diff --git a/docs/diagrams/docs/06-technologies.md b/docs/diagrams/docs/06-technologies.md new file mode 100644 index 00000000..6aec70de --- /dev/null +++ b/docs/diagrams/docs/06-technologies.md @@ -0,0 +1,40 @@ +# Technologies & Dependencies + +KoopDB leverages a modern Java stack focused on high-performance concurrency and distributed systems primitives. 
+ +## Core Language +- **Java 21 (LTS):** Utilizing the latest language features, particularly **Virtual Threads (Project Loom)** for handling massive concurrency in the Query Processors and Storage Nodes without reactive complexity. + +## Key Libraries + +### Web Framework +- **Javalin (7.0.1):** A lightweight, unopinionated web framework for Java/Kotlin. + - Used for the REST API (S3 routes) on Query Processors. + - Used for internal node-to-node communication APIs on Storage Nodes. + - Configured to use Virtual Threads for optimal throughput. + +### Storage Engine +- **RocksDB (8.10.0):** An embeddable persistent key-value store, used by Storage Nodes for **metadata only**. Column families: `op_log`, `metadata`, `buckets`, `uncommitted_writes`, `repair_queue`, `pending_deletes`, `gc_cursors` (see [`RocksDbStorageStrategy`](../../../storage-node/src/main/java/com/github/koop/storagenode/db/RocksDbStorageStrategy.java)). The RocksDB instance lives under `${STORAGE_DIR}/rocksdb`. +- **Filesystem (FileChannel):** The actual shard bytes are written as plain files on disk via `java.nio.channels.FileChannel`, under directories sibling to the RocksDB instance inside `${STORAGE_DIR}`. + +### Erasure Coding +- **Backblaze JavaReedSolomon:** Reed-Solomon encoder/decoder used by [`ErasureCoder`](../../../common-lib/src/main/java/com/github/koop/common/erasure/ErasureCoder.java) on both the encode (PUT) and decode (GET) paths. Dependency `com.github.Backblaze:JavaReedSolomon` in [`common-lib/pom.xml`](../../../common-lib/pom.xml). + +### Serialization & Logging +- **Jackson (2.17.0):** JSON serialization/deserialization for API responses and internal messages. +- **Log4j 2 (2.23.1):** Asynchronous logging framework. + +### Distributed Coordination +- **Etcd (via Jetcd 0.8.6):** Distributed key-value store holding cluster topology, erasure-set configuration, and partition→erasure-set mapping. Run as a 3-node quorum. 
The QP and Storage Nodes register Etcd watches so config changes propagate without a restart. +- **Redis (via Jedis 5.1.0):** In-memory store used by Query Processors to track multipart upload session state (active sessions, uploaded part numbers, cached part sizes). Not used for ordering. +- **Kafka (kafka-clients 3.7.0):** Per-partition sequencer and pub/sub bus for ordered commit messages (`FileCommitMessage`, `MultipartCommitMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage`). Provides total ordering of mutations within a partition; "last write" is then determined by the sequence numbers persisted in each storage node's RocksDB metadata table. Reads pick the max version reported across storage nodes and reconstruct from ≥ `m` shards (falling back to an older version if not enough shards are present). + +### Testing & Verification +- **JUnit 5 (5.10.2):** Unit and integration testing. +- **AWS SDK for Java v2 (`software.amazon.awssdk:s3`):** Used in system tests to drive the cluster through a real S3 client and verify S3-compatible behavior. The async client (`S3AsyncClient`) plus [`s3-transfer-manager 2.41.34`](../../../system-tests/pom.xml) exercise the high-level `uploadFile` / `downloadFile` flows, which internally chunk large objects into multipart uploads. +- **Testcontainers:** Used by [`DockerComposeS3E2EIT`](../../../system-tests/src/test/java/koop/DockerComposeS3E2EIT.java) (via `ComposeContainer`) to bring up the actual `docker-compose.yml` stack for true end-to-end tests. +- **In-process integration cluster:** Suites like [`RealStorageNodesIT`](../../../system-tests/src/test/java/koop/RealStorageNodesIT.java) spin up 9 `StorageNodeServerV2` instances inside a single JVM and substitute the external dependencies with in-memory fakes — `MemoryFetcher` for Etcd and `MemoryPubSub` for Kafka — so quorum, version, and repair logic can be exercised without Docker. 
+ +## Infrastructure +- **Docker & Docker Compose:** Containerization of all components for consistent development and deployment environments. +- **Maven:** Build automation and dependency management. diff --git a/docs/diagrams/docs/07-installation.md b/docs/diagrams/docs/07-installation.md new file mode 100644 index 00000000..80b9499d --- /dev/null +++ b/docs/diagrams/docs/07-installation.md @@ -0,0 +1,176 @@ +# Installation and Setup + +KoopDB ships two deployment flavors: + +- **Docker Compose** — single-host local stack (default for development and the `system-tests` suite). +- **Kubernetes (k3s)** — multi-node cluster deployment, available in two sizings under [`k8s/`](../../../k8s/) and [`k8s-9/`](../../../k8s-9/). + +Both orchestrate the same services (Query Processors, Storage Nodes, Etcd, Kafka, Redis), so the env-var and erasure-config sections below apply to either. + +## Prerequisites + +- **Java 21 JDK** (for building the JARs) +- **Maven 3.8+** +- **Docker** and **Docker Compose** +- **AWS CLI** (optional, for interacting with the running cluster) + +## Building the Project + +1. Clone the repository: + ```bash + git clone https://github.com/S26-Distributed-Capstone/Koop.git + cd Koop + ``` + +2. Compile and package the project using Maven: + ```bash + mvn clean package -DskipTests + ``` + This will generate the necessary JAR files in `query-processor/target/` and `storage-node/target/`. + +## Running the Cluster + +1. Start the cluster using Docker Compose: + ```bash + docker-compose up --build + ``` + This command will build the Docker images for the query processor and storage node services and launch: + - **6 Storage Nodes** (replicas) on ports `8001-8006` internally mapped to `8080`. + - **3 Query Processors** (replicas) on ports `9001-9003` internally mapped to `8080`. + - **Etcd Cluster** (3 nodes, `etcd1`, `etcd2`, `etcd3`) for metadata management. 
+   - **Kafka** (single broker, KRaft mode, port `9092`) for per-partition commit-message ordering.
+   - **Redis** instance for multipart upload session state.
+   - **etcd-seeder** (one-shot) which writes the initial `erasure_set_configurations` and `partition_spread_configurations` keys into Etcd.
+
+2. Verify the services are running:
+   ```bash
+   docker-compose ps
+   ```
+   You should see `storage-node-1..6`, `query-processor-1..3`, `etcd1/etcd2/etcd3`, `kafka`, and `redis-master` in the `Up` state (plus a short-lived `etcd-seeder` that exits after seeding).
+
+## Service Environment Variables
+
+If you run services outside Docker Compose, you must supply these environment variables. The lists below match what the code actually reads (`System.getenv` callsites) and what the in-tree compose / k8s manifests set.
+
+### Storage Node
+| Variable | Description |
+| --- | --- |
+| `APP_PORT` | HTTP listen port (default `8080`) |
+| `STORAGE_DIR` | On-disk persistence directory (the k8s manifests mount `/storage`) |
+| `ETCD_URL` | Etcd endpoint (e.g. `http://etcd1:2379`) |
+| `KAFKA_BOOTSTRAP_SERVERS` | Kafka bootstrap servers (e.g. `kafka:9092`) |
+| `NODE_IP` | This node's identity / advertised hostname |
+| `LOG_LEVEL` | log4j level (optional; both manifests set `DEBUG`) |
+
+### Query Processor
+| Variable | Description |
+| --- | --- |
+| `ETCD_URL` | Etcd endpoint |
+| `REDIS_URL` | Redis URL for multipart session state (default `redis://localhost:6379`) |
+| `KAFKA_BOOTSTRAP_SERVERS` | Kafka bootstrap servers |
+| `NODE_IP` | This QP's identity for logging/metrics |
+| `LOG_LEVEL` | log4j level (optional; both manifests set `DEBUG`) |
+
+> The QP HTTP port is hardcoded to `8080`; `APP_PORT` and `STORAGE_NODE_URL` are still set in some compose files but no longer read by the QP and should be considered vestigial.
+
+## Erasure Configuration
+
+The erasure-set parameters are seeded into Etcd at startup. Two sizings ship in-tree:
+
+| Deployment | Erasure config | Tolerance |
+| --- | --- | --- |
+| [`docker-compose.yml`](../../../docker-compose.yml) (local dev + system-tests) | n=6, m=4, write_quorum=5 (1 erasure set, 6 storage-node containers) | 2 node failures |
+| [`docker-compose-local.yml`](../../../docker-compose-local.yml) | n=6, m=4, write_quorum=5 | 2 node failures |
+| [`k8s/`](../../../k8s/) (small k3s deployment) | n=6, m=4, write_quorum=5 (2 erasure sets × 6 pods) | 2 node failures |
+| [`docker-compose.k8s.yml`](../../../docker-compose.k8s.yml) / [`docker-compose.k8s-local.yml`](../../../docker-compose.k8s-local.yml) | n=9, m=6, write_quorum=7 | 3 node failures |
+| [`k8s-9/`](../../../k8s-9/) (large k3s deployment) | n=9, m=6, write_quorum=7 (2 erasure sets × 9 pods) | 3 node failures |
+
+The flow is identical across all of them; only the seeded numbers and the number of storage-node replicas differ. The seeded etcd keys are:
+
+- `erasure_set_configurations` — list of erasure sets, each with `n` (total shards), `m` (data shards), `write_quorum`, and the list of `(ip, port)` machines. The parity-shard count is computed as `k = n − m`.
+- `partition_spread_configurations` — mapping of partitions to erasure sets.
+
+**How to change the topology:**
+
+- *Docker Compose:* edit the `etcd-seeder` `command` block in the relevant `docker-compose*.yml` and restart the cluster.
+- *Kubernetes:* edit the JSON files under [`k8s/etcd-data/`](../../../k8s/etcd-data/) (or [`k8s-9/etcd-data/`](../../../k8s-9/etcd-data/)). [`deploy.sh`](../../../k8s/deploy.sh) mounts them as the `etcd-seeder-config` ConfigMap consumed by [`05-etcd-seeder.yaml`](../../../k8s/05-etcd-seeder.yaml).
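As a rough check on the sizing table in this section, the relationship between `n`, `m`, and the tolerated failures (`k = n − m`) can be written as a small Java record. This is a sketch only: `ErasureSetParams` and its methods are hypothetical, and the constructor bounds (`m ≤ write_quorum ≤ n`) are an assumption that happens to hold for both shipped sizings, not a rule taken from the KoopDB source.

```java
// Hypothetical value type mirroring the seeded n / m / write_quorum numbers.
record ErasureSetParams(int n, int m, int writeQuorum) {

    ErasureSetParams {
        // Assumed sanity bounds; not confirmed against the real seeder.
        if (m < 1 || m > n || writeQuorum < m || writeQuorum > n) {
            throw new IllegalArgumentException("expected 1 <= m <= write_quorum <= n");
        }
    }

    /** Parity shards, k = n - m; also the number of node losses a read survives. */
    int parityShards() {
        return n - m;
    }

    /** Raw bytes stored per logical byte (n shards hold m shards' worth of data). */
    double storageOverhead() {
        return (double) n / m;
    }

    /** An object can be rebuilt from any m of its n shards. */
    boolean canReconstruct(int availableShards) {
        return availableShards >= m;
    }
}
```

Both shipped sizings (`6/4/5` and `9/6/7`) store 1.5 bytes per logical byte; they differ in how many node failures a read can absorb (2 versus 3).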
+
+## Stopping the Cluster
+
+To stop the cluster and remove containers/networks:
+```bash
+docker-compose down
+```
+
+To remove volumes (persisted data) as well:
+```bash
+docker-compose down -v
+```
+
+## Kubernetes Deployment
+
+The [`k8s/`](../../../k8s/) (6-node erasure sets) and [`k8s-9/`](../../../k8s-9/) (9-node erasure sets) directories contain a self-contained k3s deployment: manifests + helper scripts. Both target a remote control-plane node configured in [`config.sh`](../../../k8s-9/config.sh) (`SERVER`, `DOCKERHUB_USER`, `SSH_PASS`) and apply everything into a `koopdb` namespace.
+
+### Manifest layout
+
+| File | Purpose |
+| --- | --- |
+| `00-namespace.yaml` | Creates the `koopdb` namespace |
+| `01-etcd.yaml` | 3-replica etcd StatefulSet (`etcd-headless` + `etcd` services) |
+| `02-kafka.yaml` | Kafka broker (KRaft mode) |
+| `03-redis.yaml` | Redis StatefulSet (multipart session state) |
+| `04-storage-nodes.yaml` | Two `storage-node-setN` StatefulSets with pod anti-affinity per erasure set |
+| `05-etcd-seeder.yaml` | One-shot Job that writes `erasure_set_configurations` + `partition_spread_configurations` from the `etcd-seeder-config` ConfigMap |
+| `06-query-processor.yaml` | 3-replica QP Deployment + `ClusterIP` Service |
+| `07-nginx.yaml` | 2-replica nginx fronting the QP service, exposed as `NodePort` on `30080` |
+
+Nodes are pinned with the `koopdb/role` label: `cp` for control-plane (etcd, seeder) and `worker` for data-plane (storage nodes, QPs, nginx, kafka, redis). [`label-nodes.sh`](../../../k8s-9/label-nodes.sh) holds the IP-to-role mapping for the target cluster.
+
+### Scripts
+
+| Script | What it does |
+| --- | --- |
+| [`build.sh`](../../../k8s-9/build.sh) | Builds `${DOCKERHUB_USER}/query-processor:latest` and `${DOCKERHUB_USER}/storage-node:latest` and pushes both to Docker Hub |
+| [`label-nodes.sh`](../../../k8s-9/label-nodes.sh) | Labels k3s nodes `koopdb/role=cp|worker` |
+| [`deploy.sh`](../../../k8s-9/deploy.sh) | SCPs the manifests, generates the `etcd-seeder-config` ConfigMap from `etcd-data/*.json`, patches the image references, and `kubectl apply`s the manifests in sorted order |
+| [`status.sh`](../../../k8s-9/status.sh) | Lists pods / services / StatefulSets / Deployments in the namespace |
+| [`logs.sh`](../../../k8s-9/logs.sh), [`all_logs.sh`](../../../k8s-9/all_logs.sh) | Tail pod logs |
+| [`teardown.sh`](../../../k8s-9/teardown.sh) | `kubectl delete namespace koopdb` |
+
+### Typical workflow
+
+```bash
+# From the repo root, against the cluster configured in k8s-9/config.sh
+k8s-9/build.sh        # build + push images
+k8s-9/label-nodes.sh  # one-time per fresh k3s cluster
+k8s-9/deploy.sh       # apply manifests
+k8s-9/status.sh       # check pods are Ready
+```
+
+### Accessing the cluster
+
+The nginx ingress exposes the QPs on `NodePort 30080`:
+
+```bash
+curl http://:30080/health
+aws --endpoint-url=http://:30080 s3 ls
+```
+
+## Accessing the API
+
+The Query Processors expose an S3-compatible API on ports `9001`, `9002`, and `9003`. You can point any S3 client to these endpoints.
+
+**Example Health Check:**
+```bash
+curl http://localhost:9001/health
+# Response: "API Gateway is healthy!"
+```
+
+**AWS CLI Configuration:**
+To use the AWS CLI with KoopDB, configure a profile or pass endpoints explicitly:
+```bash
+aws --endpoint-url=http://localhost:9001 s3 mb s3://my-bucket
+aws --endpoint-url=http://localhost:9001 s3 cp test-file.txt s3://my-bucket/
+aws --endpoint-url=http://localhost:9001 s3 ls s3://my-bucket/
+```
+Note: Authentication is currently disabled, so any credentials will suffice.
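The health check shown for port `9001` applies equally to the other two Query Processor ports in the Docker Compose setup. A minimal Java sketch using only the JDK's `java.net.http` client follows; `HealthProbe` and its methods are hypothetical names, and treating HTTP 200 as "healthy" is an assumption, since the documentation above only shows the response body.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for probing the Query Processors' /health endpoint.
final class HealthProbe {

    /** Builds one /health URI per port in [firstPort, lastPort]. */
    static List<URI> healthUris(String host, int firstPort, int lastPort) {
        List<URI> uris = new ArrayList<>();
        for (int port = firstPort; port <= lastPort; port++) {
            uris.add(URI.create("http://" + host + ":" + port + "/health"));
        }
        return uris;
    }

    /** Returns true if the endpoint answers with HTTP 200 (assumed to mean healthy). */
    static boolean isHealthy(HttpClient client, URI uri) {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .timeout(Duration.ofSeconds(2))
                .GET()
                .build();
        try {
            return client.send(request, HttpResponse.BodyHandlers.ofString()).statusCode() == 200;
        } catch (IOException e) {
            return false; // unreachable or timed out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Against a running local stack, `HealthProbe.healthUris("localhost", 9001, 9003)` yields the three endpoints to pass to `isHealthy` with a client from `HttpClient.newHttpClient()`.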
diff --git a/docs/api.md b/docs/diagrams/docs/08-api.md
similarity index 79%
rename from docs/api.md
rename to docs/diagrams/docs/08-api.md
index 968a2a8e..865d8526 100644
--- a/docs/api.md
+++ b/docs/diagrams/docs/08-api.md
@@ -2,7 +2,7 @@
 KoopDB exposes a subset of the Amazon S3 API. Authentication is intentionally out of scope for this project; all requests are accepted anonymously and any credentials supplied by an S3 client are ignored. Use the normal S3 API to interact.
-> **Note on `ETag`:** ETags are not relevant for KoopDB — the gateway does not generate or return ETag headers, and the XML body of `CompleteMultipartUpload` is parsed for `` only; any `` values supplied by the client are ignored.
+> **Note on `ETag`:** ETags are not meaningful in KoopDB — the gateway returns `ETag` headers populated with placeholder values (random or dummy), not real content hashes, so they should not be used for integrity checks. The XML body of `CompleteMultipartUpload` is parsed for `` only; any `` values supplied by the client are ignored.
 ## Base URL
 The API is available via the Query Processor services, typically exposed on ports `9001-9003` in a default Docker Compose setup.
@@ -19,6 +19,7 @@ The API is available via the Query Processor services, typically exposed on port
 ### Bucket Operations
 | Method | Path | Description | S3 Operation |
 | :--- | :--- | :--- | :--- |
+| `GET` | `/` | Lists all buckets. Returns a `ListAllMyBucketsResult` XML payload. | `ListBuckets` |
 | `PUT` | `/{bucket}` | Creates a new bucket. | `CreateBucket` |
 | `DELETE` | `/{bucket}` | Deletes an empty bucket. | `DeleteBucket` |
 | `GET` | `/{bucket}` | Lists objects in a bucket (v2). Supports `prefix` and `max-keys` query params. | `ListObjectsV2` |
@@ -31,10 +32,11 @@ The API is available via the Query Processor services, typically exposed on port
 | :--- | :--- | :--- | :--- |
 | `PUT` | `/{bucket}/{key}` | Uploads an object. Max size defaults to 100MB. | `PutObject` |
 | `GET` | `/{bucket}/{key}` | Retrieves an object. Returns binary stream. | `GetObject` |
+| `HEAD` | `/{bucket}/{key}` | Returns object size and `Last-Modified` without the body. | `HeadObject` |
 | `DELETE` | `/{bucket}/{key}` | Deletes an object. | `DeleteObject` |
 ### Multipart Upload Operations
-| Method | Path | Queue Parameters | Description | S3 Operation |
+| Method | Path | Query Parameters | Description | S3 Operation |
 | :--- | :--- | :--- | :--- | :--- |
 | `POST` | `/{bucket}/{key}` | `?uploads` | Initiates a new multipart upload. Returns an XML payload with `UploadId`. | `CreateMultipartUpload` |
 | `PUT` | `/{bucket}/{key}` | `?partNumber=N&uploadId=X` | Uploads a specific part for an upload ID. | `UploadPart` |
@@ -74,6 +76,8 @@ The API returns standard S3 XML error responses when applicable.
 **Common Error Codes:**
 - `NoSuchBucket`: The specified bucket does not exist.
 - `NoSuchKey`: The specified key does not exist.
-- `EntityTooLarge`: The object exceeded maximum allowed size.
+- `NoSuchUpload`: The specified multipart `uploadId` does not exist.
+- `InvalidPart`: A part referenced in `CompleteMultipartUpload` is missing or invalid.
+- `InvalidRequest`: The request is malformed (e.g. missing `Content-Length` on `PUT`, or `POST` without `?uploads`/`?uploadId`).
 - `InternalError`: Unexpected server-side failure.
 - `NotImplemented`: The requested S3 operation is not supported by Koop.
diff --git a/docs/testing.md b/docs/diagrams/docs/09-testing.md
similarity index 100%
rename from docs/testing.md
rename to docs/diagrams/docs/09-testing.md
diff --git a/docs/diagrams/workspace.dsl b/docs/diagrams/workspace.dsl
index 385bc0b3..4b8f0455 100644
--- a/docs/diagrams/workspace.dsl
+++ b/docs/diagrams/workspace.dsl
@@ -1,5 +1,7 @@
 workspace "Distributed Object Store" "An S3-compatible distributed object store using erasure encoding for redundancy." {
+    !docs docs
+
     model {
         # ── External Actors ──────────────────────────────────────────────
diff --git a/docs/installation.md b/docs/installation.md
deleted file mode 100644
index f5c76ea1..00000000
--- a/docs/installation.md
+++ /dev/null
@@ -1,106 +0,0 @@
-# Installation and Setup
-
-KoopDB uses Docker Compose for orchestrating its services (Query Processors, Storage Nodes, Etcd, Redis).
-
-## Prerequisites
-
-- **Java 21 JDK** (for building the JARs)
-- **Maven 3.8+**
-- **Docker** and **Docker Compose**
-- **AWS CLI** (optional, for interacting with the running cluster)
-
-## Building the Project
-
-1. Clone the repository:
-   ```bash
-   git clone https://github.com/S26-Distributed-Capstone/Koop.git
-   cd Koop
-   ```
-
-2. Compile and package the project using Maven:
-   ```bash
-   mvn clean package -DskipTests
-   ```
-   This will generate the necessary JAR files in `query-processor/target/` and `storage-node/target/`.
-
-## Running the Cluster
-
-1. Start the cluster using Docker Compose:
-   ```bash
-   docker-compose up --build
-   ```
-   This command will build the Docker images for the query processor and storage node services and launch:
-   - **6 Storage Nodes** (replicas) on ports `8001-8006` internally mapped to `8080`.
-   - **3 Query Processors** (replicas) on ports `9001-9003` internally mapped to `8080`.
-   - **Etcd Cluster** (3 nodes, `etcd1`, `etcd2`, `etcd3`) for metadata management.
-   - **Kafka** (single broker, KRaft mode, port `9092`) for per-partition commit-message ordering.
-   - **Redis** instance for multipart upload session state.
-   - **etcd-seeder** (one-shot) which writes the initial `erasure_set_configurations` and `partition_spread_configurations` keys into Etcd.
-
-2. Verify the services are running:
-   ```bash
-   docker-compose ps
-   ```
-   You should see containers for `storage-node`, `query-processor`, `etcd`, and `redis-master` in the `Up` state.
-
-## Service Environment Variables
-
-If you run services outside Docker Compose, you must supply these environment variables.
-
-### Storage Node
-| Variable | Description |
-| --- | --- |
-| `ETCD_URL` | Etcd endpoint (e.g. `http://etcd1:2379`) |
-| `REDIS_URL` | Redis URL (e.g. `redis://redis-master:6379`) |
-| `KAFKA_BOOTSTRAP_SERVERS` | Kafka bootstrap servers (e.g. `kafka:9092`) |
-| `NODE_IP` | This node's identity / advertised hostname |
-
-### Query Processor
-| Variable | Description |
-| --- | --- |
-| `APP_PORT` | HTTP listen port (default `8080`) |
-| `ETCD_URL` | Etcd endpoint |
-| `REDIS_URL` | Redis URL (multipart session state) |
-| `KAFKA_BOOTSTRAP_SERVERS` | Kafka bootstrap servers |
-| `STORAGE_NODE_URL` | A storage node URL used during initial bootstrap |
-| `NODE_IP` | This QP's identity for logging/metrics |
-
-## Erasure Configuration
-
-> **Note on cluster sizing:** This guide describes the `docker-compose.yml` setup, which runs a **6-node cluster** with `n = 6, m = 4, write_quorum = 5` (4 data + 2 parity shards, tolerates 2 node failures). The system-tests integration suite (`system-tests/src/test/java/koop/RealStorageNodesIT.java`) and the recommended full deployment use a **9-node cluster** with `n = 9, m = 6, write_quorum = 7` (6 data + 3 parity, tolerates 3 failures). The architecture and workflow documents describe the 9-node configuration as the default; the flow is identical, only the seeded numbers and the number of storage-node replicas differ.
-
-The erasure-set parameters are seeded into Etcd at startup by the `etcd-seeder` service in `docker-compose.yml`. To change `n`, `m`, `write_quorum`, the participating storage nodes, or the partition spread, edit the `etcd-seeder` `command` block in `docker-compose.yml` and restart the cluster. The seeded keys are:
-
-- `erasure_set_configurations` — list of erasure sets, each with `n` (total shards), `m` (data shards), `write_quorum`, and the list of `(ip, port)` machines. The parity-shard count is computed as `k = n − m`.
-- `partition_spread_configurations` — mapping of partitions to erasure sets.
-
-## Stopping the Cluster
-
-To stop the cluster and remove containers/networks:
-```bash
-docker-compose down
-```
-
-To remove volumes (persisted data) as well:
-```bash
-docker-compose down -v
-```
-
-## Accessing the API
-
-The Query Processors expose an S3-compatible API on ports `9001`, `9002`, and `9003`. You can point any S3 client to these endpoints.
-
-**Example Health Check:**
-```bash
-curl http://localhost:9001/health
-# Response: "API Gateway is healthy!"
-```
-
-**AWS CLI Configuration:**
-To use the AWS CLI with KoopDB, configure a profile or pass endpoints explicitly:
-```bash
-aws --endpoint-url=http://localhost:9001 s3 mb s3://my-bucket
-aws --endpoint-url=http://localhost:9001 s3 cp test-file.txt s3://my-bucket/
-aws --endpoint-url=http://localhost:9001 s3 ls s3://my-bucket/
-```
-Note: Authentication is currently disabled, so any credentials will suffice.
diff --git a/docs/technologies.md b/docs/technologies.md
deleted file mode 100644
index bf64e91c..00000000
--- a/docs/technologies.md
+++ /dev/null
@@ -1,37 +0,0 @@
-# Technologies & Dependencies
-
-KoopDB leverages a modern Java stack focused on high-performance concurrency and distributed systems primitives.
-
-## Core Language
-- **Java 21 (LTS):** Utilizing the latest language features, particularly **Virtual Threads (Project Loom)** for handling massive concurrency in the Query Processors and Storage Nodes without reactive complexity.
-
-## Key Libraries
-
-### Web Framework
-- **Javalin (7.0.1):** A lightweight, unopinionated web framework for Java/Kotlin.
-  - Used for the REST API (S3 routes) on Query Processors.
-  - Used for internal node-to-node communication APIs on Storage Nodes.
-  - Configured to use Virtual Threads for optimal throughput.
-
-### Storage Engine
-- **RocksDB (8.10.0):** An embeddable persistent key-value store for fast storage.
-  - Used by Storage Nodes to persist object metadata, data shards, and operation logs on disk.
-  - Efficient for high write throughput (LSM-Tree based).
-
-### Serialization & Logging
-- **Jackson (2.17.0):** JSON serialization/deserialization for API responses and internal messages.
-- **Log4j 2 (2.23.1):** Asynchronous logging framework.
-
-### Distributed Coordination
-- **Etcd (via Jetcd):** Distributed key-value store holding cluster topology, erasure-set configuration, and partition→erasure-set mapping. Run as a 3-node quorum.
-- **Redis:** In-memory store used by Query Processors to track multipart upload session state (active sessions, uploaded part numbers, cached part sizes). Not used for ordering.
-- **Kafka:** Per-partition sequencer and pub/sub bus for ordered commit messages (`PutMessage`, `DeleteMessage`, `CreateBucketMessage`, `DeleteBucketMessage`, `MultipartCommitMessage`). Provides total ordering of mutations within a partition; "last write" is then determined by the sequence numbers persisted in each storage node's RocksDB metadata table, resolved at read time via read quorum.
-
-### Testing & Verification
-- **JUnit 5:** Unit and integration testing.
-- **AWS SDK for Java (2.x):** Used in system tests to verify S3 compatibility against the running cluster.
-- **In-process integration cluster:** End-to-end tests (e.g. `RealStorageNodesIT`) start Etcd, Redis, Kafka, and storage nodes in-process rather than via Testcontainers.
-
-## Infrastructure
-- **Docker & Docker Compose:** Containerization of all components for consistent development and deployment environments.
-- **Maven:** Build automation and dependency management.
diff --git a/query-processor/README.md b/query-processor/README.md
index 9c378df6..ab69d569 100644
--- a/query-processor/README.md
+++ b/query-processor/README.md
@@ -18,7 +18,7 @@ Built with **Java 21**, **Javalin**, and **Virtual Threads** for high concurrenc
 The gateway acts as the smart client for the storage layer. Its responsibilities:
 1. **Routing:** Maps a `(bucket, key)` to a partition, then to an erasure set, using configuration from Etcd ([`ErasureRouting`](../common-lib/src/main/java/com/github/koop/common/erasure/ErasureRouting.java) and [`PartitionSpreadConfiguration`](../common-lib/src/main/java/com/github/koop/common/metadata/PartitionSpreadConfiguration.java)).
-2. **Erasure Encoding/Decoding:** Splits incoming objects into `k` data + `(n-k)` parity shards on PUT, reconstructs from any `k` available shards on GET.
+2. **Erasure Encoding/Decoding:** Splits incoming objects into `m` data + `k = n − m` parity shards on PUT, reconstructs from any `m` available shards on GET.
 3. **Quorum Coordination:** Waits for shard write ACKs from a configured `write_quorum` of nodes, then publishes a commit message via Kafka.
 4. **Multipart Session State:** Tracks active multipart sessions and their parts in Redis.
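The quorum-coordination responsibility in this list (wait for `write_quorum` shard ACKs before publishing the commit) can be illustrated with plain `CompletableFuture`s. This is a sketch under stated assumptions, not the gateway's code: `WriteQuorum.await` is a hypothetical helper, each future stands in for one storage node's shard write, and the Kafka publish is only indicated in a comment.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: resolves as soon as the write quorum is met or lost.
final class WriteQuorum {

    /**
     * @param shardWrites one future per storage node; true means the shard was ACKed
     * @param writeQuorum number of ACKs required before the commit may be published
     * @return a future that is true once writeQuorum ACKs arrive, or false once
     *         so many writes have failed that the quorum can no longer be reached
     */
    static CompletableFuture<Boolean> await(
            List<CompletableFuture<Boolean>> shardWrites, int writeQuorum) {
        if (writeQuorum <= 0) {
            return CompletableFuture.completedFuture(true);
        }
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        int maxFailures = shardWrites.size() - writeQuorum;
        if (maxFailures < 0) {
            result.complete(false); // fewer nodes than the quorum requires
            return result;
        }
        AtomicInteger acks = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        for (CompletableFuture<Boolean> write : shardWrites) {
            write.whenComplete((ok, error) -> {
                if (error == null && Boolean.TRUE.equals(ok)) {
                    if (acks.incrementAndGet() >= writeQuorum) {
                        // The caller would publish the commit message to Kafka here.
                        result.complete(true);
                    }
                } else if (failures.incrementAndGet() > maxFailures) {
                    result.complete(false);
                }
            });
        }
        return result;
    }
}
```

With six shard writes and `write_quorum = 5`, the returned future completes with `true` on the fifth ACK and with `false` on the second failure, without waiting for the remaining nodes.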