-
Notifications
You must be signed in to change notification settings - Fork 148
SNOW-3484790: initialize aggregation functions list during SCOS init #4217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -856,8 +856,10 @@ def __init__( | |
| self._dataframe_profiler = DataframeProfiler(session=self) | ||
| self._catalog = None | ||
| self._client_telemetry = EventTableTelemetry(session=self) | ||
| self._agg_function_prefetch_job: Optional[AsyncJob] = None | ||
|
|
||
| self._ast_batch = AstBatch(self) | ||
| self._start_async_aggregation_prefetch_if_needed() | ||
|
|
||
| _logger.info("Snowpark Session information: %s", self._session_info) | ||
|
|
||
|
|
@@ -5055,43 +5057,89 @@ def _retrieve_aggregation_function_list(self) -> None: | |
| return | ||
|
|
||
| retrieved_set = set() | ||
| system_fetch_succeeded = False | ||
|
|
||
| # User-defined aggregation functions | ||
| try: | ||
| retrieved_set.update( | ||
| { | ||
| r[0].lower() | ||
| for r in self.sql( | ||
| """select function_name from information_schema.functions where is_aggregate = 'YES'""" | ||
| ).collect() | ||
| } | ||
| ) | ||
| except Exception as e: | ||
| # Try async result first if prefetch was already started. | ||
| if self._agg_function_prefetch_job is not None: | ||
| try: | ||
| retrieved_set.update( | ||
| {r[0].lower() for r in self._agg_function_prefetch_job.result()} | ||
| ) | ||
| system_fetch_succeeded = True | ||
| except Exception as e: | ||
| _logger.debug( | ||
| "Unable to use async aggregation function prefetch: %s", | ||
| e, | ||
| ) | ||
| finally: | ||
| self._agg_function_prefetch_job = None | ||
| else: | ||
| _logger.debug( | ||
| "Unable to get user-defined aggregation functions: %s", | ||
| e, | ||
| "Async aggregation function prefetch job is unavailable; using sync fallback." | ||
| ) | ||
|
|
||
| # System built-in aggregation functions | ||
| # Sync fallback query. | ||
| if not system_fetch_succeeded: | ||
| try: | ||
| retrieved_set.update( | ||
| { | ||
| r[0].lower() | ||
| for r in self._conn.run_query( | ||
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""", | ||
| _is_internal=True, | ||
| )["data"] | ||
| } | ||
| ) | ||
| system_fetch_succeeded = True | ||
| except Exception as e: | ||
| _logger.debug( | ||
| "Unable to get aggregation functions via sync fallback query: %s", | ||
| e, | ||
| ) | ||
|
|
||
| # Fallback to the local hardcoded list only when metadata retrieval fails. | ||
| if not system_fetch_succeeded: | ||
| retrieved_set.update(context._KNOWN_AGGREGATION_FUNCTIONS) | ||
|
|
||
| with context._aggregation_function_set_lock: | ||
| context._aggregation_function_set.update(retrieved_set) | ||
|
|
||
| def _start_async_aggregation_prefetch_if_needed(self) -> None: | ||
| """Start aggregation metadata prefetch only when not already in progress.""" | ||
| if not ( | ||
| context._is_snowpark_connect_compatible_mode | ||
| and context._snowpark_connect_flatten_select_after_sort | ||
| ): | ||
| return | ||
| if context._aggregation_function_set: | ||
| return | ||
| if self._agg_function_prefetch_job is not None: | ||
| return | ||
|
|
||
| try: | ||
| retrieved_set.update( | ||
| { | ||
| r[0].lower() | ||
| for r in self.sql( | ||
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y'""" | ||
|
Comment on lines
-5080
to
-5081
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding is that queries like this ensure the server is always the source of truth, and we don't have to make client code changes that are aligned with whatever Snowflake version is being run on the server. Hard-coding it to a local change is something of a philosophical change, and may unexpectedly break workloads in some circumstances. It looks to only ever get run once per workload anyway, so I think the benefits of this would be pretty minimal.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had a discussion with Adam and Yijun this morning. One of the issue is that this is always run when there are potentially agg function in the query, which means user would still see this even if they just issued 1 query in SCOS. To solve the problem that this list may diverge from server, I created a followup ticket: https://snowflakecomputing.atlassian.net/browse/SNOW-3489271 Currently my thoughts is adding a step to release pipeline to ensure that each snowpark release's system function list is up-to-date at the point it released. Thus user would only need to use latest snowpark to avoid the mis align issue. But I think it also make sense to add parameter protection for this. Will add one
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Parameter-protecting makes sense. I'm slightly worried about server divergence because the same client release might get run against different version backends (I'm not sure if we the N-1 version policy still exists, but at the very least testing vs. prod environments, and customers on different release cadences might get different versions) |
||
| ).collect() | ||
| } | ||
| self._agg_function_prefetch_job = self._submit_internal_async_prefetch_query( | ||
| """show functions ->> select "name" from $1 where "is_aggregate" = 'Y' | ||
| union | ||
| select function_name from information_schema.functions where is_aggregate = 'YES'""" | ||
| ) | ||
| except Exception as e: | ||
| except Exception as e: # pragma: no cover | ||
| _logger.debug( | ||
| "Unable to get system aggregation functions, " | ||
| "falling back to hardcoded list: %s", | ||
| "Unable to start async aggregation metadata prefetch: %s", | ||
| e, | ||
| ) | ||
| retrieved_set.update(context._KNOWN_AGGREGATION_FUNCTIONS) | ||
| self._agg_function_prefetch_job = None | ||
|
|
||
| with context._aggregation_function_set_lock: | ||
| context._aggregation_function_set.update(retrieved_set) | ||
| def _submit_internal_async_prefetch_query(self, query: str) -> Optional[AsyncJob]: | ||
| """Submit a prefetch query as internal async and return an AsyncJob handle.""" | ||
| try: | ||
| result = self._conn.execute_async_and_notify_query_listener( | ||
| query, | ||
| _is_internal=True, | ||
| ) | ||
| return self.create_async_job(result["queryId"]) | ||
| except Exception as e: # pragma: no cover | ||
| _logger.debug("Unable to submit internal async prefetch query: %s", e) | ||
| return None | ||
|
|
||
| def directory(self, stage_name: str, _emit_ast: bool = True) -> DataFrame: | ||
| """ | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical Bug: User-defined aggregation functions are missing in sync fallback
The sync fallback query only retrieves system aggregation functions via
show functions, but the async prefetch query (lines 5121-5123) uses a UNION to retrieve both system AND user-defined aggregation functions frominformation_schema.functions. This inconsistency means:This will cause production issues where user-defined aggregation functions work inconsistently depending on whether the async query succeeded or failed.
Fix:
Spotted by Graphite

Is this helpful? React 👍 or 👎 to let us know.