[SPARK-37711][PS] Reduce pandas describe job count from O(N) to O(1) #54370
devin-petersohn wants to merge 7 commits into apache:master from
Conversation
I generated some benchmarks for the new implementation and compared against the old implementation. The performance numbers are shown below.

- **Row counts:** 1,000 and 10,000
- **Column counts:** 2, 5, 10, 20, 40, 100
- **Data distribution:** Random uniform distribution over 10 distinct values per column
- **Total tests:** 11 configurations

| Rows | Columns | Old Time | New Time | Speedup | Time Saved | Improvement | Jobs (Old→New) | Jobs Saved |
|---------|---------|----------|----------|----------|------------|-------------|----------------|------------|
| 1,000 | **1** | **0.125s** | **0.188s** | **0.66x** | **-0.063s** | **-50.6%** | **2 → 3** | **-1** |
| 1,000 | 2 | 0.226s | 0.233s | 0.97x | -0.007s | -2.9% | 4 → 3 | 1 |
| 1,000 | 5 | 0.501s | 0.225s | 2.23x | 0.276s | 55.1% | 10 → 3 | 7 |
| 1,000 | 10 | 0.861s | 0.351s | 2.46x | 0.511s | 59.3% | 20 → 3 | 17 |
| 1,000 | 20 | 1.539s | 0.418s | 3.68x | 1.120s | 72.8% | 40 → 3 | 37 |
| 1,000 | 40 | 3.176s | 0.514s | 6.18x | 2.662s | 83.8% | 80 → 3 | 77 |
| 1,000 | 100 | 7.483s | 0.586s | 12.77x | 6.897s | 92.2% | 200 → 3 | 197 |
| 10,000 | **1** | **0.073s** | **0.111s** | **0.66x** | **-0.038s** | **-51.9%** | **2 → 3** | **-1** |
| 10,000 | 5 | 0.362s | 0.194s | 1.87x | 0.168s | 46.5% | 10 → 3 | 7 |
| 10,000 | 10 | 1.446s | 0.257s | 5.61x | 1.188s | 82.2% | 20 → 3 | 17 |
| 10,000 | 20 | 1.424s | 0.382s | 3.72x | 1.041s | 73.1% | 40 → 3 | 37 |
| 10,000 | 40 | 3.171s | 0.521s | 6.09x | 2.650s | 83.6% | 80 → 3 | 77 |
| 10,000 | 100 | 10.953s | 1.163s | 9.41x | 9.789s | 89.4% | 200 → 3 | 197 |

**Aggregate Statistics:**

- Average speedup: 4.33x
- Average improvement: 48.7%
- Average jobs saved: 54.2 per operation
- Maximum speedup: 12.77x (100 columns)
- **Regression case: 0.66x for N=1** (new approach is 50% slower)

Signed-off-by: Devin Petersohn <devin.petersohn@gmail.com>
Co-authored-by: Devin Petersohn <devin.petersohn@snowflake.com>
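For reproducibility, here is a minimal sketch of the kind of harness behind numbers like these (the exact benchmark script isn't part of this PR; the column names, sizes, and timing loop below are illustrative assumptions):

```python
import time

import numpy as np
import pandas as pd
import pyspark.pandas as ps


def time_describe(n_rows: int, n_cols: int) -> float:
    # Random uniform distribution over 10 distinct string values per column.
    pdf = pd.DataFrame(
        {f"c{i}": np.random.randint(0, 10, n_rows).astype(str) for i in range(n_cols)}
    )
    psdf = ps.from_pandas(pdf)
    start = time.perf_counter()
    psdf.describe()  # triggers the Spark jobs being counted above
    return time.perf_counter() - start


for n_cols in (1, 2, 5, 10, 20, 40, 100):
    print(f"{n_cols} columns: {time_describe(1_000, n_cols):.3f}s")
```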
holdenk left a comment
I like this :) Quick first look, haven't had a chance for a proper review yet but love to see these old TODOs getting fixed :)
Reason for dropping the comment?
I can add the comments back. I removed them because I felt the comments didn't add extra context over the code (code is straightforward IMO).
# Unfortunately, there's no straightforward way to get the top value and its frequency
# for each column without collecting the data to the driver side.
Note for the future: This seems like a good follow up issue, I think we could do something smarter here long term. I've been thinking about some kind of bounded collection types for aggregations and this might fit. (although tbf describe isn't used all that often, but would love to put these together if we can). They do still end up being large but on the executors and the final driver part is a bit smaller.
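To make the "bounded collection type" idea concrete, a purely illustrative sketch (not part of this PR) of a bounded, mergeable per-column counter in the Misra-Gries / space-saving family; the `BoundedCounter` name and the shape of the API are assumptions:

```python
from collections import Counter


class BoundedCounter:
    """Keeps at most `capacity` candidate values per column, so executor-side
    state stays small even for high-cardinality columns. Counts are approximate,
    but the true top value survives if its frequency exceeds n / capacity."""

    def __init__(self, capacity: int = 128):
        self.capacity = capacity
        self.counts = Counter()

    def add(self, value: str) -> None:
        if value in self.counts or len(self.counts) < self.capacity:
            self.counts[value] += 1
        else:
            # Over capacity: decrement all candidates; zeros are dropped.
            self.counts -= Counter(dict.fromkeys(self.counts, 1))

    def merge(self, other: "BoundedCounter") -> "BoundedCounter":
        merged = BoundedCounter(self.capacity)
        merged.counts = self.counts + other.counts
        # Trim back down to capacity by repeatedly decrementing, as in `add`.
        while len(merged.counts) > merged.capacity:
            merged.counts -= Counter(dict.fromkeys(merged.counts, 1))
        return merged

    def top(self):
        # (value, approximate frequency) of the current best candidate.
        return self.counts.most_common(1)[0] if self.counts else (None, 0)
```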
cc @gaogaotiantian FYI
The improvement for multiple columns is great. A few questions:
Signed-off-by: Devin Petersohn <devin.petersohn@gmail.com>
So eventually we'll have to collect the summaries back to the driver, but the summaries all have to fit there anyway, even when we're executing in a loop, since we store them in Python and display them. If someone had a silly number of columns this could maybe be an issue, but the old approach wouldn't work well either. There might be a bit of extra data during the final merge steps when we're merging the aggregate objects, but if that ever became an issue we could look at treeReduce (and again, this would likely only happen in a degenerate case where the current implementation would also not behave well).
Thanks for working on this long-standing TODO!
We have some similar cases of per-column computation (1, 2), and I feel the expensive window operation is unnecessary; the implementation might be simplified like this (I haven't checked it, but I think the solution looks like this):
# Get `top` & `freq` for each column
rows = (
    sdf.select(
        F.posexplode(F.array(*[scol.cast("string") for scol in exprs_string])).alias(
            "idx", "str_value"
        )
    )
    .groupby("idx", "str_value")
    # Negate the count so the most frequent value sorts first under `min`.
    .agg(F.negative(F.count(F.lit(1))).alias("neg_count"))
    .groupby("idx")
    .agg(F.min(F.struct("neg_count", "str_value")).alias("s"))
    .sort("idx")
    .collect()
)
tops = [r.s.str_value for r in rows]
freqs = [-r.s.neg_count for r in rows]
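(Why this works: `F.min` over `struct(neg_count, str_value)` compares the struct fields left to right, so it picks the smallest `neg_count`, i.e. the largest count, for each column index, with ties broken by the lexicographically smallest string value; negating the count is what turns "most frequent" into a single `min` aggregation.)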
freqs.append(str(freq))
n_cols = len(column_names)
stack_args = ", ".join([f"'{col_name}', `{col_name}`" for col_name in column_names])
The existing implementation is using `exprs_string: List[Column]`; is it equivalent to `column_names` here?
Good catch. They are equivalent, but I agree it was confusing. Switched to using posexplode on exprs_string directly, so the unpivot no longer depends on column_names, similar to the other implementations.
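(For comparison, the stack-based unpivot from the earlier revision of this change looked roughly like the sketch below; `sdf` and `column_names` are the names from the diff above, and the exact expression in that revision may differ slightly.)

```python
# Build one `stack` expression over all string columns, producing
# (col_name, str_value) rows, then aggregate per col_name for top/freq.
n_cols = len(column_names)
stack_args = ", ".join([f"'{col_name}', `{col_name}`" for col_name in column_names])
unpivoted = sdf.selectExpr(f"stack({n_cols}, {stack_args}) AS (col_name, str_value)")
```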
- Switch from stack/window to posexplode + struct min for top/freq
- Restore section comments

Co-authored-by: Devin Petersohn <devin.petersohn@snowflake.com>
Signed-off-by: Devin Petersohn <devin.petersohn@gmail.com>
@gaogaotiantian I personally don't think it's worth the additional code branch, but if others have strong opinions I'm open to it. In absolute terms it is in the tens to hundreds of milliseconds of overhead, and that is just the job scheduling overhead, which won't change as data scales.
I don't have a super strong opinion about it, but 10~100 ms is probably already noticeable to a human. I think the question here is how often users will call this function with just a single column. If it's a super rare case, we don't need to care about it. If it's quite common, a fast path might be worthwhile.
@devin-petersohn I think we should optimize the performance of single-column cases with a fast path, because of the code path in spark/python/pyspark/pandas/series.py, lines 4344 to 4346 (at 9e6b257).
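A minimal sketch of what such a fast path could look like (the branch, names, and aggregation below are illustrative assumptions, not code from this PR):

```python
# Hypothetical single-column branch: one groupBy/orderBy is already a constant
# number of jobs, so skip the posexplode/unpivot machinery entirely.
if len(exprs_string) == 1:
    scol = exprs_string[0].cast("string")
    top_row = (
        sdf.select(scol.alias("str_value"))
        .groupby("str_value")
        .agg(F.count(F.lit(1)).alias("cnt"))
        .orderBy(F.desc("cnt"), F.asc("str_value"))
        .limit(1)
        .collect()
    )
    tops = [top_row[0].str_value] if top_row else [None]
    freqs = [top_row[0].cnt] if top_row else [0]
```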
Co-authored-by: Devin Petersohn <devin.petersohn@snowflake.com> Signed-off-by: Devin Petersohn <devin.petersohn@gmail.com>
thanks all, merged to master
What changes were proposed in this pull request?
Fixes `describe` for string-only dataframes to have a fixed number of jobs rather than one job per column.
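For example (illustrative snippet, not from the patch), the affected path is `describe` on an all-string frame:

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": ["x", "y", "x"], "b": ["p", "p", "q"]})
# count / unique / top / freq per string column; previously this launched
# roughly one Spark job per column, now a constant number of jobs.
print(psdf.describe())
```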
Why are the changes needed?
Performance
Does this PR introduce any user-facing change?
No
How was this patch tested?
CI
Was this patch authored or co-authored using generative AI tooling?
Co-authored-by: Claude Sonnet 4.5