Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 50 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,55 @@
# Tutorials for Stanford cs228 and cs231n
# CS228 Material

Preparatory material for the probabilistic graphical models and the deep learning classes at Stanford.
## 三维模型顶点索引区间数字水印

## Material
本仓库包含一个纯 Python、无第三方依赖的三维网格数字水印实现。算法适用于
OBJ 和 ASCII PLY 文件,核心思想是在稳定轴向上把顶点投影划分为 `n * 2` 个
相邻区间,并用每对相邻区间内顶点索引编号的大小关系表示水印位:

This repo currently holds:
- `0`:左区间顶点索引均值小于右区间;
- `1`:左区间顶点索引均值大于右区间。

* A [tutorial](https://github.com/kuleshov/cs228-material/blob/master/tutorials/python/cs228-python-tutorial.ipynb) on basic Python/Numpy that is necesseary to get started with the above machine learning classes.
嵌入阶段不移动顶点坐标,而是在每个区间对内部重新分配顶点编号,并同步改写
所有面片索引,因此几何形状完全不变。提取阶段重新计算稳定轴、重新分区,然
后比较相邻区间的顶点编号均值。`repeats` 参数提供重复编码和多数投票,用于
抵抗轻微噪声、平滑、简化等导致的分区扰动。

You may follow the iPython notebook on github, or clone and execute it locally.
The notebook is based on an [earlier version](http://cs231n.github.io/python-numpy-tutorial/) prepared by Justin Johnson.
### 方案细化

1. **稳定轴选择**:对顶点坐标做中心化,使用协方差矩阵的主特征向量作为轴向。
主轴方向符号由轴向投影的三阶中心矩固定;高度对称模型则固定最大绝对分量
为正,从而避免 PCA 符号翻转。
2. **区间同步**:将所有顶点投影到稳定轴上,并按投影排序划分为
`2 * len(bits) * repeats` 个等顶点数秩区间。每两个相邻区间组成一个水印承载单元;
使用秩区间比固定宽度区间更不依赖模型尺度和投影端点。
3. **最优编号重排**:对每个区间对取顶点编号并排序。如果目标位为 `0`,把该
对内较小的编号分配给左区间、较大的编号分配给右区间;目标位为 `1` 则反
过来。这样不仅满足均值关系,还满足更强的“全体编号大小分离”关系。
4. **几何无损嵌入**:只改变顶点表顺序和面片引用,不改变任意顶点坐标。
5. **鲁棒提取**:每个原始比特可嵌入多个区间对,提取时多数投票;测试程序内
置相似几何变换、加噪、Laplacian 平滑和简单顶点简化攻击。

> 注意:该类水印依赖顶点编号通道。它天然抵抗保持顶点顺序的几何攻击;若攻
> 击程序完全重新网格化并随机重排顶点,任何仅依赖顶点编号的方案都需要额外
> 的同步码、纠错码或外部注册机制。

### 单个模型嵌入/提取

```bash
python -m watermark3d.cli embed input.obj output.obj --bits 1011001110001011 --repeats 5 --metadata output.json
python -m watermark3d.cli extract output.obj --bit-count 16 --repeats 5 --expected 1011001110001011
```

### 整个目录测试

```bash
python scripts/test_directory.py \
--input-dir ./meshes \
--output-dir ./watermark_out \
--bits 1011001110001011 \
--repeats 5 \
--make-sample-if-empty
```

测试程序会遍历目录内所有 `.obj` 和 `.ply` 文件,输出水印模型、攻击后模型以
及 `watermark_report.json`,其中包含每个模型/攻击的提取比特串和误码率。
107 changes: 107 additions & 0 deletions scripts/test_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))

from watermark3d.attacks import add_noise, affine_transform, laplacian_smooth, vertex_decimate
from watermark3d.mesh import Mesh, load_mesh, save_mesh
from watermark3d.watermark import WatermarkConfig, bit_error_rate, embed_watermark, extract_watermark

SUPPORTED = {".obj", ".ply"}


def main() -> None:
parser = argparse.ArgumentParser(
description="Embed/extract a 3D interval watermark for every OBJ/PLY in a directory."
)
parser.add_argument("--input-dir", required=True, help="directory containing .obj or ASCII .ply meshes")
parser.add_argument("--output-dir", required=True, help="directory for watermarked and attacked meshes")
parser.add_argument("--bits", default="1011001110001011", help="binary watermark payload")
parser.add_argument("--repeats", type=int, default=5, help="interval-pair repetitions per bit")
parser.add_argument("--make-sample-if-empty", action="store_true", help="write a sample OBJ when input-dir has no meshes")
args = parser.parse_args()

