Skip to content

Commit 9fa9e2a

Browse files
committed
WIP: Add testcontainers for integration tests
Integration tests now use testcontainers to spin up GreptimeDB and the marketmeteringd service binary automatically. - Added testcontainers and requests dependencies to dev-pytest - Created conftest.py with fixtures for: - GreptimeDB container - marketmeteringd service process - Database schema initialization - Updated test_integration.py to use new fixtures - Tests skip automatically when Docker is unavailable Requires: - Docker running - Service binary at ../frequenz-service-marketmetering/target/release/marketmeteringd Signed-off-by: Mathias L. Baumann <mathias.baumann@frequenz.com>
1 parent ea73ca8 commit 9fa9e2a

3 files changed

Lines changed: 243 additions & 53 deletions

File tree

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ dev-pytest = [
100100
"pytest-asyncio == 1.3.0",
101101
"async-solipsism == 0.9",
102102
"frequenz-client-marketmetering[cli]",
103+
"testcontainers == 4.10.0",
104+
"requests == 2.32.3",
103105
]
104106
dev = [
105107
"frequenz-client-marketmetering[dev-mkdocs,dev-flake8,dev-formatting,dev-mkdocs,dev-mypy,dev-noxfile,dev-pylint,dev-pytest]",

tests/conftest.py

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
# License: MIT
2+
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Pytest fixtures for integration tests using testcontainers."""
5+
6+
from __future__ import annotations
7+
8+
import subprocess
9+
import tempfile
10+
import time
11+
from collections.abc import AsyncIterator
12+
from pathlib import Path
13+
from typing import TYPE_CHECKING, Any
14+
15+
import pytest
16+
import requests
17+
18+
from frequenz.client.marketmetering import MarketMeteringApiClient
19+
20+
if TYPE_CHECKING:
21+
from collections.abc import Generator
22+
23+
SERVICE_REPO_PATH = (
24+
Path(__file__).parent.parent.parent / "frequenz-service-marketmetering"
25+
)
26+
SCHEMA_PATH = SERVICE_REPO_PATH / "database" / "greptimedb" / "create_schema.sql"
27+
SERVICE_BINARY = SERVICE_REPO_PATH / "target" / "release" / "marketmeteringd"
28+
29+
GREPTIMEDB_IMAGE = "greptime/greptimedb:v1.0.0-rc.2"
30+
GREPTIMEDB_GRPC_PORT = 4001
31+
GREPTIMEDB_HTTP_PORT = 4000
32+
33+
pytestmark = pytest.mark.integration
34+
35+
36+
def _docker_available() -> bool:
37+
"""Check if Docker is available and running."""
38+
try:
39+
import docker
40+
41+
client = docker.from_env()
42+
client.ping()
43+
return True
44+
except Exception:
45+
return False
46+
47+
48+
def pytest_configure(config: pytest.Config) -> None:
49+
"""Configure pytest markers."""
50+
config.addinivalue_line(
51+
"markers", "integration: integration tests requiring Docker"
52+
)
53+
54+
55+
def pytest_collection_modifyitems(
56+
config: pytest.Config, items: list[pytest.Item]
57+
) -> None:
58+
"""Skip integration tests if Docker is not available."""
59+
if not _docker_available():
60+
skip_reason = pytest.mark.skip(reason="Docker not available")
61+
for item in items:
62+
if "integration" in [m.name for m in item.iter_markers()]:
63+
item.add_marker(skip_reason)
64+
65+
66+
class _GreptimeDBContainer:
67+
"""GreptimeDB testcontainer wrapper."""
68+
69+
def __init__(self) -> None:
70+
from testcontainers.core.container import DockerContainer
71+
72+
self._container: DockerContainer = DockerContainer(GREPTIMEDB_IMAGE)
73+
self._container.with_exposed_ports(
74+
GREPTIMEDB_HTTP_PORT, GREPTIMEDB_GRPC_PORT, 4002, 4003
75+
)
76+
self._container.with_command(
77+
"standalone start --http-addr 0.0.0.0:4000 --rpc-bind-addr 0.0.0.0:4001 "
78+
"--mysql-addr 0.0.0.0:4002 --postgres-addr 0.0.0.0:4003"
79+
)
80+
81+
def start(self) -> None:
82+
"""Start the container."""
83+
self._container.start()
84+
85+
def stop(self) -> None:
86+
"""Stop the container."""
87+
self._container.stop()
88+
89+
def get_grpc_endpoint(self) -> str:
90+
"""Get the gRPC endpoint for GreptimeDB."""
91+
host = self._container.get_container_host_ip()
92+
port = self._container.get_exposed_port(GREPTIMEDB_GRPC_PORT)
93+
return f"http://{host}:{port}"
94+
95+
def get_http_url(self) -> str:
96+
"""Get the HTTP URL for GreptimeDB."""
97+
host = self._container.get_container_host_ip()
98+
port = self._container.get_exposed_port(GREPTIMEDB_HTTP_PORT)
99+
return f"http://{host}:{port}"
100+
101+
def wait_for_health(self, timeout: int = 30) -> None:
102+
"""Wait for GreptimeDB to be healthy."""
103+
start = time.time()
104+
while time.time() - start < timeout:
105+
try:
106+
resp = requests.get(f"{self.get_http_url()}/health", timeout=2)
107+
if resp.status_code == 200:
108+
return
109+
except Exception:
110+
pass
111+
time.sleep(0.5)
112+
raise RuntimeError("GreptimeDB health check timed out")
113+
114+
115+
@pytest.fixture(scope="session")
116+
def greptimedb_container() -> Generator[_GreptimeDBContainer, None, None]:
117+
"""Start a GreptimeDB container for the test session."""
118+
if not _docker_available():
119+
pytest.skip("Docker not available")
120+
121+
from testcontainers.core.waiting_utils import wait_for_logs
122+
123+
container = _GreptimeDBContainer()
124+
container.start()
125+
wait_for_logs(container._container, predicate=".*server started.*", timeout=30)
126+
container.wait_for_health()
127+
yield container
128+
container.stop()
129+
130+
131+
@pytest.fixture(scope="session")
132+
def greptimedb_schema(
133+
greptimedb_container: _GreptimeDBContainer,
134+
) -> str:
135+
"""Initialize the GreptimeDB schema."""
136+
http_url = greptimedb_container.get_http_url()
137+
sql_path = SCHEMA_PATH
138+
139+
if not sql_path.exists():
140+
pytest.skip(
141+
f"Schema file not found at {sql_path}. Is the service repo checked out?"
142+
)
143+
144+
sql_content = sql_path.read_text()
145+
146+
response = requests.post(
147+
f"{http_url}/v1/sql",
148+
data={"sql": sql_content},
149+
headers={"Content-Type": "application/x-www-form-urlencoded"},
150+
timeout=30,
151+
)
152+
if response.status_code != 200:
153+
raise RuntimeError(f"Failed to initialize schema: {response.text}")
154+
155+
return greptimedb_container.get_grpc_endpoint()
156+
157+
158+
@pytest.fixture(scope="session")
159+
def service_binary() -> Path:
160+
"""Return path to the marketmeteringd binary."""
161+
if not SERVICE_BINARY.exists():
162+
pytest.skip(
163+
f"Service binary not found at {SERVICE_BINARY}. "
164+
"Build the service with 'cargo build --release'"
165+
)
166+
return SERVICE_BINARY
167+
168+
169+
@pytest.fixture(scope="session")
170+
def service_config(
171+
greptimedb_container: _GreptimeDBContainer,
172+
service_binary: Path,
173+
) -> Path:
174+
"""Create a config file for the marketmeteringd service."""
175+
greptimedb_endpoint = greptimedb_container.get_grpc_endpoint()
176+
177+
config_content = f"""[net]
178+
ip = "[::1]"
179+
port = 50051
180+
181+
[auth]
182+
enabled = false
183+
184+
[storage]
185+
backend = "greptime"
186+
endpoint = "{greptimedb_endpoint}/marketmetering"
187+
188+
[service]
189+
upsert_stream_buf_size = 32
190+
"""
191+
config_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
192+
config_file.write(config_content)
193+
config_file.close()
194+
return Path(config_file.name)
195+
196+
197+
@pytest.fixture(scope="session")
198+
def service_process(
199+
service_binary: Path,
200+
service_config: Path,
201+
) -> Generator[subprocess.Popen[Any], None, None]:
202+
"""Start the marketmeteringd service."""
203+
proc = subprocess.Popen(
204+
[str(service_binary), "--config", str(service_config)],
205+
stdout=subprocess.PIPE,
206+
stderr=subprocess.PIPE,
207+
)
208+
209+
time.sleep(2)
210+
211+
if proc.poll() is not None:
212+
stdout, stderr = proc.communicate(timeout=5)
213+
raise RuntimeError(
214+
f"Service failed to start.\nstdout: {stdout.decode()}\nstderr: {stderr.decode()}"
215+
)
216+
217+
yield proc
218+
219+
proc.terminate()
220+
try:
221+
proc.wait(timeout=5)
222+
except subprocess.TimeoutExpired:
223+
proc.kill()
224+
225+
226+
@pytest.fixture
227+
async def client(
228+
service_process: subprocess.Popen[Any],
229+
) -> AsyncIterator[MarketMeteringApiClient]:
230+
"""Create a client connected to the test service."""
231+
c = MarketMeteringApiClient(
232+
server_url="grpc://[::1]:50051?ssl=false",
233+
auth_key="",
234+
)
235+
yield c

tests/test_integration.py

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,15 @@
33

44
"""Integration tests for the MarketMeteringApiClient against a live service.
55
6-
These tests require a running marketmetering service and are excluded from
7-
CI by default. To run them:
6+
These tests use testcontainers to spin up GreptimeDB and run the
7+
marketmeteringd service binary. They require:
8+
1. Docker to be running
9+
2. The marketmeteringd binary to be built (run 'cargo build --release' in the service repo)
10+
3. The service repo to be checked out at ../frequenz-service-marketmetering
811
9-
1. Start the service with auth disabled and test storage backend:
10-
11-
./target/release/marketmeteringd -c test-config.toml
12-
13-
Where test-config.toml contains:
14-
15-
[net]
16-
ip = "[::1]"
17-
port = 50051
18-
19-
[auth]
20-
enabled = false
21-
22-
[storage]
23-
backend = "test"
24-
25-
2. Run the tests:
26-
27-
uv run pytest -m integration
12+
To run: uv run pytest -m integration
2813
"""
2914

30-
import os
31-
import socket
32-
from collections.abc import AsyncIterator
33-
3415
import grpc
3516
import pytest
3617
from grpc.aio import AioRpcError
@@ -48,34 +29,6 @@
4829
TimeResolution,
4930
)
5031

51-
SERVICE_URL = "grpc://[::1]:50051?ssl=false"
52-
AUTH_KEY = "test-key"
53-
54-
pytestmark = pytest.mark.integration
55-
56-
57-
def _service_available() -> bool:
58-
"""Check whether the local integration test service is reachable."""
59-
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
60-
sock.settimeout(0.5)
61-
return sock.connect_ex(("::1", 50051, 0, 0)) == 0
62-
63-
64-
@pytest.fixture
65-
async def client() -> AsyncIterator[MarketMeteringApiClient]:
66-
"""Create a connected client for testing."""
67-
if os.environ.get("CI") == "true":
68-
pytest.skip("integration tests are not run in CI")
69-
70-
if not _service_available():
71-
pytest.skip("integration test service is not running on [::1]:50051")
72-
73-
c = MarketMeteringApiClient(
74-
server_url=SERVICE_URL,
75-
auth_key=AUTH_KEY,
76-
)
77-
yield c
78-
7932

8033
def make_ref(
8134
enterprise_id: int = 42, malo_id: str = "DE0000000001"

0 commit comments

Comments
 (0)