Skip to content

Commit b1468be

Browse files
chenliu0831 and claude
committed
Unify multi-engine API and fix flaky tests
- All runners take engine in constructor, onData(table=, dataframe=) for data binding
- Spark protobuf builders become private (_Spark*RunBuilder)
- Refactor connect() to use isinstance() instead of string matching
- Fix ApproxQuantile alias collision and metric name mismatch
- Scale up test datasets for reliable HLL tolerance validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a17bfa0 commit b1468be

24 files changed

Lines changed: 1005 additions & 1565 deletions

README.md

Lines changed: 57 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ pip install pydeequ[duckdb]
137137
```python
138138
import duckdb
139139
import pydeequ
140+
from pydeequ.v2.verification import AnalysisRunner, VerificationSuite
140141
from pydeequ.v2.analyzers import Size, Completeness, Mean
141142
from pydeequ.v2.checks import Check, CheckLevel
142143
from pydeequ.v2.predicates import eq, gte
@@ -152,18 +153,19 @@ con.execute("""
152153
""")
153154

154155
# Create an engine from the connection
155-
engine = pydeequ.connect(con, table="users")
156+
engine = pydeequ.connect(con)
156157

157158
# Run analyzers
158-
metrics = engine.compute_metrics([
159-
Size(),
160-
Completeness("id"),
161-
Completeness("age"),
162-
Mean("age"),
163-
])
159+
result = (AnalysisRunner(engine)
160+
.onData(table="users")
161+
.addAnalyzer(Size())
162+
.addAnalyzer(Completeness("id"))
163+
.addAnalyzer(Completeness("age"))
164+
.addAnalyzer(Mean("age"))
165+
.run())
166+
164167
print("Metrics:")
165-
for m in metrics:
166-
print(f" {m.name}({m.instance}): {m.value}")
168+
print(result.to_string(index=False))
167169

168170
# Run constraint checks
169171
check = (Check(CheckLevel.Error, "Data quality checks")
@@ -172,16 +174,13 @@ check = (Check(CheckLevel.Error, "Data quality checks")
172174
.isComplete("name")
173175
.hasCompleteness("age", gte(0.5)))
174176

175-
results = engine.run_checks([check])
176-
print("\nConstraint Results:")
177-
for r in results:
178-
print(f" {r.constraint}: {r.constraint_status}")
177+
result = (VerificationSuite(engine)
178+
.onData(table="users")
179+
.addCheck(check)
180+
.run())
179181

180-
# Profile columns
181-
profiles = engine.profile_columns()
182-
print("\nColumn Profiles:")
183-
for p in profiles:
184-
print(f" {p.column}: completeness={p.completeness}, distinct={p.approx_distinct_values}")
182+
print("\nConstraint Results:")
183+
print(result.to_string(index=False))
185184

186185
con.close()
187186
```
@@ -269,12 +268,14 @@ pip install pydeequ[spark]
269268

270269
```python
271270
from pyspark.sql import SparkSession, Row
271+
import pydeequ
272272
from pydeequ.v2.checks import Check, CheckLevel
273273
from pydeequ.v2.verification import VerificationSuite
274274
from pydeequ.v2.predicates import eq, gte
275275

276276
# Connect to Spark Connect server
277277
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
278+
engine = pydeequ.connect(spark)
278279

279280
# Create sample data
280281
df = spark.createDataFrame([
@@ -292,12 +293,12 @@ check = (Check(CheckLevel.Error, "Data quality checks")
292293
.isUnique("id"))
293294

294295
# Run verification
295-
result = (VerificationSuite(spark)
296-
.onData(df)
296+
result = (VerificationSuite(engine)
297+
.onData(dataframe=df)
297298
.addCheck(check)
298299
.run())
299300

300-
result.show(truncate=False)
301+
print(result.to_string(index=False))
301302
spark.stop()
302303
```
303304

@@ -344,14 +345,21 @@ from pydeequ.v2.analyzers import (
344345
Uniqueness, Entropy, Correlation
345346
)
346347

