From a5edc69b365e6cfb70cacc57399b5c6ea3c903a8 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 5 Apr 2026 15:56:25 +0100 Subject: [PATCH 1/4] [test][infra] Unify IT infrastructure for all clients --- .github/actions/docker-cache/action.yml | 49 ++ .github/workflows/build_and_test_cpp.yml | 9 +- .github/workflows/build_and_test_python.yml | 11 +- .github/workflows/build_and_test_rust.yml | 31 +- .licenserc.yaml | 1 + Cargo.toml | 2 +- bindings/cpp/CMakeLists.txt | 16 +- bindings/cpp/test/test_main.cpp | 6 - bindings/cpp/test/test_utils.h | 311 +++-------- bindings/python/pyproject.toml | 2 +- bindings/python/test/conftest.py | 253 ++------- crates/fluss-test-cluster/Cargo.toml | 36 ++ crates/fluss-test-cluster/build.rs | 32 ++ crates/fluss-test-cluster/src/lib.rs | 514 ++++++++++++++++++ crates/fluss-test-cluster/src/main.rs | 103 ++++ crates/fluss-test-cluster/test-images.env | 4 + crates/fluss/Cargo.toml | 3 +- .../fluss/tests/integration/fluss_cluster.rs | 451 +-------------- crates/fluss/tests/integration/utils.rs | 2 +- 19 files changed, 928 insertions(+), 908 deletions(-) create mode 100644 .github/actions/docker-cache/action.yml create mode 100644 crates/fluss-test-cluster/Cargo.toml create mode 100644 crates/fluss-test-cluster/build.rs create mode 100644 crates/fluss-test-cluster/src/lib.rs create mode 100644 crates/fluss-test-cluster/src/main.rs create mode 100644 crates/fluss-test-cluster/test-images.env diff --git a/.github/actions/docker-cache/action.yml b/.github/actions/docker-cache/action.yml new file mode 100644 index 00000000..8bbee699 --- /dev/null +++ b/.github/actions/docker-cache/action.yml @@ -0,0 +1,49 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: 'Cache Docker images for Fluss tests' +description: 'Reads image versions from test-images.env, caches via docker save/load.' + +runs: + using: 'composite' + steps: + - name: Read image versions + id: v + shell: bash + run: | + source crates/fluss-test-cluster/test-images.env + echo "fluss=${FLUSS_IMAGE}:${FLUSS_VERSION}" >> "$GITHUB_OUTPUT" + echo "zk=${ZOOKEEPER_IMAGE}:${ZOOKEEPER_VERSION}" >> "$GITHUB_OUTPUT" + + - uses: actions/cache@v4 + with: + path: /tmp/docker-images + key: docker-${{ runner.os }}-${{ steps.v.outputs.fluss }}-${{ steps.v.outputs.zk }} + + - name: Load or pull Docker images + shell: bash + run: | + if [ -f /tmp/docker-images/fluss.tar ]; then + docker load -i /tmp/docker-images/fluss.tar + docker load -i /tmp/docker-images/zookeeper.tar + else + docker pull "${{ steps.v.outputs.fluss }}" + docker pull "${{ steps.v.outputs.zk }}" + mkdir -p /tmp/docker-images + docker save "${{ steps.v.outputs.fluss }}" -o /tmp/docker-images/fluss.tar + docker save "${{ steps.v.outputs.zk }}" -o /tmp/docker-images/zookeeper.tar + fi diff --git a/.github/workflows/build_and_test_cpp.yml b/.github/workflows/build_and_test_cpp.yml index 88a896bc..f2f7a027 100644 --- a/.github/workflows/build_and_test_cpp.yml +++ b/.github/workflows/build_and_test_cpp.yml @@ -38,9 +38,11 @@ concurrency: cancel-in-progress: true jobs: - build-and-test-cpp: + build-and-test: timeout-minutes: 60 runs-on: ubuntu-latest + env: + FLUSS_TEST_CLUSTER_BIN: ${{ github.workspace }}/target/debug/fluss-test-cluster steps: - uses: actions/checkout@v4 @@ -66,6 +68,11 @@ jobs: restore-keys: | cpp-test-${{ runner.os }}- + - uses: ./.github/actions/docker-cache + + - name: Build fluss-test-cluster binary + run: cargo build -p fluss-test-cluster + - name: Build C++ bindings and tests working-directory: bindings/cpp run: | diff --git a/.github/workflows/build_and_test_python.yml b/.github/workflows/build_and_test_python.yml index 3f67157f..1abe0436 100644 --- a/.github/workflows/build_and_test_python.yml +++ b/.github/workflows/build_and_test_python.yml @@ -38,12 +38,14 @@ concurrency: cancel-in-progress: true jobs: - build-and-test-python: + build-and-test: timeout-minutes: 60 runs-on: ubuntu-latest strategy: matrix: python: ["3.9", "3.10", "3.11", "3.12"] + env: + FLUSS_TEST_CLUSTER_BIN: ${{ github.workspace }}/target/debug/fluss-test-cluster steps: - uses: actions/checkout@v4 @@ -69,13 +71,18 @@ jobs: restore-keys: | python-test-${{ runner.os }}-${{ matrix.python }}- + - uses: ./.github/actions/docker-cache + + - name: Build fluss-test-cluster binary + run: cargo build -p fluss-test-cluster + - name: Build Python bindings working-directory: bindings/python run: | uv sync --extra dev --no-install-project uv run maturin develop - - name: Run Python integration tests (parallel) + - name: Run tests (parallel) working-directory: bindings/python run: uv run pytest test/ -v -n auto env: diff --git a/.github/workflows/build_and_test_rust.yml b/.github/workflows/build_and_test_rust.yml index c904e300..7e1ba674 100644 --- a/.github/workflows/build_and_test_rust.yml +++ b/.github/workflows/build_and_test_rust.yml @@ -39,7 +39,7 @@ concurrency: cancel-in-progress: true jobs: - build-and-test-rust: + build-and-unit-test: timeout-minutes: 60 runs-on: ${{ matrix.os }} strategy: @@ -78,10 +78,31 @@ jobs: RUST_LOG: DEBUG RUST_BACKTRACE: full - - name: Integration Test (Linux only) - if: runner.os == 'Linux' - run: | - cargo test --features integration_tests --all-targets --workspace --exclude fluss_python --exclude fluss-cpp + integration-test: + needs: build-and-unit-test + timeout-minutes: 60 + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install protoc + run: sudo apt-get update && sudo apt-get install -y protobuf-compiler + + - name: Rust Cache + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: rust-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + rust-${{ runner.os }}- + + - uses: ./.github/actions/docker-cache + + - name: Integration Test + run: cargo test --features integration_tests --all-targets --workspace --exclude fluss_python --exclude fluss-cpp env: RUST_LOG: DEBUG RUST_BACKTRACE: full diff --git a/.licenserc.yaml b/.licenserc.yaml index 4afd5a9d..27558537 100644 --- a/.licenserc.yaml +++ b/.licenserc.yaml @@ -30,4 +30,5 @@ header: - 'website/**' - '**/*.md' - '**/DEPENDENCIES.*.tsv' + - '**/*.env' comment: on-failure diff --git a/Cargo.toml b/Cargo.toml index d4d262ad..2abdbf06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ keywords = ["fluss", "streaming-storage", "datalake"] [workspace] resolver = "2" -members = ["crates/fluss", "crates/examples", "bindings/python", "bindings/cpp"] +members = ["crates/fluss", "crates/fluss-test-cluster", "crates/examples", "bindings/python", "bindings/cpp"] [workspace.dependencies] fluss = { package = "fluss-rs", version = "0.2.0", path = "crates/fluss", features = ["storage-all"] } diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt index ac936116..f58b7efe 100644 --- a/bindings/cpp/CMakeLists.txt +++ b/bindings/cpp/CMakeLists.txt @@ -25,6 +25,7 @@ project(fluss-cpp LANGUAGES CXX) include(FetchContent) set(FLUSS_GOOGLETEST_VERSION 1.15.2 CACHE STRING "version of GoogleTest") +set(FLUSS_NLOHMANN_JSON_VERSION 3.11.3 CACHE STRING "version of nlohmann/json") set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(FLUSS_CPP_DEP_MODE "system" CACHE STRING "Dependency provisioning mode for fluss-cpp (system|build)") @@ -253,14 +254,22 @@ if (FLUSS_ENABLE_TESTING) URL https://github.com/google/googletest/archive/refs/tags/v${FLUSS_GOOGLETEST_VERSION}.tar.gz ) set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) - FetchContent_MakeAvailable(googletest) + + set(JSON_BuildTests OFF CACHE INTERNAL "") + FetchContent_Declare( + nlohmann_json + URL https://github.com/nlohmann/json/archive/refs/tags/v${FLUSS_NLOHMANN_JSON_VERSION}.tar.gz + URL_HASH SHA256=0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406 + ) + + FetchContent_MakeAvailable(googletest nlohmann_json) enable_testing() include(GoogleTest) file(GLOB TEST_SOURCE_FILES "test/*.cpp") add_executable(fluss_cpp_test ${TEST_SOURCE_FILES}) - target_link_libraries(fluss_cpp_test PRIVATE fluss_cpp GTest::gtest) + target_link_libraries(fluss_cpp_test PRIVATE fluss_cpp GTest::gtest nlohmann_json::nlohmann_json) target_link_libraries(fluss_cpp_test PRIVATE Arrow::arrow_shared) target_compile_definitions(fluss_cpp_test PRIVATE ARROW_FOUND) target_include_directories(fluss_cpp_test PRIVATE @@ -268,15 +277,12 @@ if (FLUSS_ENABLE_TESTING) ${PROJECT_SOURCE_DIR}/test ) - # Individual tests for parallel execution via ctest -j. gtest_discover_tests(fluss_cpp_test PROPERTIES TIMEOUT 120 FIXTURES_REQUIRED fluss_cluster ) - # Cleanup: stop Docker containers after all tests finish. - # Mirrors Python's pytest_unconfigure and Rust's atexit cleanup. add_test(NAME fluss_cluster_cleanup COMMAND fluss_cpp_test --cleanup) set_tests_properties(fluss_cluster_cleanup PROPERTIES FIXTURES_CLEANUP fluss_cluster diff --git a/bindings/cpp/test/test_main.cpp b/bindings/cpp/test/test_main.cpp index 7b132d2c..48d1050b 100644 --- a/bindings/cpp/test/test_main.cpp +++ b/bindings/cpp/test/test_main.cpp @@ -22,20 +22,14 @@ #include "test_utils.h" int main(int argc, char** argv) { - // --cleanup: stop Docker containers and exit (used by ctest FIXTURES_CLEANUP). for (int i = 1; i < argc; ++i) { if (std::string(argv[i]) == "--cleanup") { - const char* env = std::getenv("FLUSS_BOOTSTRAP_SERVERS"); - if (env && std::strlen(env) > 0) return 0; fluss_test::FlussTestCluster::StopAll(); return 0; } } ::testing::InitGoogleTest(&argc, argv); - - // Register the global test environment (manages the Fluss cluster lifecycle). ::testing::AddGlobalTestEnvironment(fluss_test::FlussTestEnvironment::Instance()); - return RUN_ALL_TESTS(); } diff --git a/bindings/cpp/test/test_utils.h b/bindings/cpp/test/test_utils.h index 1ff7e281..27287985 100644 --- a/bindings/cpp/test/test_utils.h +++ b/bindings/cpp/test/test_utils.h @@ -21,266 +21,124 @@ #include -#include #include #include #include #include +#include +#include #include -#include #include -#ifdef _WIN32 -#include -#include -#pragma comment(lib, "ws2_32.lib") -#else -#include -#include -#include -#include -#endif - #include "fluss.hpp" -// Macro to assert Result is OK and print error message on failure #define ASSERT_OK(result) ASSERT_TRUE((result).Ok()) << (result).error_message #define EXPECT_OK(result) EXPECT_TRUE((result).Ok()) << (result).error_message namespace fluss_test { -static constexpr const char* kFlussImage = "apache/fluss"; -static constexpr const char* kFlussVersion = "0.9.0-incubating"; -static constexpr const char* kNetworkName = "fluss-cpp-test-network"; -static constexpr const char* kZookeeperName = "zookeeper-cpp-test"; -static constexpr const char* kCoordinatorName = "coordinator-server-cpp-test"; -static constexpr const char* kTabletServerName = "tablet-server-cpp-test"; -static constexpr int kCoordinatorPort = 9123; -static constexpr int kTabletServerPort = 9124; -static constexpr int kPlainClientPort = 9223; -static constexpr int kPlainClientTabletPort = 9224; - -/// Execute a shell command and return its exit code. -inline int RunCommand(const std::string& cmd) { return system(cmd.c_str()); } - -/// Join property lines with the escaped newline separator used by `printf` in docker commands. -inline std::string JoinProps(const std::vector& lines) { - std::string result; - for (size_t i = 0; i < lines.size(); ++i) { - if (i > 0) result += "\\n"; - result += lines[i]; +inline std::string FindCliBinary() { + const char* env_bin = std::getenv("FLUSS_TEST_CLUSTER_BIN"); + if (env_bin && std::strlen(env_bin) > 0) { + if (std::ifstream(env_bin).good()) { + return env_bin; + } + std::cerr << "FLUSS_TEST_CLUSTER_BIN is set to '" << env_bin + << "' but that file does not exist." << std::endl; + std::abort(); } - return result; -} - -/// Build a `docker run` command with FLUSS_PROPERTIES. -inline std::string DockerRunCmd(const std::string& name, const std::string& props, - const std::vector& port_mappings, - const std::string& server_type) { - std::string cmd = "docker run -d --rm --name " + name + " --network " + kNetworkName; - for (const auto& pm : port_mappings) { - cmd += " -p " + pm; + FILE* pipe = popen("cargo locate-project --workspace --message-format plain", "r"); + if (pipe) { + char buf[512]; + std::string root; + while (fgets(buf, sizeof(buf), pipe)) root += buf; + if (pclose(pipe) == 0) { + // cargo returns path to Cargo.toml; strip filename + trailing whitespace. + while (!root.empty() && (root.back() == '\n' || root.back() == '\r')) root.pop_back(); + auto slash = root.rfind('/'); + if (slash != std::string::npos) { + std::string dir = root.substr(0, slash); + for (const char* profile : {"debug", "release"}) { + std::string path = dir + "/target/" + profile + "/fluss-test-cluster"; + if (std::ifstream(path).good()) return path; + } + } + } } - cmd += " -e FLUSS_PROPERTIES=\"$(printf '" + props + "')\""; - cmd += " " + std::string(kFlussImage) + ":" + kFlussVersion + " " + server_type; - return cmd; + return "fluss-test-cluster"; } -/// Wait until a TCP port is accepting connections, or timeout. -inline bool WaitForPort(const std::string& host, int port, int timeout_seconds = 60) { - auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(timeout_seconds); - - while (std::chrono::steady_clock::now() < deadline) { - int sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) { - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - continue; - } +constexpr const char* kClusterName = "shared-test"; - struct sockaddr_in addr {}; - addr.sin_family = AF_INET; - addr.sin_port = htons(static_cast(port)); - inet_pton(AF_INET, host.c_str(), &addr.sin_addr); +inline std::string CliStartCmd() { + return FindCliBinary() + " start --sasl --name " + kClusterName; +} - int result = connect(sock, reinterpret_cast(&addr), sizeof(addr)); -#ifdef _WIN32 - closesocket(sock); -#else - close(sock); -#endif - if (result == 0) { - return true; +inline bool ParseClusterJson(const std::string& output, std::string& bootstrap, + std::string& sasl_bootstrap) { + // Last non-empty line is the JSON (progress goes to stderr). + auto last_nl = output.rfind('\n', output.size() - 2); + std::string line = (last_nl != std::string::npos) ? output.substr(last_nl + 1) : output; + try { + auto info = nlohmann::json::parse(line); + bootstrap = info.at("bootstrap_servers").get(); + if (info.contains("sasl_bootstrap_servers") && !info["sasl_bootstrap_servers"].is_null()) { + sasl_bootstrap = info["sasl_bootstrap_servers"].get(); } - - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + return true; + } catch (const nlohmann::json::exception&) { + return false; } - return false; } -/// Manages a Docker-based Fluss cluster for integration testing. class FlussTestCluster { public: FlussTestCluster() = default; bool Start() { - const char* env_servers = std::getenv("FLUSS_BOOTSTRAP_SERVERS"); - if (env_servers && std::strlen(env_servers) > 0) { - bootstrap_servers_ = env_servers; + const char* env = std::getenv("FLUSS_BOOTSTRAP_SERVERS"); + if (env && std::strlen(env) > 0) { + bootstrap_servers_ = env; const char* env_sasl = std::getenv("FLUSS_SASL_BOOTSTRAP_SERVERS"); - if (env_sasl && std::strlen(env_sasl) > 0) { - sasl_bootstrap_servers_ = env_sasl; - } - external_cluster_ = true; - std::cout << "Using external cluster: " << bootstrap_servers_ << std::endl; - return true; - } - - // Reuse cluster started by another parallel test process or previous run. - if (WaitForPort("127.0.0.1", kPlainClientPort, /*timeout_seconds=*/1)) { - SetBootstrapServers(); - external_cluster_ = true; + sasl_bootstrap_servers_ = (env_sasl && std::strlen(env_sasl) > 0) ? env_sasl : env; return true; } - std::cout << "Starting Fluss cluster via Docker..." << std::endl; - - // Remove stopped (not running) containers from previous runs. - RunCommand(std::string("docker rm ") + kTabletServerName + " 2>/dev/null || true"); - RunCommand(std::string("docker rm ") + kCoordinatorName + " 2>/dev/null || true"); - RunCommand(std::string("docker rm ") + kZookeeperName + " 2>/dev/null || true"); - RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true"); - - RunCommand(std::string("docker network create ") + kNetworkName + " 2>/dev/null || true"); - - std::string zk_cmd = std::string("docker run -d --rm") + " --name " + kZookeeperName + - " --network " + kNetworkName + " zookeeper:3.9.2"; - if (RunCommand(zk_cmd) != 0) { - return WaitForCluster(); - } - - // Wait for ZooKeeper to be ready - std::this_thread::sleep_for(std::chrono::seconds(5)); - - // Coordinator Server (dual listeners: SASL on 9123, plaintext on 9223) - std::string sasl_jaas = - "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required" - " user_admin=\"admin-secret\" user_alice=\"alice-secret\";"; - - std::string coord = std::string(kCoordinatorName); - std::string zk = std::string(kZookeeperName); - std::string coord_props = JoinProps({ - "zookeeper.address: " + zk + ":2181", - "bind.listeners: INTERNAL://" + coord + ":0, CLIENT://" + coord + - ":9123, PLAIN_CLIENT://" + coord + ":9223", - "advertised.listeners: CLIENT://localhost:9123, PLAIN_CLIENT://localhost:9223", - "internal.listener.name: INTERNAL", - "security.protocol.map: CLIENT:sasl", - "security.sasl.enabled.mechanisms: plain", - "security.sasl.plain.jaas.config: " + sasl_jaas, - "netty.server.num-network-threads: 1", - "netty.server.num-worker-threads: 3", - }); - - std::string coord_cmd = DockerRunCmd(kCoordinatorName, coord_props, - {"9123:9123", "9223:9223"}, "coordinatorServer"); - if (RunCommand(coord_cmd) != 0) { - return WaitForCluster(); - } - - if (!WaitForPort("127.0.0.1", kCoordinatorPort)) { - std::cerr << "Coordinator Server did not become ready" << std::endl; + std::string cli_cmd = CliStartCmd(); + FILE* pipe = popen(cli_cmd.c_str(), "r"); + if (!pipe) { + std::cerr << "Failed to launch fluss-test-cluster binary" << std::endl; return false; } - - // Tablet Server (dual listeners: SASL on 9124, plaintext on 9224) - std::string ts = std::string(kTabletServerName); - std::string ts_props = JoinProps({ - "zookeeper.address: " + zk + ":2181", - "bind.listeners: INTERNAL://" + ts + ":0, CLIENT://" + ts + ":9123, PLAIN_CLIENT://" + - ts + ":9223", - "advertised.listeners: CLIENT://localhost:" + std::to_string(kTabletServerPort) + - ", PLAIN_CLIENT://localhost:" + std::to_string(kPlainClientTabletPort), - "internal.listener.name: INTERNAL", - "security.protocol.map: CLIENT:sasl", - "security.sasl.enabled.mechanisms: plain", - "security.sasl.plain.jaas.config: " + sasl_jaas, - "tablet-server.id: 0", - "netty.server.num-network-threads: 1", - "netty.server.num-worker-threads: 3", - }); - - std::string ts_cmd = DockerRunCmd(kTabletServerName, ts_props, - {std::to_string(kTabletServerPort) + ":9123", - std::to_string(kPlainClientTabletPort) + ":9223"}, - "tabletServer"); - if (RunCommand(ts_cmd) != 0) { - return WaitForCluster(); + std::string output; + char buf[512]; + while (fgets(buf, sizeof(buf), pipe)) output += buf; + int rc = pclose(pipe); + if (rc != 0) { + std::cerr << "fluss-test-cluster start failed (exit " << rc << "):\n" + << output << std::endl; + return false; } - - if (!WaitForPort("127.0.0.1", kTabletServerPort) || - !WaitForPort("127.0.0.1", kPlainClientPort) || - !WaitForPort("127.0.0.1", kPlainClientTabletPort)) { - std::cerr << "Cluster listeners did not become ready" << std::endl; + if (!ParseClusterJson(output, bootstrap_servers_, sasl_bootstrap_servers_)) { + std::cerr << "Failed to parse cluster JSON from:\n" << output << std::endl; return false; } - - SetBootstrapServers(); - std::cout << "Fluss cluster started successfully." << std::endl; return true; } - void Stop() { - if (external_cluster_) return; - StopAll(); - } - - /// Unconditionally stop and remove all cluster containers and the network. - /// Used by the --cleanup flag from ctest FIXTURES_CLEANUP. static void StopAll() { - std::cout << "Stopping Fluss cluster..." << std::endl; - RunCommand(std::string("docker rm -f ") + kTabletServerName + " 2>/dev/null || true"); - RunCommand(std::string("docker rm -f ") + kCoordinatorName + " 2>/dev/null || true"); - RunCommand(std::string("docker rm -f ") + kZookeeperName + " 2>/dev/null || true"); - RunCommand(std::string("docker network rm ") + kNetworkName + " 2>/dev/null || true"); - std::cout << "Fluss cluster stopped." << std::endl; + std::string cmd = FindCliBinary() + " stop --name " + kClusterName; + system(cmd.c_str()); } const std::string& GetBootstrapServers() const { return bootstrap_servers_; } const std::string& GetSaslBootstrapServers() const { return sasl_bootstrap_servers_; } private: - void SetBootstrapServers() { - bootstrap_servers_ = "127.0.0.1:" + std::to_string(kPlainClientPort); - sasl_bootstrap_servers_ = "127.0.0.1:" + std::to_string(kCoordinatorPort); - } - - /// Wait for a cluster being started by another process. - /// Fails fast if no containers exist (real Docker failure vs race). - bool WaitForCluster() { - if (RunCommand(std::string("docker inspect ") + kZookeeperName + " >/dev/null 2>&1") != 0) { - std::cerr << "Failed to start cluster (docker error)" << std::endl; - return false; - } - std::cout << "Waiting for cluster started by another process..." << std::endl; - if (!WaitForPort("127.0.0.1", kPlainClientPort) || - !WaitForPort("127.0.0.1", kPlainClientTabletPort) || - !WaitForPort("127.0.0.1", kCoordinatorPort) || - !WaitForPort("127.0.0.1", kTabletServerPort)) { - std::cerr << "Cluster did not become ready" << std::endl; - return false; - } - SetBootstrapServers(); - external_cluster_ = true; - std::cout << "Cluster ready." << std::endl; - return true; - } - std::string bootstrap_servers_; std::string sasl_bootstrap_servers_; - bool external_cluster_{false}; }; -/// GoogleTest Environment that manages the Fluss cluster lifecycle. class FlussTestEnvironment : public ::testing::Environment { public: static FlussTestEnvironment* Instance() { @@ -296,35 +154,18 @@ class FlussTestEnvironment : public ::testing::Environment { GTEST_SKIP() << "Failed to start Fluss cluster. Skipping integration tests."; } - // Retry connection creation until the coordinator is fully initialized. fluss::Configuration config; config.bootstrap_servers = cluster_.GetBootstrapServers(); - - auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(60); - while (std::chrono::steady_clock::now() < deadline) { - auto result = fluss::Connection::Create(config, connection_); - if (result.Ok()) { - auto admin_result = connection_.GetAdmin(admin_); - if (admin_result.Ok()) { - std::vector nodes; - auto nodes_result = admin_.GetServerNodes(nodes); - if (nodes_result.Ok() && - std::any_of(nodes.begin(), nodes.end(), - [](const fluss::ServerNode& n) { - return n.server_type == "TabletServer"; - })) { - std::cout << "Connected to Fluss cluster." << std::endl; - return; - } - } - } - std::cout << "Waiting for Fluss cluster to be ready..." << std::endl; - std::this_thread::sleep_for(std::chrono::seconds(2)); + auto result = fluss::Connection::Create(config, connection_); + if (!result.Ok()) { + GTEST_SKIP() << "Failed to connect: " << result.error_message; + } + auto admin_result = connection_.GetAdmin(admin_); + if (!admin_result.Ok()) { + GTEST_SKIP() << "Failed to get admin: " << admin_result.error_message; } - GTEST_SKIP() << "Fluss cluster did not become ready within timeout."; } - // Cluster stays alive for parallel processes and subsequent runs. void TearDown() override {} fluss::Connection& GetConnection() { return connection_; } @@ -340,7 +181,6 @@ class FlussTestEnvironment : public ::testing::Environment { fluss::Admin admin_; }; -/// Helper: create a table (assert success). Drops existing table first if it exists. inline void CreateTable(fluss::Admin& admin, const fluss::TablePath& path, const fluss::TableDescriptor& descriptor) { admin.DropTable(path, true); // ignore if not exists @@ -348,7 +188,6 @@ inline void CreateTable(fluss::Admin& admin, const fluss::TablePath& path, ASSERT_OK(result); } -/// Helper: create partitions for a partitioned table. inline void CreatePartitions(fluss::Admin& admin, const fluss::TablePath& path, const std::string& partition_column, const std::vector& values) { @@ -360,8 +199,6 @@ inline void CreatePartitions(fluss::Admin& admin, const fluss::TablePath& path, } } -/// Poll a LogScanner for ScanRecords until `expected_count` items are collected or timeout. -/// `extract_fn` is called for each ScanRecord and should return a value of type T. template void PollRecords(fluss::LogScanner& scanner, size_t expected_count, ExtractFn extract_fn, std::vector& out) { @@ -375,8 +212,6 @@ void PollRecords(fluss::LogScanner& scanner, size_t expected_count, ExtractFn ex } } -/// Poll a LogScanner for ArrowRecordBatches until `expected_count` items are collected or timeout. -/// `extract_fn` is called with the full ArrowRecordBatches and should return a std::vector. template void PollRecordBatches(fluss::LogScanner& scanner, size_t expected_count, ExtractFn extract_fn, std::vector& out) { diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml index b81aad07..41463027 100644 --- a/bindings/python/pyproject.toml +++ b/bindings/python/pyproject.toml @@ -51,9 +51,9 @@ dev = [ "pytest>=8.3.5", "pytest-asyncio>=0.25.3", "pytest-xdist>=3.5.0", + "filelock>=3.0", "ruff>=0.9.10", "maturin>=1.8.2", - "testcontainers>=4.0.0", ] docs = [ "pdoc>=15.0.4", diff --git a/bindings/python/test/conftest.py b/bindings/python/test/conftest.py index 420747e6..a8362cb9 100644 --- a/bindings/python/test/conftest.py +++ b/bindings/python/test/conftest.py @@ -15,270 +15,129 @@ # specific language governing permissions and limitations # under the License. -"""Shared fixtures for Fluss Python integration tests. - -If FLUSS_BOOTSTRAP_SERVERS is set, tests connect to an existing cluster. -Otherwise, a Fluss cluster is started automatically via testcontainers. - -The first pytest-xdist worker to run starts the cluster; other workers -detect it via port check and reuse it (matching the C++ test pattern). -Containers are cleaned up after all workers finish via pytest_unconfigure. - -Run with: - uv run maturin develop && uv run pytest test/ -v -n auto -""" - import asyncio +import json import os -import socket import subprocess +import tempfile import time - -# Disable testcontainers Ryuk reaper for xdist runs — it would kill -# containers when the first worker exits, while others are still running. -# We handle cleanup ourselves in pytest_unconfigure. -# In single-process mode, keep Ryuk as a safety net for hard crashes. -if "PYTEST_XDIST_WORKER" in os.environ: - os.environ.setdefault("TESTCONTAINERS_RYUK_DISABLED", "true") +from pathlib import Path import pytest import pytest_asyncio +from filelock import FileLock import fluss -FLUSS_IMAGE = "apache/fluss" -FLUSS_VERSION = "0.9.0-incubating" -BOOTSTRAP_SERVERS_ENV = os.environ.get("FLUSS_BOOTSTRAP_SERVERS") - -# Container / network names -NETWORK_NAME = "fluss-python-test-network" -ZOOKEEPER_NAME = "zookeeper-python-test" -COORDINATOR_NAME = "coordinator-server-python-test" -TABLET_SERVER_NAME = "tablet-server-python-test" - -# Fixed host ports (must match across workers) -COORDINATOR_PORT = 9123 -TABLET_SERVER_PORT = 9124 -PLAIN_CLIENT_PORT = 9223 -PLAIN_CLIENT_TABLET_PORT = 9224 - -ALL_PORTS = [COORDINATOR_PORT, TABLET_SERVER_PORT, PLAIN_CLIENT_PORT, PLAIN_CLIENT_TABLET_PORT] - - -def _wait_for_port(host, port, timeout=60): - """Wait for a TCP port to become available.""" - start = time.time() - while time.time() - start < timeout: - try: - with socket.create_connection((host, port), timeout=1): - return True - except (ConnectionRefusedError, TimeoutError, OSError): - time.sleep(1) - return False - - -def _all_ports_ready(timeout=60): - """Wait for all cluster ports to become available.""" - deadline = time.time() + timeout - for port in ALL_PORTS: - remaining = deadline - time.time() - if remaining <= 0 or not _wait_for_port("localhost", port, timeout=remaining): - return False - return True - - -def _run_cmd(cmd): - """Run a command (list form), return exit code.""" - return subprocess.run(cmd, capture_output=True).returncode - - -def _start_cluster(): - """Start the Fluss Docker cluster via testcontainers. - - If another worker already started the cluster (detected via port check), - reuse it. If container creation fails (name conflict from a racing worker), - wait for the other worker's cluster to become ready. - """ - # Reuse cluster started by another parallel worker or previous run. - if _wait_for_port("localhost", PLAIN_CLIENT_PORT, timeout=1): - print("Reusing existing cluster via port check.") - return +CLUSTER_NAME = "shared-test" - from testcontainers.core.container import DockerContainer - print("Starting Fluss cluster via testcontainers...") - - # Create a named network via Docker CLI (idempotent, avoids orphaned - # random-named networks when multiple xdist workers race). - _run_cmd(["docker", "network", "create", NETWORK_NAME]) - - sasl_jaas = ( - "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required" - ' user_admin="admin-secret" user_alice="alice-secret";' - ) - coordinator_props = "\n".join([ - f"zookeeper.address: {ZOOKEEPER_NAME}:2181", - f"bind.listeners: INTERNAL://{COORDINATOR_NAME}:0," - f" CLIENT://{COORDINATOR_NAME}:9123," - f" PLAIN_CLIENT://{COORDINATOR_NAME}:9223", - "advertised.listeners: CLIENT://localhost:9123," - " PLAIN_CLIENT://localhost:9223", - "internal.listener.name: INTERNAL", - "security.protocol.map: CLIENT:sasl", - "security.sasl.enabled.mechanisms: plain", - f"security.sasl.plain.jaas.config: {sasl_jaas}", - "netty.server.num-network-threads: 1", - "netty.server.num-worker-threads: 3", - ]) - tablet_props = "\n".join([ - f"zookeeper.address: {ZOOKEEPER_NAME}:2181", - f"bind.listeners: INTERNAL://{TABLET_SERVER_NAME}:0," - f" CLIENT://{TABLET_SERVER_NAME}:9123," - f" PLAIN_CLIENT://{TABLET_SERVER_NAME}:9223", - "advertised.listeners: CLIENT://localhost:9124," - " PLAIN_CLIENT://localhost:9224", - "internal.listener.name: INTERNAL", - "security.protocol.map: CLIENT:sasl", - "security.sasl.enabled.mechanisms: plain", - f"security.sasl.plain.jaas.config: {sasl_jaas}", - "tablet-server.id: 0", - "netty.server.num-network-threads: 1", - "netty.server.num-worker-threads: 3", - ]) - - zookeeper = ( - DockerContainer("zookeeper:3.9.2") - .with_kwargs(network=NETWORK_NAME) - .with_name(ZOOKEEPER_NAME) - ) - coordinator = ( - DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}") - .with_kwargs(network=NETWORK_NAME) - .with_name(COORDINATOR_NAME) - .with_bind_ports(9123, 9123) - .with_bind_ports(9223, 9223) - .with_command("coordinatorServer") - .with_env("FLUSS_PROPERTIES", coordinator_props) +def _find_cli_binary(): + env_bin = os.environ.get("FLUSS_TEST_CLUSTER_BIN") + if env_bin: + if os.path.isfile(env_bin): + return env_bin + raise FileNotFoundError(f"FLUSS_TEST_CLUSTER_BIN={env_bin!r} does not exist") + result = subprocess.run( + ["cargo", "locate-project", "--workspace", "--message-format", "plain"], + capture_output=True, + text=True, ) - tablet_server = ( - DockerContainer(f"{FLUSS_IMAGE}:{FLUSS_VERSION}") - .with_kwargs(network=NETWORK_NAME) - .with_name(TABLET_SERVER_NAME) - .with_bind_ports(9123, 9124) - .with_bind_ports(9223, 9224) - .with_command("tabletServer") - .with_env("FLUSS_PROPERTIES", tablet_props) + if result.returncode == 0: + root = Path(result.stdout.strip()).parent + for profile in ("debug", "release"): + bin_path = root / "target" / profile / "fluss-test-cluster" + if bin_path.is_file(): + return str(bin_path) + raise FileNotFoundError( + "fluss-test-cluster not found. Run: cargo build -p fluss-test-cluster" ) - try: - zookeeper.start() - coordinator.start() - tablet_server.start() - except Exception as e: - # Another worker may have started containers with the same names. - # Wait for the cluster to become ready instead of failing. - print(f"Container start failed ({e}), waiting for cluster from another worker...") - if _all_ports_ready(): - return - raise - - if not _all_ports_ready(): - raise RuntimeError("Cluster listeners did not become ready") - print("Fluss cluster started successfully.") +def _start_cluster(): + lock = Path(tempfile.gettempdir()) / f"fluss-{CLUSTER_NAME}.lock" + with FileLock(lock): + cli = _find_cli_binary() + result = subprocess.run( + [cli, "start", "--sasl", "--name", CLUSTER_NAME], + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise RuntimeError( + f"fluss-test-cluster start failed:\n{result.stderr}\n{result.stdout}" + ) + info = json.loads(result.stdout.strip().split("\n")[-1]) + return info["bootstrap_servers"], info.get("sasl_bootstrap_servers") def _stop_cluster(): - """Stop and remove the Fluss Docker cluster containers.""" - for name in [TABLET_SERVER_NAME, COORDINATOR_NAME, ZOOKEEPER_NAME]: - subprocess.run(["docker", "rm", "-f", name], capture_output=True) - subprocess.run(["docker", "network", "rm", NETWORK_NAME], capture_output=True) - + try: + cli = _find_cli_binary() + except FileNotFoundError: + return + subprocess.run([cli, "stop", "--name", CLUSTER_NAME], capture_output=True) -async def _connect_with_retry(bootstrap_servers, timeout=60): - """Connect to the Fluss cluster with retries until it's fully ready. - Waits until both the coordinator and at least one tablet server are - available, matching the Rust wait_for_cluster_ready pattern. - """ +async def _connect(bootstrap_servers): config = fluss.Config({"bootstrap.servers": bootstrap_servers}) start = time.time() last_err = None - while time.time() - start < timeout: - conn = None + while time.time() - start < 60: try: conn = await fluss.FlussConnection.create(config) admin = conn.get_admin() nodes = await admin.get_server_nodes() if any(n.server_type == "TabletServer" for n in nodes): return conn + conn.close() last_err = RuntimeError("No TabletServer available yet") except Exception as e: last_err = e - if conn is not None: - conn.close() await asyncio.sleep(1) - raise RuntimeError( - f"Could not connect to cluster after {timeout}s: {last_err}" - ) + raise RuntimeError(f"Could not connect after 60s: {last_err}") def pytest_unconfigure(config): - """Clean up Docker containers after all xdist workers finish. - - Runs once on the controller process (or the single process when - not using xdist). Workers are identified by the 'workerinput' attr. - """ - if BOOTSTRAP_SERVERS_ENV: + if os.environ.get("FLUSS_BOOTSTRAP_SERVERS"): return if hasattr(config, "workerinput"): - return # This is a worker, skip + return _stop_cluster() @pytest.fixture(scope="session") def fluss_cluster(): - """Start a Fluss cluster using testcontainers, or use an existing one.""" - if BOOTSTRAP_SERVERS_ENV: - sasl_env = os.environ.get( - "FLUSS_SASL_BOOTSTRAP_SERVERS", BOOTSTRAP_SERVERS_ENV - ) - yield (BOOTSTRAP_SERVERS_ENV, sasl_env) + env = os.environ.get("FLUSS_BOOTSTRAP_SERVERS") + if env: + sasl_env = os.environ.get("FLUSS_SASL_BOOTSTRAP_SERVERS", env) + yield (env, sasl_env) return - _start_cluster() - - # (plaintext_bootstrap, sasl_bootstrap) - yield ( - f"127.0.0.1:{PLAIN_CLIENT_PORT}", - f"127.0.0.1:{COORDINATOR_PORT}", - ) + plaintext_addr, sasl_addr = _start_cluster() + yield (plaintext_addr, sasl_addr or plaintext_addr) @pytest_asyncio.fixture(scope="session") async def connection(fluss_cluster): - """Session-scoped connection to the Fluss cluster (plaintext).""" plaintext_addr, _sasl_addr = fluss_cluster - conn = await _connect_with_retry(plaintext_addr) + conn = await _connect(plaintext_addr) yield conn conn.close() @pytest.fixture(scope="session") def sasl_bootstrap_servers(fluss_cluster): - """Bootstrap servers for the SASL listener.""" _plaintext_addr, sasl_addr = fluss_cluster return sasl_addr @pytest.fixture(scope="session") def plaintext_bootstrap_servers(fluss_cluster): - """Bootstrap servers for the plaintext (non-SASL) listener.""" plaintext_addr, _sasl_addr = fluss_cluster return plaintext_addr @pytest_asyncio.fixture(scope="session") async def admin(connection): - """Session-scoped admin client.""" return connection.get_admin() diff --git a/crates/fluss-test-cluster/Cargo.toml b/crates/fluss-test-cluster/Cargo.toml new file mode 100644 index 00000000..6e7d8e3c --- /dev/null +++ b/crates/fluss-test-cluster/Cargo.toml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "fluss-test-cluster" +edition.workspace = true +version.workspace = true +license.workspace = true +rust-version.workspace = true +publish = false + +[[bin]] +name = "fluss-test-cluster" +path = "src/main.rs" + +[dependencies] +fluss = { workspace = true } +testcontainers = "0.25.0" +tokio = { workspace = true } +clap = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } diff --git a/crates/fluss-test-cluster/build.rs b/crates/fluss-test-cluster/build.rs new file mode 100644 index 00000000..0145196b --- /dev/null +++ b/crates/fluss-test-cluster/build.rs @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +fn main() { + println!("cargo:rerun-if-changed=test-images.env"); + for line in std::fs::read_to_string("test-images.env") + .expect("test-images.env not found") + .lines() + { + let line = line.trim(); + if line.is_empty() || line.starts_with('#') { + continue; + } + if let Some((key, value)) = line.split_once('=') { + println!("cargo:rustc-env={}={}", key.trim(), value.trim()); + } + } +} diff --git a/crates/fluss-test-cluster/src/lib.rs b/crates/fluss-test-cluster/src/lib.rs new file mode 100644 index 00000000..041c21b0 --- /dev/null +++ b/crates/fluss-test-cluster/src/lib.rs @@ -0,0 +1,514 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use fluss::client::FlussConnection; +use fluss::config::Config; +use std::collections::HashMap; +use std::mem::ManuallyDrop; +use std::sync::Arc; +use std::time::Duration; +use testcontainers::core::ContainerPort; +use testcontainers::runners::AsyncRunner; +use testcontainers::{ContainerAsync, GenericImage, ImageExt}; + +pub const FLUSS_IMAGE: &str = env!("FLUSS_IMAGE"); +pub const FLUSS_VERSION: &str = env!("FLUSS_VERSION"); +pub const ZOOKEEPER_IMAGE: &str = env!("ZOOKEEPER_IMAGE"); +pub const ZOOKEEPER_VERSION: &str = env!("ZOOKEEPER_VERSION"); + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct ClusterInfo { + pub bootstrap_servers: String, + pub sasl_bootstrap_servers: Option, +} + +pub struct FlussTestingClusterBuilder { + number_of_tablet_servers: u16, + network: &'static str, + cluster_conf: HashMap, + testing_name: String, + remote_data_dir: Option, + sasl_enabled: bool, + sasl_users: Vec<(String, String)>, + coordinator_host_port: u16, + plain_client_port: Option, + image: String, + image_tag: String, +} + +impl FlussTestingClusterBuilder { + pub fn new(testing_name: impl Into) -> Self { + Self::new_with_cluster_conf(testing_name.into(), &HashMap::default()) + } + + pub fn with_remote_data_dir(mut self, dir: std::path::PathBuf) -> Self { + std::fs::create_dir_all(&dir).expect("Failed to create remote data directory"); + self.remote_data_dir = Some(dir); + self + } + + pub fn with_sasl(mut self, users: Vec<(String, String)>) -> Self { + self.sasl_enabled = true; + self.sasl_users = users; + self.plain_client_port = Some(self.coordinator_host_port + 100); + self + } + + pub fn with_port(mut self, port: u16) -> Self { + self.coordinator_host_port = port; + // Re-derive SASL port if SASL was already enabled. + if self.sasl_enabled { + self.plain_client_port = Some(port + 100); + } + self + } + + pub fn new_with_cluster_conf( + testing_name: impl Into, + conf: &HashMap, + ) -> Self { + let mut cluster_conf = conf.clone(); + cluster_conf.insert( + "netty.server.num-network-threads".to_string(), + "1".to_string(), + ); + cluster_conf.insert( + "netty.server.num-worker-threads".to_string(), + "3".to_string(), + ); + + FlussTestingClusterBuilder { + number_of_tablet_servers: 1, + cluster_conf, + network: "fluss-cluster-network", + testing_name: testing_name.into(), + remote_data_dir: None, + sasl_enabled: false, + sasl_users: Vec::new(), + coordinator_host_port: 9123, + plain_client_port: None, + image: FLUSS_IMAGE.to_string(), + image_tag: FLUSS_VERSION.to_string(), + } + } + + fn tablet_server_container_name(&self, server_id: u16) -> String { + format!("tablet-server-{}-{}", self.testing_name, server_id) + } + + fn coordinator_server_container_name(&self) -> String { + format!("coordinator-server-{}", self.testing_name) + } + + fn zookeeper_container_name(&self) -> String { + format!("zookeeper-{}", self.testing_name) + } + + fn container_names(&self) -> Vec { + std::iter::once(self.zookeeper_container_name()) + .chain(std::iter::once(self.coordinator_server_container_name())) + .chain( + (0..self.number_of_tablet_servers).map(|id| self.tablet_server_container_name(id)), + ) + .collect() + } + + fn inject_sasl_conf(&mut self) { + if self.sasl_enabled + && !self.sasl_users.is_empty() + && !self.cluster_conf.contains_key("security.protocol.map") + { + self.cluster_conf.insert( + "security.protocol.map".to_string(), + "CLIENT:sasl".to_string(), + ); + self.cluster_conf.insert( + "security.sasl.enabled.mechanisms".to_string(), + "plain".to_string(), + ); + let user_entries: Vec = self + .sasl_users + .iter() + .map(|(u, p)| format!("user_{}=\"{}\"", u, p)) + .collect(); + let jaas_config = format!( + "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required {};", + user_entries.join(" ") + ); + self.cluster_conf + .insert("security.sasl.plain.jaas.config".to_string(), jaas_config); + } + } + + fn bootstrap_addresses(&self) -> (String, Option) { + if let Some(plain_port) = self.plain_client_port { + ( + format!("127.0.0.1:{}", plain_port), + Some(format!("127.0.0.1:{}", self.coordinator_host_port)), + ) + } else { + (format!("127.0.0.1:{}", self.coordinator_host_port), None) + } + } + + fn all_containers_exist(&self) -> bool { + self.container_names().iter().all(|name| { + std::process::Command::new("docker") + .args(["ps", "-q", "--filter", &format!("name=^{}$", name)]) + .output() + .map(|o| !String::from_utf8_lossy(&o.stdout).trim().is_empty()) + .unwrap_or(false) + }) + } + + async fn start_all_containers(&mut self) -> Vec> { + for name in &self.container_names() { + let _ = std::process::Command::new("docker") + .args(["rm", "-f", name]) + .output(); + } + self.inject_sasl_conf(); + + let mut containers = Vec::new(); + containers.push(self.start_zookeeper().await); + containers.push(self.start_coordinator_server().await); + for server_id in 0..self.number_of_tablet_servers { + containers.push(self.start_tablet_server(server_id).await); + } + containers + } + + /// Containers stop when the returned struct is dropped. + pub async fn build(&mut self) -> FlussTestingCluster { + let container_names = self.container_names(); + let containers = self.start_all_containers().await; + + let mut iter = containers.into_iter(); + let zookeeper = Arc::new(iter.next().unwrap()); + let coordinator_server = Arc::new(iter.next().unwrap()); + let mut tablet_servers = HashMap::new(); + for server_id in 0..self.number_of_tablet_servers { + tablet_servers.insert(server_id, Arc::new(iter.next().unwrap())); + } + + let (bootstrap_servers, sasl_bootstrap_servers) = self.bootstrap_addresses(); + + FlussTestingCluster { + zookeeper, + coordinator_server, + tablet_servers, + bootstrap_servers, + sasl_bootstrap_servers, + remote_data_dir: self.remote_data_dir.clone(), + sasl_users: self.sasl_users.clone(), + container_names, + } + } + + /// Containers outlive the process. Clean up via `stop_cluster()`. + /// Idempotent: if the cluster is already running, returns its info. + pub async fn build_detached(&mut self) -> ClusterInfo { + if !self.all_containers_exist() { + let containers = self.start_all_containers().await; + let _ = ManuallyDrop::new(containers); + } + + let (bootstrap_servers, sasl_bootstrap_servers) = self.bootstrap_addresses(); + ClusterInfo { + bootstrap_servers, + sasl_bootstrap_servers, + } + } + + async fn start_zookeeper(&self) -> ContainerAsync { + GenericImage::new(ZOOKEEPER_IMAGE, ZOOKEEPER_VERSION) + .with_network(self.network) + .with_container_name(self.zookeeper_container_name()) + .start() + .await + .unwrap() + } + + async fn start_coordinator_server(&mut self) -> ContainerAsync { + let port = self.coordinator_host_port; + let container_name = self.coordinator_server_container_name(); + let mut coordinator_confs = HashMap::new(); + coordinator_confs.insert( + "zookeeper.address", + format!("{}:2181", self.zookeeper_container_name()), + ); + + if let Some(plain_port) = self.plain_client_port { + coordinator_confs.insert( + "bind.listeners", + format!( + "INTERNAL://{}:0, CLIENT://{}:{}, PLAIN_CLIENT://{}:{}", + container_name, container_name, port, container_name, plain_port + ), + ); + coordinator_confs.insert( + "advertised.listeners", + format!( + "CLIENT://localhost:{}, PLAIN_CLIENT://localhost:{}", + port, plain_port + ), + ); + } else { + coordinator_confs.insert( + "bind.listeners", + format!( + "INTERNAL://{}:0, CLIENT://{}:{}", + container_name, container_name, port + ), + ); + coordinator_confs.insert( + "advertised.listeners", + format!("CLIENT://localhost:{}", port), + ); + } + + coordinator_confs.insert("internal.listener.name", "INTERNAL".to_string()); + + let mut image = GenericImage::new(&self.image, &self.image_tag) + .with_container_name(self.coordinator_server_container_name()) + .with_mapped_port(port, ContainerPort::Tcp(port)) + .with_network(self.network) + .with_cmd(vec!["coordinatorServer"]) + .with_env_var( + "FLUSS_PROPERTIES", + self.to_fluss_properties_with(coordinator_confs), + ); + + if let Some(plain_port) = self.plain_client_port { + image = image.with_mapped_port(plain_port, ContainerPort::Tcp(plain_port)); + } + + image.start().await.unwrap() + } + + async fn start_tablet_server(&self, server_id: u16) -> ContainerAsync { + let port = self.coordinator_host_port; + let container_name = self.tablet_server_container_name(server_id); + let mut tablet_server_confs = HashMap::new(); + let expose_host_port = port + 1 + server_id; + let tablet_server_id = format!("{}", server_id); + + if let Some(plain_port) = self.plain_client_port { + let bind_listeners = format!( + "INTERNAL://{}:0, CLIENT://{}:{}, PLAIN_CLIENT://{}:{}", + container_name, container_name, port, container_name, plain_port, + ); + let plain_expose_host_port = plain_port + 1 + server_id; + let advertised_listeners = format!( + "CLIENT://localhost:{}, PLAIN_CLIENT://localhost:{}", + expose_host_port, plain_expose_host_port + ); + tablet_server_confs.insert("bind.listeners", bind_listeners); + tablet_server_confs.insert("advertised.listeners", advertised_listeners); + } else { + let bind_listeners = format!( + "INTERNAL://{}:0, CLIENT://{}:{}", + container_name, container_name, port, + ); + let advertised_listeners = format!("CLIENT://localhost:{}", expose_host_port); + tablet_server_confs.insert("bind.listeners", bind_listeners); + tablet_server_confs.insert("advertised.listeners", advertised_listeners); + } + + tablet_server_confs.insert( + "zookeeper.address", + format!("{}:2181", self.zookeeper_container_name()), + ); + tablet_server_confs.insert("internal.listener.name", "INTERNAL".to_string()); + tablet_server_confs.insert("tablet-server.id", tablet_server_id); + + if let Some(remote_data_dir) = &self.remote_data_dir { + tablet_server_confs.insert( + "remote.data.dir", + remote_data_dir.to_string_lossy().to_string(), + ); + } + let mut image = GenericImage::new(&self.image, &self.image_tag) + .with_cmd(vec!["tabletServer"]) + .with_mapped_port(expose_host_port, ContainerPort::Tcp(port)) + .with_network(self.network) + .with_container_name(self.tablet_server_container_name(server_id)) + .with_env_var( + "FLUSS_PROPERTIES", + self.to_fluss_properties_with(tablet_server_confs), + ); + + if let Some(plain_port) = self.plain_client_port { + let plain_expose_host_port = plain_port + 1 + server_id; + image = image.with_mapped_port(plain_expose_host_port, ContainerPort::Tcp(plain_port)); + } + + if let Some(ref remote_data_dir) = self.remote_data_dir { + use testcontainers::core::Mount; + std::fs::create_dir_all(remote_data_dir) + .expect("Failed to create remote data directory for mount"); + let host_path = remote_data_dir.to_string_lossy().to_string(); + let container_path = remote_data_dir.to_string_lossy().to_string(); + image = image.with_mount(Mount::bind_mount(host_path, container_path)); + } + + image.start().await.unwrap() + } + + fn to_fluss_properties_with(&self, extra_properties: HashMap<&str, String>) -> String { + let mut fluss_properties = Vec::new(); + for (k, v) in self.cluster_conf.iter() { + fluss_properties.push(format!("{}: {}", k, v)); + } + for (k, v) in extra_properties.iter() { + fluss_properties.push(format!("{}: {}", k, v)); + } + fluss_properties.join("\n") + } +} + +#[derive(Clone)] +#[allow(dead_code)] // Fields held for RAII. +pub struct FlussTestingCluster { + zookeeper: Arc>, + coordinator_server: Arc>, + tablet_servers: HashMap>>, + bootstrap_servers: String, + sasl_bootstrap_servers: Option, + remote_data_dir: Option, + sasl_users: Vec<(String, String)>, + container_names: Vec, +} + +impl FlussTestingCluster { + pub fn stop(&self) { + for name in &self.container_names { + let _ = std::process::Command::new("docker") + .args(["rm", "-f", name]) + .output(); + } + if let Some(ref dir) = self.remote_data_dir { + let _ = std::fs::remove_dir_all(dir); + } + } + + pub fn sasl_users(&self) -> &[(String, String)] { + &self.sasl_users + } + + pub fn plaintext_bootstrap_servers(&self) -> &str { + &self.bootstrap_servers + } + + pub async fn get_fluss_connection(&self) -> FlussConnection { + let config = Config { + writer_acks: "all".to_string(), + bootstrap_servers: self.bootstrap_servers.clone(), + ..Default::default() + }; + + self.connect_with_retry(config).await + } + + pub async fn get_fluss_connection_with_sasl( + &self, + username: &str, + password: &str, + ) -> FlussConnection { + let bootstrap = self + .sasl_bootstrap_servers + .clone() + .unwrap_or_else(|| self.bootstrap_servers.clone()); + let config = Config { + writer_acks: "all".to_string(), + bootstrap_servers: bootstrap, + security_protocol: "sasl".to_string(), + security_sasl_mechanism: "PLAIN".to_string(), + security_sasl_username: username.to_string(), + security_sasl_password: password.to_string(), + ..Default::default() + }; + + self.connect_with_retry(config).await + } + + pub async fn try_fluss_connection_with_sasl( + &self, + username: &str, + password: &str, + ) -> fluss::error::Result { + let bootstrap = self + .sasl_bootstrap_servers + .clone() + .unwrap_or_else(|| self.bootstrap_servers.clone()); + let config = Config { + writer_acks: "all".to_string(), + bootstrap_servers: bootstrap, + security_protocol: "sasl".to_string(), + security_sasl_mechanism: "PLAIN".to_string(), + security_sasl_username: username.to_string(), + security_sasl_password: password.to_string(), + ..Default::default() + }; + + FlussConnection::new(config).await + } + + async fn connect_with_retry(&self, config: Config) -> FlussConnection { + let max_retries = 60; + let retry_interval = Duration::from_secs(1); + + for attempt in 1..=max_retries { + match FlussConnection::new(config.clone()).await { + Ok(connection) => { + return connection; + } + Err(e) => { + if attempt == max_retries { + panic!( + "Failed to connect to Fluss cluster after {} attempts: {}", + max_retries, e + ); + } + tokio::time::sleep(retry_interval).await; + } + } + } + unreachable!() + } +} + +pub fn stop_cluster(name: &str) { + let prefixes = [ + format!("zookeeper-{}", name), + format!("coordinator-server-{}", name), + format!("tablet-server-{}-", name), + ]; + for prefix in &prefixes { + if let Ok(output) = std::process::Command::new("docker") + .args(["ps", "-aq", "--filter", &format!("name={}", prefix)]) + .output() + { + let ids = String::from_utf8_lossy(&output.stdout); + for id in ids.split_whitespace() { + let _ = std::process::Command::new("docker") + .args(["rm", "-f", id]) + .output(); + } + } + } +} diff --git a/crates/fluss-test-cluster/src/main.rs b/crates/fluss-test-cluster/src/main.rs new file mode 100644 index 00000000..9bf79741 --- /dev/null +++ b/crates/fluss-test-cluster/src/main.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use clap::{Parser, Subcommand}; +use fluss::ServerType; +use fluss::config::Config; +use fluss_test_cluster::FlussTestingClusterBuilder; +use std::time::Duration; + +#[derive(Parser)] +#[command(about = "Manage a Fluss test cluster via testcontainers")] +struct Cli { + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + /// Start a Fluss test cluster (idempotent). Prints cluster info as JSON to stdout. + Start { + #[arg(long, default_value = "shared-test")] + name: String, + #[arg(long)] + sasl: bool, + #[arg(long, default_value_t = 9123)] + port: u16, + }, + /// Stop and remove all containers for a cluster. + Stop { + #[arg(long, default_value = "shared-test")] + name: String, + }, +} + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + + match cli.command { + Command::Start { name, sasl, port } => { + eprintln!("Starting Fluss test cluster '{}'...", name); + + let mut builder = FlussTestingClusterBuilder::new(&name).with_port(port); + + if sasl { + builder = builder.with_sasl(vec![ + ("admin".to_string(), "admin-secret".to_string()), + ("alice".to_string(), "alice-secret".to_string()), + ]); + } + + let info = builder.build_detached().await; + let start = std::time::Instant::now(); + + // Check plaintext endpoint only — can't verify SASL without credentials. + eprintln!("Waiting for cluster to be ready..."); + loop { + let config = Config { + bootstrap_servers: info.bootstrap_servers.clone(), + ..Default::default() + }; + if let Ok(conn) = fluss::client::FlussConnection::new(config).await { + if let Ok(admin) = conn.get_admin() { + if let Ok(nodes) = admin.get_server_nodes().await { + if nodes + .iter() + .any(|n| *n.server_type() == ServerType::TabletServer) + { + break; + } + } + } + } + if start.elapsed() >= Duration::from_secs(60) { + eprintln!("Cluster did not become ready within 60s"); + std::process::exit(1); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + eprintln!("Cluster ready."); + println!("{}", serde_json::to_string(&info).unwrap()); + } + Command::Stop { name } => { + eprintln!("Stopping Fluss test cluster '{}'...", name); + fluss_test_cluster::stop_cluster(&name); + eprintln!("Cluster stopped."); + } + } +} diff --git a/crates/fluss-test-cluster/test-images.env b/crates/fluss-test-cluster/test-images.env new file mode 100644 index 00000000..3aa8e735 --- /dev/null +++ b/crates/fluss-test-cluster/test-images.env @@ -0,0 +1,4 @@ +FLUSS_IMAGE=apache/fluss +FLUSS_VERSION=0.9.0-incubating +ZOOKEEPER_IMAGE=zookeeper +ZOOKEEPER_VERSION=3.9.2 diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index ef6a62d3..1186dc8f 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -79,7 +79,8 @@ strum_macros = "0.26" jiff = { workspace = true, features = ["js"] } [dev-dependencies] -testcontainers = "0.27.2" +fluss-test-cluster = { path = "../fluss-test-cluster" } +test-env-helpers = "0.2.2" [build-dependencies] prost-build = "0.14" diff --git a/crates/fluss/tests/integration/fluss_cluster.rs b/crates/fluss/tests/integration/fluss_cluster.rs index 5dc3e33e..0860be5d 100644 --- a/crates/fluss/tests/integration/fluss_cluster.rs +++ b/crates/fluss/tests/integration/fluss_cluster.rs @@ -15,453 +15,4 @@ // specific language governing permissions and limitations // under the License. -use fluss::client::FlussConnection; -use fluss::config::Config; -use std::collections::HashMap; -use std::string::ToString; -use std::sync::Arc; -use std::time::Duration; -use testcontainers::core::ContainerPort; -use testcontainers::runners::AsyncRunner; -use testcontainers::{ContainerAsync, GenericImage, ImageExt}; - -const FLUSS_VERSION: &str = "0.9.0-incubating"; -const FLUSS_IMAGE: &str = "apache/fluss"; - -pub struct FlussTestingClusterBuilder { - number_of_tablet_servers: i32, - network: &'static str, - cluster_conf: HashMap, - testing_name: String, - remote_data_dir: Option, - sasl_enabled: bool, - sasl_users: Vec<(String, String)>, - /// Host port for the coordinator server (default 9123). - coordinator_host_port: u16, - /// Host port for the plaintext (PLAIN_CLIENT) listener. - /// When set together with `sasl_enabled`, the cluster exposes two listeners: - /// CLIENT (SASL) on `coordinator_host_port` and PLAIN_CLIENT on this port. - plain_client_port: Option, - image: String, - image_tag: String, -} - -impl FlussTestingClusterBuilder { - #[allow(dead_code)] - pub fn new(testing_name: impl Into) -> Self { - Self::new_with_cluster_conf(testing_name.into(), &HashMap::default()) - } - - pub fn with_remote_data_dir(mut self, dir: std::path::PathBuf) -> Self { - // Ensure the directory exists before mounting - std::fs::create_dir_all(&dir).expect("Failed to create remote data directory"); - self.remote_data_dir = Some(dir); - self - } - - /// Enable SASL/PLAIN authentication on the cluster with dual listeners. - /// Users are specified as `(username, password)` pairs. - /// This automatically configures a PLAIN_CLIENT (plaintext) listener in addition - /// to the CLIENT (SASL) listener, allowing both authenticated and unauthenticated - /// connections on the same cluster. - pub fn with_sasl(mut self, users: Vec<(String, String)>) -> Self { - self.sasl_enabled = true; - self.sasl_users = users; - self.plain_client_port = Some(self.coordinator_host_port + 100); - self - } - - pub fn new_with_cluster_conf( - testing_name: impl Into, - conf: &HashMap, - ) -> Self { - // reduce testing resources - let mut cluster_conf = conf.clone(); - cluster_conf.insert( - "netty.server.num-network-threads".to_string(), - "1".to_string(), - ); - cluster_conf.insert( - "netty.server.num-worker-threads".to_string(), - "3".to_string(), - ); - - FlussTestingClusterBuilder { - number_of_tablet_servers: 1, - cluster_conf, - network: "fluss-cluster-network", - testing_name: testing_name.into(), - remote_data_dir: None, - sasl_enabled: false, - sasl_users: Vec::new(), - coordinator_host_port: 9123, - plain_client_port: None, - image: FLUSS_IMAGE.to_string(), - image_tag: FLUSS_VERSION.to_string(), - } - } - - fn tablet_server_container_name(&self, server_id: i32) -> String { - format!("tablet-server-{}-{}", self.testing_name, server_id) - } - - fn coordinator_server_container_name(&self) -> String { - format!("coordinator-server-{}", self.testing_name) - } - - fn zookeeper_container_name(&self) -> String { - format!("zookeeper-{}", self.testing_name) - } - - pub async fn build(&mut self) -> FlussTestingCluster { - // Remove stale containers from previous runs (if any) so we can reuse names. - let stale_containers: Vec = std::iter::once(self.zookeeper_container_name()) - .chain(std::iter::once(self.coordinator_server_container_name())) - .chain( - (0..self.number_of_tablet_servers).map(|id| self.tablet_server_container_name(id)), - ) - .collect(); - for name in &stale_containers { - let _ = std::process::Command::new("docker") - .args(["rm", "-f", name]) - .output(); - } - - // Inject SASL server-side configuration into cluster_conf - if self.sasl_enabled && !self.sasl_users.is_empty() { - self.cluster_conf.insert( - "security.protocol.map".to_string(), - "CLIENT:sasl".to_string(), - ); - self.cluster_conf.insert( - "security.sasl.enabled.mechanisms".to_string(), - "plain".to_string(), - ); - // Build JAAS config: user_="" for each user - let user_entries: Vec = self - .sasl_users - .iter() - .map(|(u, p)| format!("user_{}=\"{}\"", u, p)) - .collect(); - let jaas_config = format!( - "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule required {};", - user_entries.join(" ") - ); - self.cluster_conf - .insert("security.sasl.plain.jaas.config".to_string(), jaas_config); - } - - let zookeeper = Arc::new( - GenericImage::new("zookeeper", "3.9.2") - .with_network(self.network) - .with_container_name(self.zookeeper_container_name()) - .start() - .await - .unwrap(), - ); - - let coordinator_server = Arc::new(self.start_coordinator_server().await); - - let mut tablet_servers = HashMap::new(); - for server_id in 0..self.number_of_tablet_servers { - tablet_servers.insert( - server_id, - Arc::new(self.start_tablet_server(server_id).await), - ); - } - - // When dual listeners are configured, bootstrap_servers points to the plaintext - // listener and sasl_bootstrap_servers points to the SASL listener. - let (bootstrap_servers, sasl_bootstrap_servers) = - if let Some(plain_port) = self.plain_client_port { - ( - format!("127.0.0.1:{}", plain_port), - Some(format!("127.0.0.1:{}", self.coordinator_host_port)), - ) - } else { - (format!("127.0.0.1:{}", self.coordinator_host_port), None) - }; - - FlussTestingCluster { - zookeeper, - coordinator_server, - tablet_servers, - bootstrap_servers, - sasl_bootstrap_servers, - remote_data_dir: self.remote_data_dir.clone(), - sasl_users: self.sasl_users.clone(), - container_names: stale_containers, - } - } - - async fn start_coordinator_server(&mut self) -> ContainerAsync { - let port = self.coordinator_host_port; - let container_name = self.coordinator_server_container_name(); - let mut coordinator_confs = HashMap::new(); - coordinator_confs.insert( - "zookeeper.address", - format!("{}:2181", self.zookeeper_container_name()), - ); - - if let Some(plain_port) = self.plain_client_port { - // Dual listeners: CLIENT (SASL) + PLAIN_CLIENT (plaintext) - coordinator_confs.insert( - "bind.listeners", - format!( - "INTERNAL://{}:0, CLIENT://{}:{}, PLAIN_CLIENT://{}:{}", - container_name, container_name, port, container_name, plain_port - ), - ); - coordinator_confs.insert( - "advertised.listeners", - format!( - "CLIENT://localhost:{}, PLAIN_CLIENT://localhost:{}", - port, plain_port - ), - ); - } else { - coordinator_confs.insert( - "bind.listeners", - format!( - "INTERNAL://{}:0, CLIENT://{}:{}", - container_name, container_name, port - ), - ); - coordinator_confs.insert( - "advertised.listeners", - format!("CLIENT://localhost:{}", port), - ); - } - - coordinator_confs.insert("internal.listener.name", "INTERNAL".to_string()); - - let mut image = GenericImage::new(&self.image, &self.image_tag) - .with_container_name(self.coordinator_server_container_name()) - .with_mapped_port(port, ContainerPort::Tcp(port)) - .with_network(self.network) - .with_cmd(vec!["coordinatorServer"]) - .with_env_var( - "FLUSS_PROPERTIES", - self.to_fluss_properties_with(coordinator_confs), - ); - - if let Some(plain_port) = self.plain_client_port { - image = image.with_mapped_port(plain_port, ContainerPort::Tcp(plain_port)); - } - - image.start().await.unwrap() - } - - async fn start_tablet_server(&self, server_id: i32) -> ContainerAsync { - let port = self.coordinator_host_port; - let container_name = self.tablet_server_container_name(server_id); - let mut tablet_server_confs = HashMap::new(); - let expose_host_port = (port as i32) + 1 + server_id; - let tablet_server_id = format!("{}", server_id); - - if let Some(plain_port) = self.plain_client_port { - // Dual listeners: CLIENT (SASL) + PLAIN_CLIENT (plaintext) - let bind_listeners = format!( - "INTERNAL://{}:0, CLIENT://{}:{}, PLAIN_CLIENT://{}:{}", - container_name, container_name, port, container_name, plain_port, - ); - let plain_expose_host_port = (plain_port as i32) + 1 + server_id; - let advertised_listeners = format!( - "CLIENT://localhost:{}, PLAIN_CLIENT://localhost:{}", - expose_host_port, plain_expose_host_port - ); - tablet_server_confs.insert("bind.listeners", bind_listeners); - tablet_server_confs.insert("advertised.listeners", advertised_listeners); - } else { - let bind_listeners = format!( - "INTERNAL://{}:0, CLIENT://{}:{}", - container_name, container_name, port, - ); - let advertised_listeners = format!("CLIENT://localhost:{}", expose_host_port); - tablet_server_confs.insert("bind.listeners", bind_listeners); - tablet_server_confs.insert("advertised.listeners", advertised_listeners); - } - - tablet_server_confs.insert( - "zookeeper.address", - format!("{}:2181", self.zookeeper_container_name()), - ); - tablet_server_confs.insert("internal.listener.name", "INTERNAL".to_string()); - tablet_server_confs.insert("tablet-server.id", tablet_server_id); - - // Set remote.data.dir to use the same path as host when volume mount is provided - // This ensures the path is consistent between host and container - if let Some(remote_data_dir) = &self.remote_data_dir { - tablet_server_confs.insert( - "remote.data.dir", - remote_data_dir.to_string_lossy().to_string(), - ); - } - let mut image = GenericImage::new(&self.image, &self.image_tag) - .with_cmd(vec!["tabletServer"]) - .with_mapped_port(expose_host_port as u16, ContainerPort::Tcp(port)) - .with_network(self.network) - .with_container_name(self.tablet_server_container_name(server_id)) - .with_env_var( - "FLUSS_PROPERTIES", - self.to_fluss_properties_with(tablet_server_confs), - ); - - // Add port mapping for plaintext listener - if let Some(plain_port) = self.plain_client_port { - let plain_expose_host_port = (plain_port as i32) + 1 + server_id; - image = image.with_mapped_port( - plain_expose_host_port as u16, - ContainerPort::Tcp(plain_port), - ); - } - - // Add volume mount if remote_data_dir is provided - if let Some(ref remote_data_dir) = self.remote_data_dir { - use testcontainers::core::Mount; - // Ensure directory exists before mounting (double check) - std::fs::create_dir_all(remote_data_dir) - .expect("Failed to create remote data directory for mount"); - let host_path = remote_data_dir.to_string_lossy().to_string(); - let container_path = remote_data_dir.to_string_lossy().to_string(); - image = image.with_mount(Mount::bind_mount(host_path, container_path)); - } - - image.start().await.unwrap() - } - - fn to_fluss_properties_with(&self, extra_properties: HashMap<&str, String>) -> String { - let mut fluss_properties = Vec::new(); - for (k, v) in self.cluster_conf.iter() { - fluss_properties.push(format!("{}: {}", k, v)); - } - for (k, v) in extra_properties.iter() { - fluss_properties.push(format!("{}: {}", k, v)); - } - fluss_properties.join("\n") - } -} - -/// Provides an easy way to launch a Fluss cluster with coordinator and tablet servers. -#[derive(Clone)] -#[allow(dead_code)] // Fields held for RAII (keeping Docker containers alive). -pub struct FlussTestingCluster { - zookeeper: Arc>, - coordinator_server: Arc>, - tablet_servers: HashMap>>, - /// Bootstrap servers for plaintext connections. - /// When dual listeners are configured, this points to the PLAIN_CLIENT listener. - bootstrap_servers: String, - /// Bootstrap servers for SASL connections (only set when dual listeners are configured). - sasl_bootstrap_servers: Option, - remote_data_dir: Option, - sasl_users: Vec<(String, String)>, - container_names: Vec, -} - -impl FlussTestingCluster { - /// Synchronously stops and removes all Docker containers and cleans up the - /// remote data directory. Safe to call from non-async contexts (e.g. atexit). - #[allow(dead_code)] - pub fn stop(&self) { - for name in &self.container_names { - let _ = std::process::Command::new("docker") - .args(["rm", "-f", name]) - .output(); - } - if let Some(ref dir) = self.remote_data_dir { - let _ = std::fs::remove_dir_all(dir); - } - } - - pub fn sasl_users(&self) -> &[(String, String)] { - &self.sasl_users - } - - /// Returns the plaintext (non-SASL) bootstrap servers address. - pub fn plaintext_bootstrap_servers(&self) -> &str { - &self.bootstrap_servers - } - - pub async fn get_fluss_connection(&self) -> FlussConnection { - let config = Config { - writer_acks: "all".to_string(), - bootstrap_servers: self.bootstrap_servers.clone(), - ..Default::default() - }; - - self.connect_with_retry(config).await - } - - /// Connect with SASL/PLAIN credentials. - /// Uses `sasl_bootstrap_servers` when dual listeners are configured. - pub async fn get_fluss_connection_with_sasl( - &self, - username: &str, - password: &str, - ) -> FlussConnection { - let bootstrap = self - .sasl_bootstrap_servers - .clone() - .unwrap_or_else(|| self.bootstrap_servers.clone()); - let config = Config { - writer_acks: "all".to_string(), - bootstrap_servers: bootstrap, - security_protocol: "sasl".to_string(), - security_sasl_mechanism: "PLAIN".to_string(), - security_sasl_username: username.to_string(), - security_sasl_password: password.to_string(), - ..Default::default() - }; - - self.connect_with_retry(config).await - } - - /// Try to connect with SASL/PLAIN credentials, returning the error on failure. - /// Uses `sasl_bootstrap_servers` when dual listeners are configured. - pub async fn try_fluss_connection_with_sasl( - &self, - username: &str, - password: &str, - ) -> fluss::error::Result { - let bootstrap = self - .sasl_bootstrap_servers - .clone() - .unwrap_or_else(|| self.bootstrap_servers.clone()); - let config = Config { - writer_acks: "all".to_string(), - bootstrap_servers: bootstrap, - security_protocol: "sasl".to_string(), - security_sasl_mechanism: "PLAIN".to_string(), - security_sasl_username: username.to_string(), - security_sasl_password: password.to_string(), - ..Default::default() - }; - - FlussConnection::new(config).await - } - - async fn connect_with_retry(&self, config: Config) -> FlussConnection { - // Retry mechanism: retry for up to 1 minute - let max_retries = 60; // 60 retry attempts - let retry_interval = Duration::from_secs(1); // 1 second interval between retries - - for attempt in 1..=max_retries { - match FlussConnection::new(config.clone()).await { - Ok(connection) => { - return connection; - } - Err(e) => { - if attempt == max_retries { - panic!( - "Failed to connect to Fluss cluster after {} attempts: {}", - max_retries, e - ); - } - tokio::time::sleep(retry_interval).await; - } - } - } - unreachable!() - } -} +pub use fluss_test_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs index 970b84ae..dc2876f8 100644 --- a/crates/fluss/tests/integration/utils.rs +++ b/crates/fluss/tests/integration/utils.rs @@ -53,7 +53,7 @@ static SHARED_CLUSTER: LazyLock = LazyLock::new(|| { ); let cluster = - FlussTestingClusterBuilder::new_with_cluster_conf("shared-test", &cluster_conf) + FlussTestingClusterBuilder::new_with_cluster_conf("rust-test", &cluster_conf) .with_sasl(vec![ ("admin".to_string(), "admin-secret".to_string()), ("alice".to_string(), "alice-secret".to_string()), From 39c1749d98240a570cf5f4e1133d643cbf61ea42 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 5 Apr 2026 20:30:02 +0100 Subject: [PATCH 2/4] fix CMake and nlohmann_json --- bindings/cpp/CMakeLists.txt | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt index f58b7efe..c59acd94 100644 --- a/bindings/cpp/CMakeLists.txt +++ b/bindings/cpp/CMakeLists.txt @@ -255,14 +255,17 @@ if (FLUSS_ENABLE_TESTING) ) set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) - set(JSON_BuildTests OFF CACHE INTERNAL "") - FetchContent_Declare( - nlohmann_json - URL https://github.com/nlohmann/json/archive/refs/tags/v${FLUSS_NLOHMANN_JSON_VERSION}.tar.gz - URL_HASH SHA256=0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406 - ) - - FetchContent_MakeAvailable(googletest nlohmann_json) + FetchContent_MakeAvailable(googletest) + + if (NOT TARGET nlohmann_json::nlohmann_json) + set(JSON_BuildTests OFF CACHE INTERNAL "") + FetchContent_Declare( + nlohmann_json + URL https://github.com/nlohmann/json/archive/refs/tags/v${FLUSS_NLOHMANN_JSON_VERSION}.tar.gz + URL_HASH SHA256=0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406 + ) + FetchContent_MakeAvailable(nlohmann_json) + endif() enable_testing() include(GoogleTest) From a72eb8a9a9864373fd63f25050eeee1980938b50 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 5 Apr 2026 22:28:56 +0100 Subject: [PATCH 3/4] finish rebase manually, regenerate .lock --- Cargo.lock | 14 +++++++++++++- crates/fluss-test-cluster/Cargo.toml | 2 +- crates/fluss/Cargo.toml | 1 - 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5af3f875..388d1a25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1099,6 +1099,7 @@ dependencies = [ "crc32c", "dashmap", "delegate", + "fluss-test-cluster", "futures", "jiff", "linked-hash-map", @@ -1117,13 +1118,24 @@ dependencies = [ "strum", "strum_macros", "tempfile", - "testcontainers", "thiserror 1.0.69", "tokio", "url", "uuid", ] +[[package]] +name = "fluss-test-cluster" +version = "0.2.0" +dependencies = [ + "clap", + "fluss-rs", + "serde", + "serde_json", + "testcontainers", + "tokio", +] + [[package]] name = "fluss_python" version = "0.2.0" diff --git a/crates/fluss-test-cluster/Cargo.toml b/crates/fluss-test-cluster/Cargo.toml index 6e7d8e3c..977df307 100644 --- a/crates/fluss-test-cluster/Cargo.toml +++ b/crates/fluss-test-cluster/Cargo.toml @@ -29,7 +29,7 @@ path = "src/main.rs" [dependencies] fluss = { workspace = true } -testcontainers = "0.25.0" +testcontainers = "0.27.2" tokio = { workspace = true } clap = { workspace = true } serde = { workspace = true } diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index 1186dc8f..040599ed 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -80,7 +80,6 @@ jiff = { workspace = true, features = ["js"] } [dev-dependencies] fluss-test-cluster = { path = "../fluss-test-cluster" } -test-env-helpers = "0.2.2" [build-dependencies] prost-build = "0.14" From 408152bcc4c50cf5b0d5c9347a95a035a792889e Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sun, 5 Apr 2026 22:58:14 +0100 Subject: [PATCH 4/4] remove docker cache, it's slower --- .github/actions/docker-cache/action.yml | 49 --------------------- .github/workflows/build_and_test_cpp.yml | 2 - .github/workflows/build_and_test_python.yml | 2 - .github/workflows/build_and_test_rust.yml | 2 - 4 files changed, 55 deletions(-) delete mode 100644 .github/actions/docker-cache/action.yml diff --git a/.github/actions/docker-cache/action.yml b/.github/actions/docker-cache/action.yml deleted file mode 100644 index 8bbee699..00000000 --- a/.github/actions/docker-cache/action.yml +++ /dev/null @@ -1,49 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: 'Cache Docker images for Fluss tests' -description: 'Reads image versions from test-images.env, caches via docker save/load.' - -runs: - using: 'composite' - steps: - - name: Read image versions - id: v - shell: bash - run: | - source crates/fluss-test-cluster/test-images.env - echo "fluss=${FLUSS_IMAGE}:${FLUSS_VERSION}" >> "$GITHUB_OUTPUT" - echo "zk=${ZOOKEEPER_IMAGE}:${ZOOKEEPER_VERSION}" >> "$GITHUB_OUTPUT" - - - uses: actions/cache@v4 - with: - path: /tmp/docker-images - key: docker-${{ runner.os }}-${{ steps.v.outputs.fluss }}-${{ steps.v.outputs.zk }} - - - name: Load or pull Docker images - shell: bash - run: | - if [ -f /tmp/docker-images/fluss.tar ]; then - docker load -i /tmp/docker-images/fluss.tar - docker load -i /tmp/docker-images/zookeeper.tar - else - docker pull "${{ steps.v.outputs.fluss }}" - docker pull "${{ steps.v.outputs.zk }}" - mkdir -p /tmp/docker-images - docker save "${{ steps.v.outputs.fluss }}" -o /tmp/docker-images/fluss.tar - docker save "${{ steps.v.outputs.zk }}" -o /tmp/docker-images/zookeeper.tar - fi diff --git a/.github/workflows/build_and_test_cpp.yml b/.github/workflows/build_and_test_cpp.yml index f2f7a027..7b6383af 100644 --- a/.github/workflows/build_and_test_cpp.yml +++ b/.github/workflows/build_and_test_cpp.yml @@ -68,8 +68,6 @@ jobs: restore-keys: | cpp-test-${{ runner.os }}- - - uses: ./.github/actions/docker-cache - - name: Build fluss-test-cluster binary run: cargo build -p fluss-test-cluster diff --git a/.github/workflows/build_and_test_python.yml b/.github/workflows/build_and_test_python.yml index 1abe0436..5eec8c39 100644 --- a/.github/workflows/build_and_test_python.yml +++ b/.github/workflows/build_and_test_python.yml @@ -71,8 +71,6 @@ jobs: restore-keys: | python-test-${{ runner.os }}-${{ matrix.python }}- - - uses: ./.github/actions/docker-cache - - name: Build fluss-test-cluster binary run: cargo build -p fluss-test-cluster diff --git a/.github/workflows/build_and_test_rust.yml b/.github/workflows/build_and_test_rust.yml index 7e1ba674..1c5a99ab 100644 --- a/.github/workflows/build_and_test_rust.yml +++ b/.github/workflows/build_and_test_rust.yml @@ -99,8 +99,6 @@ jobs: restore-keys: | rust-${{ runner.os }}- - - uses: ./.github/actions/docker-cache - - name: Integration Test run: cargo test --features integration_tests --all-targets --workspace --exclude fluss_python --exclude fluss-cpp env: