Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ Initial release of the Frequenz Market Metering API client for Python.
## Bug Fixes

- `update_market_location()`: Add missing `expected_revision` parameter required for optimistic concurrency control.
- `upsert_samples()`: Attach auth and signing metadata to the streaming upsert RPC so authenticated sample upserts work against services that require signed requests.
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ dev-pytest = [
"pytest-asyncio == 1.3.0",
"async-solipsism == 0.9",
"frequenz-client-marketmetering[cli]",
"testcontainers == 4.10.0",
"requests == 2.32.3",
]
dev = [
"frequenz-client-marketmetering[dev-mkdocs,dev-flake8,dev-formatting,dev-mkdocs,dev-mypy,dev-noxfile,dev-pylint,dev-pytest]",
Expand Down Expand Up @@ -183,6 +185,9 @@ testpaths = ["tests", "src"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
required_plugins = ["pytest-asyncio", "pytest-mock"]
markers = [
"integration: integration tests requiring a running gRPC server",
]

[tool.mypy]
explicit_package_bases = true
Expand Down
32 changes: 32 additions & 0 deletions src/frequenz/client/marketmetering/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

from __future__ import annotations

import hmac
import secrets
import time
from base64 import urlsafe_b64encode
from datetime import datetime, timedelta
from typing import AsyncIterator, cast

Expand Down Expand Up @@ -152,6 +156,33 @@ def stub(self) -> marketmetering_pb2_grpc.MarketMeteringServiceStub:
raise ClientNotConnected(server_url=self.server_url, operation="stub")
return self._stub

def _metadata(self, method: str) -> tuple[tuple[str, str | bytes], ...] | None:
"""Build request metadata for RPCs not covered by client-base interceptors."""
if self._auth_key is None:
return None

metadata: list[tuple[str, str | bytes]] = [("key", self._auth_key)]
if self._sign_secret is None:
return tuple(metadata)

ts = str(int(time.time())).encode()
nonce = urlsafe_b64encode(secrets.token_bytes(16))

digest = hmac.new(self._sign_secret.encode(), digestmod="sha256")
digest.update(self._auth_key.encode())
digest.update(ts)
digest.update(nonce)
digest.update(method.encode())

metadata.extend(
[
("ts", ts),
("nonce", nonce),
("sig", urlsafe_b64encode(digest.digest()).rstrip(b"=")),
]
)
return tuple(metadata)

async def create_market_location(
self,
*,
Expand Down Expand Up @@ -336,6 +367,7 @@ async def request_generator() -> (
AsyncIterator[pb.UpsertMarketLocationSamplesStreamResponse],
self.stub.UpsertMarketLocationSamplesStream(
request_generator(), # type: ignore[arg-type]
metadata=self._metadata("UpsertMarketLocationSamplesStream"),
timeout=self._stream_timeout_seconds,
),
)
Expand Down
235 changes: 235 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Pytest fixtures for integration tests using testcontainers."""

from __future__ import annotations

import subprocess
import tempfile
import time
from collections.abc import AsyncIterator
from pathlib import Path
from typing import TYPE_CHECKING, Any

import pytest
import requests

from frequenz.client.marketmetering import MarketMeteringApiClient

if TYPE_CHECKING:
from collections.abc import Generator

SERVICE_REPO_PATH = (
Path(__file__).parent.parent.parent / "frequenz-service-marketmetering"
)
SCHEMA_PATH = SERVICE_REPO_PATH / "database" / "greptimedb" / "create_schema.sql"
SERVICE_BINARY = SERVICE_REPO_PATH / "target" / "release" / "marketmeteringd"

GREPTIMEDB_IMAGE = "greptime/greptimedb:v1.0.0-rc.2"
GREPTIMEDB_GRPC_PORT = 4001
GREPTIMEDB_HTTP_PORT = 4000

pytestmark = pytest.mark.integration


def _docker_available() -> bool:
"""Check if Docker is available and running."""
try:
import docker

client = docker.from_env()
client.ping()
return True
except Exception:
return False


def pytest_configure(config: pytest.Config) -> None:
"""Configure pytest markers."""
config.addinivalue_line(
"markers", "integration: integration tests requiring Docker"
)


def pytest_collection_modifyitems(
config: pytest.Config, items: list[pytest.Item]
) -> None:
"""Skip integration tests if Docker is not available."""
if not _docker_available():
skip_reason = pytest.mark.skip(reason="Docker not available")
for item in items:
if "integration" in [m.name for m in item.iter_markers()]:
item.add_marker(skip_reason)


class _GreptimeDBContainer:
"""GreptimeDB testcontainer wrapper."""

def __init__(self) -> None:
from testcontainers.core.container import DockerContainer

self._container: DockerContainer = DockerContainer(GREPTIMEDB_IMAGE)
self._container.with_exposed_ports(
GREPTIMEDB_HTTP_PORT, GREPTIMEDB_GRPC_PORT, 4002, 4003
)
self._container.with_command(
"standalone start --http-addr 0.0.0.0:4000 --rpc-bind-addr 0.0.0.0:4001 "
"--mysql-addr 0.0.0.0:4002 --postgres-addr 0.0.0.0:4003"
)

def start(self) -> None:
"""Start the container."""
self._container.start()

def stop(self) -> None:
"""Stop the container."""
self._container.stop()

def get_grpc_endpoint(self) -> str:
"""Get the gRPC endpoint for GreptimeDB."""
host = self._container.get_container_host_ip()
port = self._container.get_exposed_port(GREPTIMEDB_GRPC_PORT)
return f"http://{host}:{port}"

def get_http_url(self) -> str:
"""Get the HTTP URL for GreptimeDB."""
host = self._container.get_container_host_ip()
port = self._container.get_exposed_port(GREPTIMEDB_HTTP_PORT)
return f"http://{host}:{port}"

def wait_for_health(self, timeout: int = 30) -> None:
"""Wait for GreptimeDB to be healthy."""
start = time.time()
while time.time() - start < timeout:
try:
resp = requests.get(f"{self.get_http_url()}/health", timeout=2)
if resp.status_code == 200:
return
except Exception:
pass
time.sleep(0.5)
raise RuntimeError("GreptimeDB health check timed out")


@pytest.fixture(scope="session")
def greptimedb_container() -> Generator[_GreptimeDBContainer, None, None]:
"""Start a GreptimeDB container for the test session."""
if not _docker_available():
pytest.skip("Docker not available")

from testcontainers.core.waiting_utils import wait_for_logs

container = _GreptimeDBContainer()
container.start()
wait_for_logs(container._container, predicate=".*server started.*", timeout=30)
container.wait_for_health()
yield container
container.stop()


@pytest.fixture(scope="session")
def greptimedb_schema(
greptimedb_container: _GreptimeDBContainer,
) -> str:
"""Initialize the GreptimeDB schema."""
http_url = greptimedb_container.get_http_url()
sql_path = SCHEMA_PATH

if not sql_path.exists():
pytest.skip(
f"Schema file not found at {sql_path}. Is the service repo checked out?"
)

sql_content = sql_path.read_text()

response = requests.post(
f"{http_url}/v1/sql",
data={"sql": sql_content},
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=30,
)
if response.status_code != 200:
raise RuntimeError(f"Failed to initialize schema: {response.text}")

return greptimedb_container.get_grpc_endpoint()


@pytest.fixture(scope="session")
def service_binary() -> Path:
"""Return path to the marketmeteringd binary."""
if not SERVICE_BINARY.exists():
pytest.skip(
f"Service binary not found at {SERVICE_BINARY}. "
"Build the service with 'cargo build --release'"
)
return SERVICE_BINARY


@pytest.fixture(scope="session")
def service_config(
greptimedb_container: _GreptimeDBContainer,
service_binary: Path,
) -> Path:
"""Create a config file for the marketmeteringd service."""
greptimedb_endpoint = greptimedb_container.get_grpc_endpoint()

config_content = f"""[net]
ip = "[::1]"
port = 50051

[auth]
enabled = false

[storage]
backend = "greptime"
endpoint = "{greptimedb_endpoint}/marketmetering"

[service]
upsert_stream_buf_size = 32
"""
config_file = tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False)
config_file.write(config_content)
config_file.close()
return Path(config_file.name)


@pytest.fixture(scope="session")
def service_process(
service_binary: Path,
service_config: Path,
) -> Generator[subprocess.Popen[Any], None, None]:
"""Start the marketmeteringd service."""
proc = subprocess.Popen(
[str(service_binary), "--config", str(service_config)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

time.sleep(2)

if proc.poll() is not None:
stdout, stderr = proc.communicate(timeout=5)
raise RuntimeError(
f"Service failed to start.\nstdout: {stdout.decode()}\nstderr: {stderr.decode()}"
)

yield proc

proc.terminate()
try:
proc.wait(timeout=5)
except subprocess.TimeoutExpired:
proc.kill()


@pytest.fixture
async def client(
service_process: subprocess.Popen[Any],
) -> AsyncIterator[MarketMeteringApiClient]:
"""Create a client connected to the test service."""
c = MarketMeteringApiClient(
server_url="grpc://[::1]:50051?ssl=false",
auth_key="",
)
yield c
Loading
Loading