Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions .github/workflows/distributed-ingestion-coordinator-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ on:
- develop
paths:
- 'include/ingestion/ingestion_coordinator.h'
- 'src/ingestion/ingestion_coordinator.cpp'
- 'proto/ingestion_coordinator.proto'
- 'include/ingestion/ingestion_manager.h'
- 'src/ingestion/ingestion_coordinator.cpp'
- 'tests/test_ingestion_coordinator.cpp'
Expand All @@ -24,6 +26,8 @@ on:
types: [opened, synchronize, reopened]
paths:
- 'include/ingestion/ingestion_coordinator.h'
- 'src/ingestion/ingestion_coordinator.cpp'
- 'proto/ingestion_coordinator.proto'
- 'include/ingestion/ingestion_manager.h'
- 'src/ingestion/ingestion_coordinator.cpp'
- 'tests/test_ingestion_coordinator.cpp'
Expand All @@ -41,6 +45,32 @@ jobs:
contents: read
uses: ./.github/workflows/ci-scope-classifier.yml

# ---------------------------------------------------------------------------
# Build and run IngestionCoordinatorFocusedTests.
# Tests cover all acceptance criteria for the distributed ingestion
# coordinator (v1.8.0):
# AC-1 IngestionCoordinator partitions sources across worker nodes via
# consistent hashing of source_id; all sources are assigned.
# AC-2 Workers receive their assigned sources and run ingestAll() locally;
# partial IngestionReports are aggregated by the coordinator.
# AC-3 ISharedCheckpointStore / InMemorySharedCheckpointStore: write,
# read, exists, clear, size, thread-safe concurrent writes.
# AC-4 setSharedCheckpointStoreForTesting() / getSharedCheckpointStore()
# allow injection of a custom store; coordinator writes checkpoints
# for every ingested source.
# AC-5 Leader election uses TTL-based lease (InProcessLeaderElection);
# split-brain is avoided; leader-acquisition failure returns a
# structured error report.
# AC-COORD-5 Linear throughput scaling ≥ 3.5× for 4 vs 1 worker node
# (THEMIS_RUN_PERF_TESTS=1 gated).
# AC-COORD-6 Coordinator overhead (partitioning + progress aggregation)
# ≤ 5% of total ingestion wall-clock time
# (THEMIS_RUN_PERF_TESTS=1 gated).
# ---------------------------------------------------------------------------
distributed-ingestion-coordinator-tests:
needs: ci-scope-classifier
if: needs.ci-scope-classifier.outputs.has_code_changes == 'true'
name: IngestionCoordinator (${{ matrix.os }} / ${{ matrix.compiler }})
ingestion-coordinator-tests:
needs: ci-scope-classifier
if: needs.ci-scope-classifier.outputs.has_code_changes == 'true'
Expand All @@ -59,6 +89,10 @@ jobs:
compiler: gcc-12
cc: gcc-12
cxx: g++-12
- os: ubuntu-22.04
compiler: clang-15
cc: clang-15
cxx: clang++-15
- os: ubuntu-24.04
compiler: gcc-14
cc: gcc-14
Expand All @@ -68,18 +102,24 @@ jobs:
- name: Checkout repository
uses: actions/checkout@v4
with:
submodules: recursive
fetch-depth: 0

- name: Set up C++ build environment
uses: ./.github/actions/setup-cpp-build
with:
cc: ${{ matrix.cc }}
cxx: ${{ matrix.cxx }}
extra-packages: libssl-dev librocksdb-dev libfmt-dev libspdlog-dev nlohmann-json3-dev libzstd-dev
extra-packages: libssl-dev libcurl4-openssl-dev libboost-all-dev librocksdb-dev libfmt-dev libtbb-dev libspdlog-dev nlohmann-json3-dev

- name: Configure and build
uses: ./.github/actions/configure-themis
with:
cc: ${{ matrix.cc }}
cxx: ${{ matrix.cxx }}
build-target: test_ingestion_coordinator_focused

cc: ${{ matrix.cc }}
cxx: ${{ matrix.cxx }}

Expand All @@ -98,5 +138,27 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: ingestion-coordinator-results-${{ matrix.os }}-${{ matrix.compiler }}
path: |
build/ingestion_coordinator_results.txt
retention-days: 30

