From 9fa9e2a1b295fbbd6c3b83a56dfd43fcd81b8547 Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Tue, 7 Apr 2026 13:18:23 +0200 Subject: [PATCH] WIP: Add testcontainers for integration tests Integration tests now use testcontainers to spin up GreptimeDB and the marketmeteringd service binary automatically. - Added testcontainers and requests dependencies to dev-pytest - Created conftest.py with fixtures for: - GreptimeDB container - marketmeteringd service process - Database schema initialization - Updated test_integration.py to use new fixtures - Tests skip automatically when Docker is unavailable Requires: - Docker running - Service binary at ../frequenz-service-marketmetering/target/release/marketmeteringd Signed-off-by: Mathias L. Baumann --- pyproject.toml | 2 + tests/conftest.py | 235 ++++++++++++++++++++++++++++++++++++++ tests/test_integration.py | 59 +--------- 3 files changed, 243 insertions(+), 53 deletions(-) create mode 100644 tests/conftest.py diff --git a/pyproject.toml b/pyproject.toml index b66e411..36b6f38 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,6 +100,8 @@ dev-pytest = [ "pytest-asyncio == 1.3.0", "async-solipsism == 0.9", "frequenz-client-marketmetering[cli]", + "testcontainers == 4.10.0", + "requests == 2.32.3", ] dev = [ "frequenz-client-marketmetering[dev-mkdocs,dev-flake8,dev-formatting,dev-mkdocs,dev-mypy,dev-noxfile,dev-pylint,dev-pytest]", diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e577efc --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,235 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Pytest fixtures for integration tests using testcontainers.""" + +from __future__ import annotations + +import subprocess +import tempfile +import time +from collections.abc import AsyncIterator +from pathlib import Path +from typing import TYPE_CHECKING, Any + +import pytest +import requests + +from frequenz.client.marketmetering import MarketMeteringApiClient + +if TYPE_CHECKING: + from collections.abc import Generator + +SERVICE_REPO_PATH = ( + Path(__file__).parent.parent.parent / "frequenz-service-marketmetering" +) +SCHEMA_PATH = SERVICE_REPO_PATH / "database" / "greptimedb" / "create_schema.sql" +SERVICE_BINARY = SERVICE_REPO_PATH / "target" / "release" / "marketmeteringd" + +GREPTIMEDB_IMAGE = "greptime/greptimedb:v1.0.0-rc.2" +GREPTIMEDB_GRPC_PORT = 4001 +GREPTIMEDB_HTTP_PORT = 4000 + +pytestmark = pytest.mark.integration + + +def _docker_available() -> bool: + """Check if Docker is available and running.""" + try: + import docker + + client = docker.from_env() + client.ping() + return True + except Exception: + return False + + +def pytest_configure(config: pytest.Config) -> None: + """Configure pytest markers.""" + config.addinivalue_line( + "markers", "integration: integration tests requiring Docker" + ) + + +def pytest_collection_modifyitems( + config: pytest.Config, items: list[pytest.Item] +) -> None: + """Skip integration tests if Docker is not available.""" + if not _docker_available(): + skip_reason = pytest.mark.skip(reason="Docker not available") + for item in items: + if "integration" in [m.name for m in item.iter_markers()]: + item.add_marker(skip_reason) + + +class _GreptimeDBContainer: + """GreptimeDB testcontainer wrapper.""" + + def __init__(self) -> None: + from testcontainers.core.container import DockerContainer + + self._container: DockerContainer = DockerContainer(GREPTIMEDB_IMAGE) + self._container.with_exposed_ports( + GREPTIMEDB_HTTP_PORT, GREPTIMEDB_GRPC_PORT, 4002, 4003 + ) + self._container.with_command( + "standalone start --http-addr 0.0.0.0:4000 --rpc-bind-addr 0.0.0.0:4001 " + "--mysql-addr 0.0.0.0:4002 --postgres-addr 0.0.0.0:4003" + ) + + def start(self) -> None: + """Start the container.""" + self._container.start() + + def stop(self) -> None: + """Stop the container.""" + self._container.stop() + + def get_grpc_endpoint(self) -> str: + """Get the gRPC endpoint for GreptimeDB.""" + host = self._container.get_container_host_ip() + port = self._container.get_exposed_port(GREPTIMEDB_GRPC_PORT) + return f"http://{host}:{port}" + + def get_http_url(self) -> str: + """Get the HTTP URL for GreptimeDB.""" + host = self._container.get_container_host_ip() + port = self._container.get_exposed_port(GREPTIMEDB_HTTP_PORT) + return f"http://{host}:{port}" + + def wait_for_health(self, timeout: int = 30) -> None: + """Wait for GreptimeDB to be healthy.""" + start = time.time() + while time.time() - start < timeout: + try: + resp = requests.get(f"{self.get_http_url()}/health", timeout=2) + if resp.status_code == 200: + return + except Exception: + pass + time.sleep(0.5) + raise RuntimeError("GreptimeDB health check timed out") + + +@pytest.fixture(scope="session") +def greptimedb_container() -> Generator[_GreptimeDBContainer, None, None]: + """Start a GreptimeDB container for the test session.""" + if not _docker_available(): + pytest.skip("Docker not available") + + from testcontainers.core.waiting_utils import wait_for_logs + + container = _GreptimeDBContainer() + container.start() + wait_for_logs(container._container, predicate=".*server started.*", timeout=30) + container.wait_for_health() + yield container + container.stop() + + +@pytest.fixture(scope="session") +def greptimedb_schema( + greptimedb_container: _GreptimeDBContainer, +) -> str: + """Initialize the GreptimeDB schema.""" + http_url = greptimedb_container.get_http_url() + sql_path = SCHEMA_PATH + + if not sql_path.exists(): + pytest.skip( + f"Schema file not found at {sql_path}. Is the service repo checked out?" + ) + + sql_content = sql_path.read_text() + + response = requests.post( + f"{http_url}/v1/sql", + data={"sql": sql_content}, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=30, + ) + if response.status_code != 200: + raise RuntimeError(f"Failed to initialize schema: {response.text}") + + return greptimedb_container.get_grpc_endpoint() + + +@pytest.fixture(scope="session") +def service_binary() -> Path: + """Return path to the marketmeteringd binary.""" + if not SERVICE_BINARY.exists(): + pytest.skip( + f"Service binary not found at {SERVICE_BINARY}. " + "Build the service with 'cargo build --release'" + ) + return SERVICE_BINARY + + +@pytest.fixture(scope="session") +def service_config( + greptimedb_container: _GreptimeDBContainer, + service_binary: Path, +) -> Path: + """Create a config file for the marketmeteringd service.""" + greptimedb_endpoint = greptimedb_container.get_grpc_endpoint() + + config_content = f"""[net] +ip = "[::1]" +port = 50051 + +[auth] +enabled = false + +[storage] +backend = "greptime" +endpoint = "{greptimedb_endpoint}/marketmetering" + +[service] +upsert_stream_buf_size = 32 +""" + config_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False) + config_file.write(config_content) + config_file.close() + return Path(config_file.name) + + +@pytest.fixture(scope="session") +def service_process( + service_binary: Path, + service_config: Path, +) -> Generator[subprocess.Popen[Any], None, None]: + """Start the marketmeteringd service.""" + proc = subprocess.Popen( + [str(service_binary), "--config", str(service_config)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + time.sleep(2) + + if proc.poll() is not None: + stdout, stderr = proc.communicate(timeout=5) + raise RuntimeError( + f"Service failed to start.\nstdout: {stdout.decode()}\nstderr: {stderr.decode()}" + ) + + yield proc + + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + + +@pytest.fixture +async def client( + service_process: subprocess.Popen[Any], +) -> AsyncIterator[MarketMeteringApiClient]: + """Create a client connected to the test service.""" + c = MarketMeteringApiClient( + server_url="grpc://[::1]:50051?ssl=false", + auth_key="", + ) + yield c diff --git a/tests/test_integration.py b/tests/test_integration.py index bff93fe..ff5e78a 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -3,34 +3,15 @@ """Integration tests for the MarketMeteringApiClient against a live service. -These tests require a running marketmetering service and are excluded from -CI by default. To run them: +These tests use testcontainers to spin up GreptimeDB and run the +marketmeteringd service binary. They require: +1. Docker to be running +2. The marketmeteringd binary to be built (run 'cargo build --release' in the service repo) +3. The service repo to be checked out at ../frequenz-service-marketmetering - 1. Start the service with auth disabled and test storage backend: - - ./target/release/marketmeteringd -c test-config.toml - - Where test-config.toml contains: - - [net] - ip = "[::1]" - port = 50051 - - [auth] - enabled = false - - [storage] - backend = "test" - - 2. Run the tests: - - uv run pytest -m integration +To run: uv run pytest -m integration """ -import os -import socket -from collections.abc import AsyncIterator - import grpc import pytest from grpc.aio import AioRpcError @@ -48,34 +29,6 @@ TimeResolution, ) -SERVICE_URL = "grpc://[::1]:50051?ssl=false" -AUTH_KEY = "test-key" - -pytestmark = pytest.mark.integration - - -def _service_available() -> bool: - """Check whether the local integration test service is reachable.""" - with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock: - sock.settimeout(0.5) - return sock.connect_ex(("::1", 50051, 0, 0)) == 0 - - -@pytest.fixture -async def client() -> AsyncIterator[MarketMeteringApiClient]: - """Create a connected client for testing.""" - if os.environ.get("CI") == "true": - pytest.skip("integration tests are not run in CI") - - if not _service_available(): - pytest.skip("integration test service is not running on [::1]:50051") - - c = MarketMeteringApiClient( - server_url=SERVICE_URL, - auth_key=AUTH_KEY, - ) - yield c - def make_ref( enterprise_id: int = 42, malo_id: str = "DE0000000001"