input_dir = Path(args.input_dir)
output_dir = Path(args.output_dir)
input_dir.mkdir(parents=True, exist_ok=True)
output_dir.mkdir(parents=True, exist_ok=True)

meshes = sorted(path for path in input_dir.iterdir() if path.suffix.lower() in SUPPORTED)
if not meshes and args.make_sample_if_empty:
sample_path = input_dir / "sample_ellipsoid.obj"
save_mesh(make_sample_ellipsoid(), sample_path)
meshes = [sample_path]
if not meshes:
raise SystemExit("No .obj or .ply meshes found")

cfg = WatermarkConfig(repeats=args.repeats)
attacks = {
"none": lambda mesh: mesh,
"affine": affine_transform,
"noise": add_noise,
"smooth": laplacian_smooth,
"simplify": vertex_decimate,
}

report = []
for mesh_path in meshes:
mesh = load_mesh(mesh_path)
watermarked = embed_watermark(mesh, args.bits, cfg)
base = output_dir / mesh_path.stem
marked_path = base.with_name(base.name + "_watermarked" + mesh_path.suffix)
save_mesh(watermarked, marked_path)

for attack_name, attack in attacks.items():
attacked = attack(watermarked.copy())
attacked_path = base.with_name(base.name + f"_{attack_name}" + mesh_path.suffix)
save_mesh(attacked, attacked_path)
observed = extract_watermark(attacked, len(args.bits), cfg)
report.append(
{
"mesh": str(mesh_path),
"attack": attack_name,
"observed": observed,
"expected": args.bits,
"ber": bit_error_rate(args.bits, observed),
"attacked_path": str(attacked_path),
}
)

report_path = output_dir / "watermark_report.json"
report_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
print(json.dumps(report, indent=2))


def make_sample_ellipsoid(rows: int = 26, cols: int = 52) -> Mesh:
import math

vertices = []
for r in range(rows + 1):
theta = math.pi * r / rows
for c in range(cols):
phi = 2.0 * math.pi * c / cols
ripple = 1.0 + 0.045 * math.sin(3.0 * phi + 0.7 * r) + 0.025 * math.cos(5.0 * theta + phi)
x = 2.4 * ripple * math.sin(theta) * math.cos(phi)
y = 1.0 * ripple * math.sin(theta) * math.sin(phi) + 0.05 * math.sin(2.0 * theta)
z = 0.7 * ripple * math.cos(theta) + 0.18 * x + 0.08 * x * x
vertices.append((x, y, z))
faces = []
for r in range(rows):
for c in range(cols):
a = r * cols + c
b = r * cols + (c + 1) % cols
d = (r + 1) * cols + c
e = (r + 1) * cols + (c + 1) % cols
faces.append([a, d, e, b])
return Mesh(vertices, faces, "obj")