- name: Write job summary
if: always()
run: |
echo "## 🔗 Distributed Ingestion Coordinator – Unit Tests" >> "$GITHUB_STEP_SUMMARY"
echo "" >> "$GITHUB_STEP_SUMMARY"
echo "| Parameter | Value |" >> "$GITHUB_STEP_SUMMARY"
echo "|-----------|-------|" >> "$GITHUB_STEP_SUMMARY"
echo "| **OS** | \`${{ matrix.os }}\` |" >> "$GITHUB_STEP_SUMMARY"
echo "| **Compiler** | \`${{ matrix.compiler }}\` |" >> "$GITHUB_STEP_SUMMARY"
echo "| **Event** | \`${{ github.event_name }}\` |" >> "$GITHUB_STEP_SUMMARY"
echo "| **Branch** | \`${{ github.ref_name }}\` |" >> "$GITHUB_STEP_SUMMARY"
echo "| **Commit** | \`${{ github.sha }}\` |" >> "$GITHUB_STEP_SUMMARY"
echo "| **Triggered by** | ${{ github.actor }} |" >> "$GITHUB_STEP_SUMMARY"
echo "" >> "$GITHUB_STEP_SUMMARY"
echo "IngestionCoordinator: consistent-hash ring partitioning, InProcessLeaderElection TTL lease," >> "$GITHUB_STEP_SUMMARY"
echo "WorkStealingPool, ISharedCheckpointStore / InMemorySharedCheckpointStore," >> "$GITHUB_STEP_SUMMARY"
echo "setSharedCheckpointStoreForTesting(), checkpoint-per-source write." >> "$GITHUB_STEP_SUMMARY"
echo "Performance tests (AC-COORD-5, AC-COORD-6) are opt-in via THEMIS_RUN_PERF_TESTS=1." >> "$GITHUB_STEP_SUMMARY"
path: build/ingestion_coordinator_results.txt
retention-days: 30
113 changes: 113 additions & 0 deletions include/ingestion/ingestion_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,83 @@
namespace themis {
namespace ingestion {

// ============================================================================
// ISharedCheckpointStore — pluggable shared-backend checkpoint interface
// ============================================================================

/**
* @brief Strategy interface for a cluster-wide checkpoint store.
*
* All worker nodes in a distributed ingestion run share a single
* `ISharedCheckpointStore` so that incremental progress is visible across
* nodes and a failover worker can resume from the last committed offset.
*
* In production this is backed by Redis or the ThemisDB checkpoint
* collection. For single-process deployments the default
* `InMemorySharedCheckpointStore` keeps state in a mutex-protected map
* so that multiple in-process workers see each other's checkpoints.
*/
class ISharedCheckpointStore {
public:
virtual ~ISharedCheckpointStore() = default;

/**
* @brief Atomically write (or overwrite) a checkpoint.
* @return true on success
*/
virtual bool write(const IngestionCheckpoint& cp) = 0;

/**
* @brief Read the checkpoint for a source.
* @param source_id Source whose checkpoint to retrieve
* @param out Populated on success
* @return true if a checkpoint was found and read successfully
*/
virtual bool read(const std::string& source_id,
IngestionCheckpoint& out) const = 0;

/**
* @brief Remove the checkpoint for a source.
* @return true if the checkpoint existed and was deleted
*/
virtual bool clear(const std::string& source_id) = 0;

/**
* @brief Check whether a checkpoint exists for a source.
*/
virtual bool exists(const std::string& source_id) const = 0;
};

// ============================================================================
// InMemorySharedCheckpointStore — thread-safe in-process implementation
// ============================================================================

/**
* @brief `ISharedCheckpointStore` backed by a mutex-protected in-memory map.
*
* Suitable for single-process multi-worker deployments (e.g. tests and the
* default `InProcessWorkerNode` mode). Production deployments inject a Redis-
* or database-backed implementation via
* `IngestionCoordinator::setSharedCheckpointStoreForTesting()`.
*
* Thread-safety: fully thread-safe.
*/
class InMemorySharedCheckpointStore : public ISharedCheckpointStore {
public:
bool write(const IngestionCheckpoint& cp) override;
bool read(const std::string& source_id,
IngestionCheckpoint& out) const override;
bool clear(const std::string& source_id) override;
bool exists(const std::string& source_id) const override;

/** @return Number of checkpoints currently held in memory. */
size_t size() const;

private:
mutable std::mutex mutex_;
std::unordered_map<std::string, IngestionCheckpoint> store_;
};

// ============================================================================
// NodeInfo — lightweight description of a coordinator-managed worker node
// ============================================================================
Expand Down Expand Up @@ -550,6 +627,28 @@ class IngestionCoordinator {
/** @return Snapshot of coordinator runtime metrics. */
CoordinatorMetrics getMetrics() const;

// ── Checkpoint store configuration ──────────────────────────────────────

/**
* @brief Replace the shared checkpoint store.
*
* Must be called before `start()`. The default store is an
* `InMemorySharedCheckpointStore`, which is sufficient for single-process
* deployments. Multi-host deployments should provide a Redis- or
* database-backed implementation before starting the coordinator.
*
* @throws std::logic_error if called while the coordinator is running.
*/
void setSharedCheckpointStore(std::shared_ptr<ISharedCheckpointStore> store);

/**
* @brief Return the shared checkpoint store currently in use.
*
* Useful for test assertions (e.g. verifying a checkpoint was written
* after ingestion).
*/
std::shared_ptr<ISharedCheckpointStore> getSharedCheckpointStore() const;

// ── Testing hooks ────────────────────────────────────────────────────────

/**
Expand All @@ -559,6 +658,17 @@ class IngestionCoordinator {
*/
void setLeaderElectionForTesting(std::shared_ptr<ILeaderElection> election);

/**
* @brief Alias for `setSharedCheckpointStore()` — test code only.
*
* Prefer `setSharedCheckpointStore()` in production. This alias is kept
* so that existing test code continues to compile.
*
* @throws std::logic_error if called while the coordinator is running.
*/
void setSharedCheckpointStoreForTesting(
std::shared_ptr<ISharedCheckpointStore> store);
Comment on lines +661 to +670

private:
Config config_;
std::atomic<bool> running_{false};
Expand All @@ -572,6 +682,9 @@ class IngestionCoordinator {
// Leader election
std::shared_ptr<ILeaderElection> leader_election_;

// Shared checkpoint store (visible to all worker nodes)
std::shared_ptr<ISharedCheckpointStore> checkpoint_store_;

// Metrics
mutable std::mutex metrics_mutex_;
CoordinatorMetrics metrics_;
Expand Down
Loading
Loading