From 7b9b8692ae8ecbd82d88311788637773711e045d Mon Sep 17 00:00:00 2001 From: Jianfeng Lu <46821337+gitlujean@users.noreply.github.com> Date: Wed, 6 May 2026 06:47:12 +0800 Subject: [PATCH] Add 3D mesh watermarking toolkit --- README.md | 57 +++++++-- scripts/test_directory.py | 107 +++++++++++++++++ watermark3d/__init__.py | 14 +++ watermark3d/attacks.py | 97 ++++++++++++++++ watermark3d/cli.py | 52 +++++++++ watermark3d/mesh.py | 143 +++++++++++++++++++++++ watermark3d/watermark.py | 239 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 702 insertions(+), 7 deletions(-) create mode 100755 scripts/test_directory.py create mode 100644 watermark3d/__init__.py create mode 100644 watermark3d/attacks.py create mode 100644 watermark3d/cli.py create mode 100644 watermark3d/mesh.py create mode 100644 watermark3d/watermark.py diff --git a/README.md b/README.md index 67ada0c..fdf51d4 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,55 @@ -# Tutorials for Stanford cs228 and cs231n +# CS228 Material -Preparatory material for the probabilistic graphical models and the deep learning classes at Stanford. +## 三维模型顶点索引区间数字水印 -## Material +本仓库包含一个纯 Python、无第三方依赖的三维网格数字水印实现。算法适用于 +OBJ 和 ASCII PLY 文件,核心思想是在稳定轴向上把顶点投影划分为 `n * 2` 个 +相邻区间,并用每对相邻区间内顶点索引编号的大小关系表示水印位: -This repo currently holds: +- `0`:左区间顶点索引均值小于右区间; +- `1`:左区间顶点索引均值大于右区间。 -* A [tutorial](https://github.com/kuleshov/cs228-material/blob/master/tutorials/python/cs228-python-tutorial.ipynb) on basic Python/Numpy that is necesseary to get started with the above machine learning classes. +嵌入阶段不移动顶点坐标,而是在每个区间对内部重新分配顶点编号,并同步改写 +所有面片索引,因此几何形状完全不变。提取阶段重新计算稳定轴、重新分区,然 +后比较相邻区间的顶点编号均值。`repeats` 参数提供重复编码和多数投票,用于 +抵抗轻微噪声、平滑、简化等导致的分区扰动。 -You may follow the iPython notebook on github, or clone and execute it locally. -The notebook is based on an [earlier version](http://cs231n.github.io/python-numpy-tutorial/) prepared by Justin Johnson. +### 方案细化 + +1. **稳定轴选择**:对顶点坐标做中心化,使用协方差矩阵的主特征向量作为轴向。 + 主轴方向符号由轴向投影的三阶中心矩固定;高度对称模型则固定最大绝对分量 + 为正,从而避免 PCA 符号翻转。 +2. **区间同步**:将所有顶点投影到稳定轴上,并按投影排序划分为 + `2 * len(bits) * repeats` 个等顶点数秩区间。每两个相邻区间组成一个水印承载单元; + 使用秩区间比固定宽度区间更不依赖模型尺度和投影端点。 +3. **最优编号重排**:对每个区间对取顶点编号并排序。如果目标位为 `0`,把该 + 对内较小的编号分配给左区间、较大的编号分配给右区间;目标位为 `1` 则反 + 过来。这样不仅满足均值关系,还满足更强的“全体编号大小分离”关系。 +4. **几何无损嵌入**:只改变顶点表顺序和面片引用,不改变任意顶点坐标。 +5. **鲁棒提取**:每个原始比特可嵌入多个区间对,提取时多数投票;测试程序内 + 置相似几何变换、加噪、Laplacian 平滑和简单顶点简化攻击。 + +> 注意:该类水印依赖顶点编号通道。它天然抵抗保持顶点顺序的几何攻击;若攻 +> 击程序完全重新网格化并随机重排顶点,任何仅依赖顶点编号的方案都需要额外 +> 的同步码、纠错码或外部注册机制。 + +### 单个模型嵌入/提取 + +```bash +python -m watermark3d.cli embed input.obj output.obj --bits 1011001110001011 --repeats 5 --metadata output.json +python -m watermark3d.cli extract output.obj --bit-count 16 --repeats 5 --expected 1011001110001011 +``` + +### 整个目录测试 + +```bash +python scripts/test_directory.py \ + --input-dir ./meshes \ + --output-dir ./watermark_out \ + --bits 1011001110001011 \ + --repeats 5 \ + --make-sample-if-empty +``` + +测试程序会遍历目录内所有 `.obj` 和 `.ply` 文件,输出水印模型、攻击后模型以 +及 `watermark_report.json`,其中包含每个模型/攻击的提取比特串和误码率。 diff --git a/scripts/test_directory.py b/scripts/test_directory.py new file mode 100755 index 0000000..b427dd3 --- /dev/null +++ b/scripts/test_directory.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from watermark3d.attacks import add_noise, affine_transform, laplacian_smooth, vertex_decimate +from watermark3d.mesh import Mesh, load_mesh, save_mesh +from watermark3d.watermark import WatermarkConfig, bit_error_rate, embed_watermark, extract_watermark + +SUPPORTED = {".obj", ".ply"} + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Embed/extract a 3D interval watermark for every OBJ/PLY in a directory." + ) + parser.add_argument("--input-dir", required=True, help="directory containing .obj or ASCII .ply meshes") + parser.add_argument("--output-dir", required=True, help="directory for watermarked and attacked meshes") + parser.add_argument("--bits", default="1011001110001011", help="binary watermark payload") + parser.add_argument("--repeats", type=int, default=5, help="interval-pair repetitions per bit") + parser.add_argument("--make-sample-if-empty", action="store_true", help="write a sample OBJ when input-dir has no meshes") + args = parser.parse_args() + + input_dir = Path(args.input_dir) + output_dir = Path(args.output_dir) + input_dir.mkdir(parents=True, exist_ok=True) + output_dir.mkdir(parents=True, exist_ok=True) + + meshes = sorted(path for path in input_dir.iterdir() if path.suffix.lower() in SUPPORTED) + if not meshes and args.make_sample_if_empty: + sample_path = input_dir / "sample_ellipsoid.obj" + save_mesh(make_sample_ellipsoid(), sample_path) + meshes = [sample_path] + if not meshes: + raise SystemExit("No .obj or .ply meshes found") + + cfg = WatermarkConfig(repeats=args.repeats) + attacks = { + "none": lambda mesh: mesh, + "affine": affine_transform, + "noise": add_noise, + "smooth": laplacian_smooth, + "simplify": vertex_decimate, + } + + report = [] + for mesh_path in meshes: + mesh = load_mesh(mesh_path) + watermarked = embed_watermark(mesh, args.bits, cfg) + base = output_dir / mesh_path.stem + marked_path = base.with_name(base.name + "_watermarked" + mesh_path.suffix) + save_mesh(watermarked, marked_path) + + for attack_name, attack in attacks.items(): + attacked = attack(watermarked.copy()) + attacked_path = base.with_name(base.name + f"_{attack_name}" + mesh_path.suffix) + save_mesh(attacked, attacked_path) + observed = extract_watermark(attacked, len(args.bits), cfg) + report.append( + { + "mesh": str(mesh_path), + "attack": attack_name, + "observed": observed, + "expected": args.bits, + "ber": bit_error_rate(args.bits, observed), + "attacked_path": str(attacked_path), + } + ) + + report_path = output_dir / "watermark_report.json" + report_path.write_text(json.dumps(report, indent=2), encoding="utf-8") + print(json.dumps(report, indent=2)) + + +def make_sample_ellipsoid(rows: int = 26, cols: int = 52) -> Mesh: + import math + + vertices = [] + for r in range(rows + 1): + theta = math.pi * r / rows + for c in range(cols): + phi = 2.0 * math.pi * c / cols + ripple = 1.0 + 0.045 * math.sin(3.0 * phi + 0.7 * r) + 0.025 * math.cos(5.0 * theta + phi) + x = 2.4 * ripple * math.sin(theta) * math.cos(phi) + y = 1.0 * ripple * math.sin(theta) * math.sin(phi) + 0.05 * math.sin(2.0 * theta) + z = 0.7 * ripple * math.cos(theta) + 0.18 * x + 0.08 * x * x + vertices.append((x, y, z)) + faces = [] + for r in range(rows): + for c in range(cols): + a = r * cols + c + b = r * cols + (c + 1) % cols + d = (r + 1) * cols + c + e = (r + 1) * cols + (c + 1) % cols + faces.append([a, d, e, b]) + return Mesh(vertices, faces, "obj") + + +if __name__ == "__main__": + main() diff --git a/watermark3d/__init__.py b/watermark3d/__init__.py new file mode 100644 index 0000000..b8786b9 --- /dev/null +++ b/watermark3d/__init__.py @@ -0,0 +1,14 @@ +"""Vertex-index interval watermarking for 3D triangle/polygon meshes.""" + +from .mesh import Mesh, load_mesh, save_mesh +from .watermark import WatermarkConfig, embed_watermark, extract_watermark, bit_error_rate + +__all__ = [ + "Mesh", + "load_mesh", + "save_mesh", + "WatermarkConfig", + "embed_watermark", + "extract_watermark", + "bit_error_rate", +] diff --git a/watermark3d/attacks.py b/watermark3d/attacks.py new file mode 100644 index 0000000..36814ff --- /dev/null +++ b/watermark3d/attacks.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import math +import random +from typing import Dict, List, Set + +from .mesh import Mesh + + +def affine_transform(mesh: Mesh) -> Mesh: + """Apply a deterministic similarity transform plus translation.""" + + angle = math.radians(31.0) + ca, sa = math.cos(angle), math.sin(angle) + scale = 1.17 + transformed = [] + for x, y, z in mesh.vertices: + rx = ca * x - sa * y + ry = sa * x + ca * y + rz = z + transformed.append((scale * rx + 0.4, scale * ry - 0.2, scale * rz + 0.8)) + return Mesh(transformed, [list(face) for face in mesh.faces], mesh.source_format, mesh.obj_face_suffixes) + + +def add_noise(mesh: Mesh, strength: float = 0.001, seed: int = 7) -> Mesh: + rng = random.Random(seed) + diagonal = _bbox_diagonal(mesh) + noisy = [] + for x, y, z in mesh.vertices: + noisy.append( + ( + x + rng.uniform(-strength, strength) * diagonal, + y + rng.uniform(-strength, strength) * diagonal, + z + rng.uniform(-strength, strength) * diagonal, + ) + ) + return Mesh(noisy, [list(face) for face in mesh.faces], mesh.source_format, mesh.obj_face_suffixes) + + +def laplacian_smooth(mesh: Mesh, iterations: int = 2, lam: float = 0.25) -> Mesh: + neighbors: List[Set[int]] = [set() for _ in mesh.vertices] + for face in mesh.faces: + for i, current in enumerate(face): + previous = face[i - 1] + following = face[(i + 1) % len(face)] + neighbors[current].add(previous) + neighbors[current].add(following) + vertices = list(mesh.vertices) + for _ in range(iterations): + updated = list(vertices) + for idx, vertex_neighbors in enumerate(neighbors): + if not vertex_neighbors: + continue + inv = 1.0 / len(vertex_neighbors) + avg = ( + sum(vertices[j][0] for j in vertex_neighbors) * inv, + sum(vertices[j][1] for j in vertex_neighbors) * inv, + sum(vertices[j][2] for j in vertex_neighbors) * inv, + ) + x, y, z = vertices[idx] + updated[idx] = ( + x * (1.0 - lam) + avg[0] * lam, + y * (1.0 - lam) + avg[1] * lam, + z * (1.0 - lam) + avg[2] * lam, + ) + vertices = updated + return Mesh(vertices, [list(face) for face in mesh.faces], mesh.source_format, mesh.obj_face_suffixes) + + +def vertex_decimate(mesh: Mesh, keep_ratio: float = 0.85) -> Mesh: + """A simple order-preserving simplifier for robustness testing.""" + + if not 0.0 < keep_ratio <= 1.0: + raise ValueError("keep_ratio must be in (0, 1]") + keep_count = max(3, int(len(mesh.vertices) * keep_ratio)) + keep = set(round(i * (len(mesh.vertices) - 1) / (keep_count - 1)) for i in range(keep_count)) + mapping: Dict[int, int] = {} + vertices = [] + for old_index, vertex in enumerate(mesh.vertices): + if old_index in keep: + mapping[old_index] = len(vertices) + vertices.append(vertex) + faces = [] + suffixes = [] + for face_index, face in enumerate(mesh.faces): + if all(index in mapping for index in face) and len(set(face)) == len(face): + faces.append([mapping[index] for index in face]) + if mesh.obj_face_suffixes and face_index < len(mesh.obj_face_suffixes): + suffixes.append(list(mesh.obj_face_suffixes[face_index])) + return Mesh(vertices, faces, mesh.source_format, suffixes if mesh.obj_face_suffixes is not None else None) + + +def _bbox_diagonal(mesh: Mesh) -> float: + xs = [v[0] for v in mesh.vertices] + ys = [v[1] for v in mesh.vertices] + zs = [v[2] for v in mesh.vertices] + return math.sqrt((max(xs) - min(xs)) ** 2 + (max(ys) - min(ys)) ** 2 + (max(zs) - min(zs)) ** 2) diff --git a/watermark3d/cli.py b/watermark3d/cli.py new file mode 100644 index 0000000..02e2418 --- /dev/null +++ b/watermark3d/cli.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +from .mesh import load_mesh, save_mesh +from .watermark import WatermarkConfig, bit_error_rate, embed_watermark, extract_watermark + + +def main() -> None: + parser = argparse.ArgumentParser(description="3D vertex-index interval watermarking") + subparsers = parser.add_subparsers(dest="command", required=True) + + embed = subparsers.add_parser("embed", help="embed bits into one mesh") + embed.add_argument("input") + embed.add_argument("output") + embed.add_argument("--bits", required=True) + embed.add_argument("--repeats", type=int, default=5) + embed.add_argument("--metadata", help="optional JSON metadata output path") + + extract = subparsers.add_parser("extract", help="extract bits from one mesh") + extract.add_argument("input") + extract.add_argument("--bit-count", type=int, required=True) + extract.add_argument("--repeats", type=int, default=5) + extract.add_argument("--expected", help="optional expected bit string for BER reporting") + + args = parser.parse_args() + cfg = WatermarkConfig(repeats=args.repeats) + + if args.command == "embed": + mesh = load_mesh(args.input) + watermarked = embed_watermark(mesh, args.bits, cfg) + save_mesh(watermarked, args.output) + if args.metadata: + Path(args.metadata).write_text( + json.dumps( + {"bits": args.bits, "bit_count": len(args.bits), "repeats": args.repeats}, + indent=2, + ), + encoding="utf-8", + ) + elif args.command == "extract": + mesh = load_mesh(args.input) + observed = extract_watermark(mesh, args.bit_count, cfg) + print(observed) + if args.expected: + print(f"BER={bit_error_rate(args.expected, observed):.6f}") + + +if __name__ == "__main__": + main() diff --git a/watermark3d/mesh.py b/watermark3d/mesh.py new file mode 100644 index 0000000..f7d4169 --- /dev/null +++ b/watermark3d/mesh.py @@ -0,0 +1,143 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Iterable, List, Sequence, Tuple + +Vector = Tuple[float, float, float] + + +@dataclass +class Mesh: + """A small mesh container. + + Vertices are zero-based in memory. Face entries are zero-based vertex indices. + OBJ texture/normal suffixes are preserved when possible. + """ + + vertices: List[Vector] + faces: List[List[int]] + source_format: str = "obj" + obj_face_suffixes: List[List[str]] | None = None + + def copy(self) -> "Mesh": + return Mesh( + vertices=list(self.vertices), + faces=[list(face) for face in self.faces], + source_format=self.source_format, + obj_face_suffixes=( + [list(face) for face in self.obj_face_suffixes] + if self.obj_face_suffixes is not None + else None + ), + ) + + +def load_mesh(path: str | Path) -> Mesh: + path = Path(path) + suffix = path.suffix.lower() + if suffix == ".obj": + return _load_obj(path) + if suffix == ".ply": + return _load_ascii_ply(path) + raise ValueError(f"Unsupported mesh format {suffix!r}; expected .obj or ASCII .ply") + + +def save_mesh(mesh: Mesh, path: str | Path) -> None: + path = Path(path) + path.parent.mkdir(parents=True, exist_ok=True) + suffix = path.suffix.lower() + if suffix == ".obj": + _save_obj(mesh, path) + return + if suffix == ".ply": + _save_ascii_ply(mesh, path) + return + raise ValueError(f"Unsupported mesh format {suffix!r}; expected .obj or .ply") + + +def _load_obj(path: Path) -> Mesh: + vertices: List[Vector] = [] + faces: List[List[int]] = [] + suffixes: List[List[str]] = [] + with path.open("r", encoding="utf-8") as handle: + for line in handle: + stripped = line.strip() + if not stripped or stripped.startswith("#"): + continue + parts = stripped.split() + if parts[0] == "v" and len(parts) >= 4: + vertices.append((float(parts[1]), float(parts[2]), float(parts[3]))) + elif parts[0] == "f" and len(parts) >= 4: + face: List[int] = [] + face_suffix: List[str] = [] + for token in parts[1:]: + head, sep, tail = token.partition("/") + raw_index = int(head) + if raw_index < 0: + idx = len(vertices) + raw_index + else: + idx = raw_index - 1 + face.append(idx) + face_suffix.append(sep + tail if sep else "") + faces.append(face) + suffixes.append(face_suffix) + return Mesh(vertices=vertices, faces=faces, source_format="obj", obj_face_suffixes=suffixes) + + +def _save_obj(mesh: Mesh, path: Path) -> None: + with path.open("w", encoding="utf-8") as handle: + handle.write("# written by watermark3d\n") + for x, y, z in mesh.vertices: + handle.write(f"v {x:.12g} {y:.12g} {z:.12g}\n") + for face_number, face in enumerate(mesh.faces): + suffixes: Sequence[str] = [] + if mesh.obj_face_suffixes and face_number < len(mesh.obj_face_suffixes): + suffixes = mesh.obj_face_suffixes[face_number] + tokens = [] + for local_number, idx in enumerate(face): + suffix = suffixes[local_number] if local_number < len(suffixes) else "" + tokens.append(f"{idx + 1}{suffix}") + handle.write("f " + " ".join(tokens) + "\n") + + +def _load_ascii_ply(path: Path) -> Mesh: + with path.open("r", encoding="utf-8") as handle: + header: List[str] = [] + for line in handle: + header.append(line.rstrip("\n")) + if line.strip() == "end_header": + break + if not header or header[0].strip() != "ply" or "format ascii" not in "\n".join(header): + raise ValueError("Only ASCII PLY files are supported") + vertex_count = 0 + face_count = 0 + for line in header: + parts = line.split() + if len(parts) == 3 and parts[:2] == ["element", "vertex"]: + vertex_count = int(parts[2]) + elif len(parts) == 3 and parts[:2] == ["element", "face"]: + face_count = int(parts[2]) + vertices = [] + for _ in range(vertex_count): + parts = handle.readline().split() + vertices.append((float(parts[0]), float(parts[1]), float(parts[2]))) + faces = [] + for _ in range(face_count): + parts = handle.readline().split() + count = int(parts[0]) + faces.append([int(value) for value in parts[1 : 1 + count]]) + return Mesh(vertices=vertices, faces=faces, source_format="ply") + + +def _save_ascii_ply(mesh: Mesh, path: Path) -> None: + with path.open("w", encoding="utf-8") as handle: + handle.write("ply\nformat ascii 1.0\n") + handle.write(f"element vertex {len(mesh.vertices)}\n") + handle.write("property float x\nproperty float y\nproperty float z\n") + handle.write(f"element face {len(mesh.faces)}\n") + handle.write("property list uchar int vertex_indices\nend_header\n") + for x, y, z in mesh.vertices: + handle.write(f"{x:.12g} {y:.12g} {z:.12g}\n") + for face in mesh.faces: + handle.write(f"{len(face)} " + " ".join(str(idx) for idx in face) + "\n") diff --git a/watermark3d/watermark.py b/watermark3d/watermark.py new file mode 100644 index 0000000..60e8394 --- /dev/null +++ b/watermark3d/watermark.py @@ -0,0 +1,239 @@ +from __future__ import annotations + +from dataclasses import dataclass +from math import sqrt +from typing import Dict, List, Sequence, Tuple + +from .mesh import Mesh + +Vector = Tuple[float, float, float] + + +@dataclass(frozen=True) +class WatermarkConfig: + """Configuration for interval-pair vertex-index watermarking. + + ``repeats`` applies a repetition code: each payload bit is embedded in several + independent adjacent interval pairs and decoded by majority vote. + """ + + repeats: int = 5 + min_vertices_per_interval: int = 1 + power_iterations: int = 64 + + +def embed_watermark(mesh: Mesh, bits: str, config: WatermarkConfig | None = None) -> Mesh: + """Return a geometry-identical mesh whose vertex order encodes ``bits``. + + For each adjacent interval pair, bit 0 means the first interval must contain + lower vertex indices than the second interval; bit 1 means the reverse. When + a pair violates the rule, the implementation reassigns the sorted vertex + labels inside that pair, then rewrites every face index consistently. + """ + + cfg = config or WatermarkConfig() + payload = _validate_bits(bits) + _validate_mesh_capacity(mesh, payload, cfg) + + axis, center = stable_axis(mesh.vertices, cfg.power_iterations) + projections = [_dot(_sub(vertex, center), axis) for vertex in mesh.vertices] + pair_count = len(payload) * cfg.repeats + interval_count = pair_count * 2 + bins = assign_intervals(projections, interval_count) + + new_label_for_old: Dict[int, int] = {idx: idx for idx in range(len(mesh.vertices))} + + for pair_number in range(pair_count): + desired_bit = payload[pair_number // cfg.repeats] + left = bins[2 * pair_number] + right = bins[2 * pair_number + 1] + if len(left) < cfg.min_vertices_per_interval or len(right) < cfg.min_vertices_per_interval: + continue + + union = sorted(left + right) + if desired_bit == "0": + left_labels = union[: len(left)] + right_labels = union[len(left) :] + else: + right_labels = union[: len(right)] + left_labels = union[len(right) :] + + for old, label in zip(sorted(left), left_labels): + new_label_for_old[old] = label + for old, label in zip(sorted(right), right_labels): + new_label_for_old[old] = label + + new_vertices: List[Vector] = [(0.0, 0.0, 0.0)] * len(mesh.vertices) + for old_index, vertex in enumerate(mesh.vertices): + new_vertices[new_label_for_old[old_index]] = vertex + + new_faces = [[new_label_for_old[index] for index in face] for face in mesh.faces] + return Mesh( + vertices=new_vertices, + faces=new_faces, + source_format=mesh.source_format, + obj_face_suffixes=( + [list(face) for face in mesh.obj_face_suffixes] + if mesh.obj_face_suffixes is not None + else None + ), + ) + + +def extract_watermark(mesh: Mesh, bit_count: int, config: WatermarkConfig | None = None) -> str: + """Extract a watermark bit string from a possibly attacked mesh.""" + + cfg = config or WatermarkConfig() + if bit_count <= 0: + raise ValueError("bit_count must be positive") + _validate_mesh_capacity(mesh, "0" * bit_count, cfg, allow_sparse=True) + + axis, center = stable_axis(mesh.vertices, cfg.power_iterations) + projections = [_dot(_sub(vertex, center), axis) for vertex in mesh.vertices] + pair_count = bit_count * cfg.repeats + interval_count = pair_count * 2 + bins = assign_intervals(projections, interval_count) + + votes: List[List[str]] = [[] for _ in range(bit_count)] + for pair_number in range(pair_count): + left = bins[2 * pair_number] + right = bins[2 * pair_number + 1] + if len(left) < cfg.min_vertices_per_interval or len(right) < cfg.min_vertices_per_interval: + continue + bit = "0" if _mean(left) < _mean(right) else "1" + votes[pair_number // cfg.repeats].append(bit) + + decoded = [] + for bit_votes in votes: + if not bit_votes: + decoded.append("?") + else: + decoded.append("1" if bit_votes.count("1") > bit_votes.count("0") else "0") + return "".join(decoded) + + +def bit_error_rate(expected: str, observed: str) -> float: + expected = _validate_bits(expected) + if len(expected) != len(observed): + raise ValueError("expected and observed must have the same length") + errors = sum(1 for a, b in zip(expected, observed) if a != b) + return errors / len(expected) + + +def stable_axis(vertices: Sequence[Vector], power_iterations: int = 64) -> Tuple[Vector, Vector]: + """Find a repeatable principal axis using covariance power iteration. + + The axis direction is made deterministic by the third central moment along + the axis. If the model is too symmetric for that test, the largest absolute + component is forced positive. + """ + + if len(vertices) < 3: + raise ValueError("At least three vertices are required") + center = _mean_vertex(vertices) + covariance = [[0.0, 0.0, 0.0] for _ in range(3)] + for vertex in vertices: + shifted = _sub(vertex, center) + for row in range(3): + for col in range(3): + covariance[row][col] += shifted[row] * shifted[col] + scale = 1.0 / len(vertices) + covariance = [[value * scale for value in row] for row in covariance] + + axis = (1.0, 0.61803398875, 0.38196601125) + axis = _normalize(axis) + for _ in range(power_iterations): + axis = _normalize( + ( + covariance[0][0] * axis[0] + covariance[0][1] * axis[1] + covariance[0][2] * axis[2], + covariance[1][0] * axis[0] + covariance[1][1] * axis[1] + covariance[1][2] * axis[2], + covariance[2][0] * axis[0] + covariance[2][1] * axis[1] + covariance[2][2] * axis[2], + ) + ) + + skew = sum(_dot(_sub(vertex, center), axis) ** 3 for vertex in vertices) + if abs(skew) > 1e-12: + if skew < 0: + axis = _scale(axis, -1.0) + else: + largest_component = max(range(3), key=lambda idx: abs(axis[idx])) + if axis[largest_component] < 0: + axis = _scale(axis, -1.0) + return axis, center + + +def assign_intervals(values: Sequence[float], interval_count: int) -> List[List[int]]: + """Assign projection ranks to equal-population axial intervals. + + Rank intervals are more robust than fixed-width intervals for watermark + synchronization because similarity transforms and mild smoothing/noise preserve + most axial order statistics even when the absolute projection range changes. + """ + + if interval_count <= 0: + raise ValueError("interval_count must be positive") + if max(values) - min(values) < 1e-15: + raise ValueError("Degenerate projections; cannot divide vertices into axial intervals") + bins: List[List[int]] = [[] for _ in range(interval_count)] + ranked = sorted(range(len(values)), key=lambda idx: (values[idx], idx)) + total = len(ranked) + for rank, vertex_index in enumerate(ranked): + interval = min(interval_count - 1, rank * interval_count // total) + bins[interval].append(vertex_index) + return bins + + +def _validate_bits(bits: str) -> str: + if not bits or any(bit not in "01" for bit in bits): + raise ValueError("bits must be a non-empty string containing only 0 and 1") + return bits + + +def _validate_mesh_capacity( + mesh: Mesh, + bits: str, + cfg: WatermarkConfig, + allow_sparse: bool = False, +) -> None: + if cfg.repeats <= 0: + raise ValueError("repeats must be positive") + if len(mesh.vertices) < 3: + raise ValueError("mesh must contain at least three vertices") + interval_count = len(bits) * cfg.repeats * 2 + if not allow_sparse and len(mesh.vertices) < interval_count * cfg.min_vertices_per_interval: + raise ValueError( + "not enough vertices for the requested payload/repetition settings: " + f"need at least {interval_count * cfg.min_vertices_per_interval}, got {len(mesh.vertices)}" + ) + + +def _mean(values: Sequence[int]) -> float: + return sum(values) / len(values) + + +def _mean_vertex(vertices: Sequence[Vector]) -> Vector: + inv = 1.0 / len(vertices) + return ( + sum(vertex[0] for vertex in vertices) * inv, + sum(vertex[1] for vertex in vertices) * inv, + sum(vertex[2] for vertex in vertices) * inv, + ) + + +def _dot(a: Vector, b: Vector) -> float: + return a[0] * b[0] + a[1] * b[1] + a[2] * b[2] + + +def _sub(a: Vector, b: Vector) -> Vector: + return (a[0] - b[0], a[1] - b[1], a[2] - b[2]) + + +def _scale(a: Vector, scalar: float) -> Vector: + return (a[0] * scalar, a[1] * scalar, a[2] * scalar) + + +def _normalize(a: Vector) -> Vector: + length = sqrt(_dot(a, a)) + if length < 1e-15: + return (1.0, 0.0, 0.0) + return (a[0] / length, a[1] / length, a[2] / length)