if __name__ == "__main__":
main()
14 changes: 14 additions & 0 deletions watermark3d/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Vertex-index interval watermarking for 3D triangle/polygon meshes."""

from .mesh import Mesh, load_mesh, save_mesh
from .watermark import WatermarkConfig, embed_watermark, extract_watermark, bit_error_rate

__all__ = [
"Mesh",
"load_mesh",
"save_mesh",
"WatermarkConfig",
"embed_watermark",
"extract_watermark",
"bit_error_rate",
]
97 changes: 97 additions & 0 deletions watermark3d/attacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from __future__ import annotations

import math
import random
from typing import Dict, List, Set

from .mesh import Mesh


def affine_transform(mesh: Mesh) -> Mesh:
"""Apply a deterministic similarity transform plus translation."""

angle = math.radians(31.0)
ca, sa = math.cos(angle), math.sin(angle)
scale = 1.17
transformed = []
for x, y, z in mesh.vertices:
rx = ca * x - sa * y
ry = sa * x + ca * y
rz = z
transformed.append((scale * rx + 0.4, scale * ry - 0.2, scale * rz + 0.8))
return Mesh(transformed, [list(face) for face in mesh.faces], mesh.source_format, mesh.obj_face_suffixes)


def add_noise(mesh: Mesh, strength: float = 0.001, seed: int = 7) -> Mesh:
rng = random.Random(seed)
diagonal = _bbox_diagonal(mesh)
noisy = []
for x, y, z in mesh.vertices:
noisy.append(
(
x + rng.uniform(-strength, strength) * diagonal,
y + rng.uniform(-strength, strength) * diagonal,
z + rng.uniform(-strength, strength) * diagonal,
)
)
return Mesh(noisy, [list(face) for face in mesh.faces], mesh.source_format, mesh.obj_face_suffixes)


def laplacian_smooth(mesh: Mesh, iterations: int = 2, lam: float = 0.25) -> Mesh:
neighbors: List[Set[int]] = [set() for _ in mesh.vertices]
for face in mesh.faces:
for i, current in enumerate(face):
previous = face[i - 1]
following = face[(i + 1) % len(face)]
neighbors[current].add(previous)
neighbors[current].add(following)
vertices = list(mesh.vertices)
for _ in range(iterations):
updated = list(vertices)
for idx, vertex_neighbors in enumerate(neighbors):
if not vertex_neighbors:
continue
inv = 1.0 / len(vertex_neighbors)
avg = (
sum(vertices[j][0] for j in vertex_neighbors) * inv,
sum(vertices[j][1] for j in vertex_neighbors) * inv,
sum(vertices[j][2] for j in vertex_neighbors) * inv,
)
x, y, z = vertices[idx]
updated[idx] = (
x * (1.0 - lam) + avg[0] * lam,
y * (1.0 - lam) + avg[1] * lam,
z * (1.0 - lam) + avg[2] * lam,
)
vertices = updated
return Mesh(vertices, [list(face) for face in mesh.faces], mesh.source_format, mesh.obj_face_suffixes)


def vertex_decimate(mesh: Mesh, keep_ratio: float = 0.85) -> Mesh:
"""A simple order-preserving simplifier for robustness testing."""

if not 0.0 < keep_ratio <= 1.0:
raise ValueError("keep_ratio must be in (0, 1]")
keep_count = max(3, int(len(mesh.vertices) * keep_ratio))
keep = set(round(i * (len(mesh.vertices) - 1) / (keep_count - 1)) for i in range(keep_count))
mapping: Dict[int, int] = {}
vertices = []
for old_index, vertex in enumerate(mesh.vertices):
if old_index in keep:
mapping[old_index] = len(vertices)
vertices.append(vertex)
faces = []
suffixes = []
for face_index, face in enumerate(mesh.faces):
if all(index in mapping for index in face) and len(set(face)) == len(face):
faces.append([mapping[index] for index in face])
if mesh.obj_face_suffixes and face_index < len(mesh.obj_face_suffixes):
suffixes.append(list(mesh.obj_face_suffixes[face_index]))
return Mesh(vertices, faces, mesh.source_format, suffixes if mesh.obj_face_suffixes is not None else None)


def _bbox_diagonal(mesh: Mesh) -> float:
xs = [v[0] for v in mesh.vertices]
ys = [v[1] for v in mesh.vertices]
zs = [v[2] for v in mesh.vertices]
return math.sqrt((max(xs) - min(xs)) ** 2 + (max(ys) - min(ys)) ** 2 + (max(zs) - min(zs)) ** 2)
52 changes: 52 additions & 0 deletions watermark3d/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from __future__ import annotations

import argparse
import json
from pathlib import Path

from .mesh import load_mesh, save_mesh
from .watermark import WatermarkConfig, bit_error_rate, embed_watermark, extract_watermark


def main() -> None:
parser = argparse.ArgumentParser(description="3D vertex-index interval watermarking")
subparsers = parser.add_subparsers(dest="command", required=True)

embed = subparsers.add_parser("embed", help="embed bits into one mesh")
embed.add_argument("input")
embed.add_argument("output")
embed.add_argument("--bits", required=True)
embed.add_argument("--repeats", type=int, default=5)
embed.add_argument("--metadata", help="optional JSON metadata output path")

extract = subparsers.add_parser("extract", help="extract bits from one mesh")
extract.add_argument("input")
extract.add_argument("--bit-count", type=int, required=True)
extract.add_argument("--repeats", type=int, default=5)
extract.add_argument("--expected", help="optional expected bit string for BER reporting")

args = parser.parse_args()
cfg = WatermarkConfig(repeats=args.repeats)

if args.command == "embed":
mesh = load_mesh(args.input)
watermarked = embed_watermark(mesh, args.bits, cfg)
save_mesh(watermarked, args.output)
if args.metadata:
Path(args.metadata).write_text(
json.dumps(
{"bits": args.bits, "bit_count": len(args.bits), "repeats": args.repeats},
indent=2,
),
encoding="utf-8",
)
elif args.command == "extract":
mesh = load_mesh(args.input)
observed = extract_watermark(mesh, args.bit_count, cfg)
print(observed)
if args.expected:
print(f"BER={bit_error_rate(args.expected, observed):.6f}")


if __name__ == "__main__":
main()
Loading