347-
result = (AnalysisRunner(spark)
348-
.onData(df)
348+
# DuckDB
349+
result = (AnalysisRunner(engine)
350+
.onData(table="users")
349351
.addAnalyzer(Size())
350352
.addAnalyzer(Completeness("name"))
351353
.addAnalyzer(Mean("age"))
352354
.run())
353355

354-
result.show()
356+
# Spark
357+
result = (AnalysisRunner(engine)
358+
.onData(dataframe=df)
359+
.addAnalyzer(Size())
360+
.addAnalyzer(Completeness("name"))
361+
.addAnalyzer(Mean("age"))
362+
.run())
355363
```
356364

357365
### Constraint Methods
@@ -386,26 +394,21 @@ result.show()
386394
Profile column distributions and statistics across your dataset:
387395

388396
```python
389-
from pydeequ.v2.profiles import ColumnProfilerRunner, KLLParameters
397+
from pydeequ.v2.profiles import ColumnProfilerRunner
390398

391399
# Basic profiling
392-
profiles = (ColumnProfilerRunner(spark)
393-
.onData(df)
400+
profiles = (ColumnProfilerRunner(engine)
401+
.onData(table="users") # DuckDB: use table=
402+
# .onData(dataframe=df) # Spark: use dataframe=
394403
.run())
395404

396-
profiles.show()
397-
398-
# Advanced profiling with options
399-
profiles = (ColumnProfilerRunner(spark)
400-
.onData(df)
401-
.restrictToColumns(["id", "name", "age"]) # Profile specific columns
402-
.withLowCardinalityHistogramThreshold(100) # Generate histograms for low-cardinality columns
403-
.withKLLProfiling() # Enable KLL sketch for approximate quantiles
404-
.setKLLParameters(KLLParameters(
405-
sketch_size=2048,
406-
shrinking_factor=0.64,
407-
num_buckets=64
408-
))
405+
print(profiles)
406+
407+
# With options
408+
profiles = (ColumnProfilerRunner(engine)
409+
.onData(table="users")
410+
.restrictToColumns(["id", "name", "age"])
411+
.withLowCardinalityHistogramThreshold(100)
409412
.run())
410413
```
411414

@@ -436,21 +439,13 @@ Auto-generate data quality constraints based on your data:
436439
from pydeequ.v2.suggestions import ConstraintSuggestionRunner, Rules
437440

438441
# Basic suggestion generation
439-
suggestions = (ConstraintSuggestionRunner(spark)
440-
.onData(df)
442+
suggestions = (ConstraintSuggestionRunner(engine)
443+
.onData(table="users") # DuckDB: use table=
444+
# .onData(dataframe=df) # Spark: use dataframe=
441445
.addConstraintRules(Rules.DEFAULT)
442446
.run())
443447

444-
suggestions.show(truncate=False)
445-
446-
# Advanced usage with train/test evaluation
447-
suggestions = (ConstraintSuggestionRunner(spark)
448-
.onData(df)
449-
.addConstraintRules(Rules.DEFAULT)
450-
.addConstraintRules(Rules.EXTENDED)
451-
.restrictToColumns(["id", "status", "score"])
452-
.useTrainTestSplitWithTestsetRatio(0.2, seed=42) # Evaluate suggestions on test set
453-
.run())
448+
print(suggestions)
454449
```
455450

456451
**Available Rule Sets:**
@@ -509,10 +504,12 @@ result = ColumnProfilerRunner(spark).onData(df).run()
509504
for col, profile in result.profiles.items():
510505
print(profile)
511506

512-
# After (2.0) - returns DataFrame
507+
# After (2.0) - unified engine API
508+
import pydeequ
513509
from pydeequ.v2.profiles import ColumnProfilerRunner
514-
result = ColumnProfilerRunner(spark).onData(df).run()
515-
result.show()
510+
engine = pydeequ.connect(spark)
511+
result = ColumnProfilerRunner(engine).onData(dataframe=df).run()
512+
print(result)
516513
```
517514

518515
**Suggestions changes:**
@@ -522,10 +519,12 @@ from pydeequ.suggestions import ConstraintSuggestionRunner, DEFAULT
522519
result = ConstraintSuggestionRunner(spark).onData(df).addConstraintRule(DEFAULT()).run()
523520
print(result)
524521

525-
# After (2.0) - returns DataFrame
522+
# After (2.0) - unified engine API
523+
import pydeequ
526524
from pydeequ.v2.suggestions import ConstraintSuggestionRunner, Rules
527-
result = ConstraintSuggestionRunner(spark).onData(df).addConstraintRules(Rules.DEFAULT).run()
528-
result.show()
525+
engine = pydeequ.connect(spark)
526+
result = ConstraintSuggestionRunner(engine).onData(dataframe=df).addConstraintRules(Rules.DEFAULT).run()
527+
print(result)
529528
```
530529

531530
---

benchmark/experiments.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def benchmark_duckdb_validation(engine: Any, check: Check, n_runs: int) -> float
183183
times = []
184184
for _ in range(n_runs):
185185
start = time.perf_counter()
186-
result = VerificationSuite().on_engine(engine).addCheck(check).run()
186+
result = VerificationSuite(engine).onData(table="benchmark_data").addCheck(check).run()
187187
_ = len(result)
188188
elapsed = time.perf_counter() - start
189189
times.append(elapsed)
@@ -195,7 +195,7 @@ def benchmark_duckdb_profiling(engine: Any, n_runs: int) -> float:
195195
times = []
196196
for _ in range(n_runs):
197197
start = time.perf_counter()
198-
result = ColumnProfilerRunner().on_engine(engine).run()
198+
result = ColumnProfilerRunner(engine).onData(table="benchmark_data").run()
199199
_ = len(result)
200200
elapsed = time.perf_counter() - start
201201
times.append(elapsed)
@@ -230,23 +230,25 @@ def load_spark_from_parquet(spark: Any, parquet_path: str) -> Tuple[Any, float]:
230230

231231
def benchmark_spark_validation(spark: Any, spark_df: Any, check: Check, n_runs: int) -> float:
232232
"""Time Spark VerificationSuite.run() over N runs, return average."""
233+
engine = pydeequ.connect(spark)
233234
times = []
234235
for _ in range(n_runs):
235236
start = time.perf_counter()
236-
result = VerificationSuite(spark).onData(spark_df).addCheck(check).run()
237-
_ = result.collect()
237+
result = VerificationSuite(engine).onData(dataframe=spark_df).addCheck(check).run()
238+
_ = len(result)
238239
elapsed = time.perf_counter() - start
239240
times.append(elapsed)
240241
return sum(times) / len(times)
241242

242243

243244
def benchmark_spark_profiling(spark: Any, spark_df: Any, n_runs: int) -> float:
244245
"""Time Spark ColumnProfilerRunner.run() over N runs, return average."""
246+
engine = pydeequ.connect(spark)
245247
times = []
246248
for _ in range(n_runs):
247249
start = time.perf_counter()
248-
result = ColumnProfilerRunner(spark).onData(spark_df).run()
249-
_ = result.collect()
250+
result = ColumnProfilerRunner(engine).onData(dataframe=spark_df).run()
251+
_ = len(result)
250252
elapsed = time.perf_counter() - start
251253
times.append(elapsed)
252254
return sum(times) / len(times)

pydeequ/__init__.py

Lines changed: 38 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,30 @@
1717
For PyDeequ 2.0 with DuckDB (no Spark required):
1818
import duckdb
1919
import pydeequ
20+
from pydeequ.v2.verification import AnalysisRunner
2021
from pydeequ.v2.analyzers import Size, Completeness
2122
2223
con = duckdb.connect()
2324
con.execute("CREATE TABLE test AS SELECT 1 as id")
24-
engine = pydeequ.connect(con, table="test")
25+
engine = pydeequ.connect(con)
2526
26-
For PyDeequ 2.0 (Spark Connect), use:
27-
from pydeequ.v2 import VerificationSuite, Check, CheckLevel
28-
from pydeequ.v2.predicates import eq, gte
27+
result = (AnalysisRunner(engine)
28+
.onData(table="test")
29+
.addAnalyzer(Size())
30+
.run())
31+
32+
For PyDeequ 2.0 (Spark Connect):
33+
from pyspark.sql import SparkSession
34+
import pydeequ
35+
from pydeequ.v2.verification import VerificationSuite
36+
37+
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
38+
engine = pydeequ.connect(spark)
39+
40+
result = (VerificationSuite(engine)
41+
.onData(dataframe=df)
42+
.addCheck(check)
43+
.run())
2944
3045
For PyDeequ 1.x (Legacy Py4J), set SPARK_VERSION env var and use:
3146
from pydeequ import deequ_maven_coord
@@ -49,10 +64,14 @@ def connect(
4964
- DuckDB connections (duckdb.DuckDBPyConnection) - runs locally
5065
- Spark sessions (pyspark.sql.SparkSession) - uses Spark Connect
5166
67+
The returned engine is passed to runner constructors. Use
68+
``onData(table=...)`` or ``onData(dataframe=...)`` on the runner to
69+
bind data for each run.
70+
5271
Args:
5372
connection: A database connection or Spark session
54-
table: Table name for SQL-based backends (required for DuckDB)
55-
dataframe: DataFrame for Spark backend (alternative to table)
73+
table: Optional table name (can also be specified via onData)
74+
dataframe: Optional DataFrame (can also be specified via onData)
5675
5776
Returns:
5877
An engine instance appropriate for the connection type
@@ -61,18 +80,19 @@ def connect(
6180
ValueError: If connection type is not supported
6281
6382
Example:
64-
# DuckDB (local, no Spark required)
65-
import duckdb
66-
import pydeequ
67-
68-
con = duckdb.connect()
69-
con.execute("CREATE TABLE reviews AS SELECT * FROM 'reviews.csv'")
70-
engine = pydeequ.connect(con, table="reviews")
71-
72-
# Spark Connect
73-
from pyspark.sql import SparkSession
74-
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
75-
engine = pydeequ.connect(spark, dataframe=df)
83+
# DuckDB
84+
engine = pydeequ.connect(con)
85+
result = (AnalysisRunner(engine)
86+
.onData(table="reviews")
87+
.addAnalyzer(Size())
88+
.run())
89+
90+
# Spark
91+
engine = pydeequ.connect(spark)
92+
result = (VerificationSuite(engine)
93+
.onData(dataframe=df)
94+
.addCheck(check)
95+
.run())
7696
"""
7797
from pydeequ.engines import connect as engines_connect
7898
return engines_connect(connection, table=table, dataframe=dataframe)

0 commit comments

Comments (0)