Skip to content

Commit f166cd9

Browse files
fixes
1 parent 0104b9b commit f166cd9

2 files changed

Lines changed: 16 additions & 9 deletions

File tree

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 13 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -191,7 +191,9 @@ async def _execute_async(
191191
execution_spec: ex_spec.ExecutionSpec,
192192
) -> executor.ExecuteResult:
193193
self._publisher.publish(bigframes.core.events.ExecutionStarted())
194-
maybe_result = await self._try_execute_semi_executors(array_value, execution_spec)
194+
maybe_result = await self._try_execute_semi_executors(
195+
array_value, execution_spec
196+
)
195197
if maybe_result is not None:
196198
return maybe_result
197199
result = await self._execute_bigquery(array_value, execution_spec)
@@ -240,7 +242,7 @@ async def _execute_bigquery(
240242
if not execution_spec.promise_under_10gb:
241243
table = await asyncio.to_thread(
242244
self.storage_manager.create_temp_table,
243-
array_value.schema.to_bigquery()
245+
array_value.schema.to_bigquery(),
244246
)
245247
execution_spec = dataclasses.replace(
246248
execution_spec,
@@ -419,7 +421,8 @@ async def _execute_to_cached_table(
419421
plan, ordering = rewrite.pull_out_order(plan)
420422
destination_table = await asyncio.to_thread(
421423
self.storage_manager.create_temp_table,
422-
plan.schema.to_bigquery(), cluster_cols
424+
plan.schema.to_bigquery(),
425+
cluster_cols,
423426
)
424427
arr_value = bigframes.core.ArrayValue(plan)
425428
execution_spec = ex_spec.ExecutionSpec(
@@ -445,17 +448,20 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue):
445448
"""
446449
# Once rewriting is available, will want to rewrite before
447450
# evaluating execution cost.
448-
return tree_properties.is_trivially_executable(
449-
self.prepare_plan(array_value.node)
450-
)
451+
simplified_plan = self._prepare_plan_simplify(array_value.node)
452+
return tree_properties.is_trivially_executable(simplified_plan)
451453

452454
def _prepare_plan_simplify(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode:
455+
"""Prepare the plan by simplifying it with caches and removing unused operators."""
453456
plan = self.cache.subsitute_cached_subplans(plan)
454457
plan = rewrite.column_pruning(plan)
455458
plan = plan.top_down(rewrite.fold_row_counts)
456459
return plan
457460

458-
async def _prepare_plan_bq_execution(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode:
461+
async def _prepare_plan_bq_execution(
462+
self, plan: nodes.BigFrameNode
463+
) -> nodes.BigFrameNode:
464+
"""Prepare the plan for BigQuery execution by caching subtrees and uploading large local sources."""
459465
if bigframes.options.compute.enable_multi_query_execution:
460466
await self._simplify_with_caching(plan)
461467
plan = self._prepare_plan_simplify(plan)

packages/bigframes/tests/unit/session/test_local_scan_executor.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414
from __future__ import annotations
1515

16+
import asyncio
1617
import pyarrow
1718
import pytest
1819

@@ -76,7 +77,7 @@ def test_local_scan_executor_with_slice(start, stop, expected_rows, object_under
7677
stop=stop,
7778
)
7879

79-
result = object_under_test.execute(plan, SPEC)
80+
result = asyncio.run(object_under_test.execute(plan, SPEC))
8081
result_table = pyarrow.Table.from_batches(result.batches().arrow_batches)
8182
assert result_table.num_rows == expected_rows
8283

@@ -102,4 +103,4 @@ def test_local_scan_executor_with_slice_unsupported_inputs(
102103
stop=stop,
103104
step=step,
104105
)
105-
assert object_under_test.execute(plan, SPEC) is None
106+
assert asyncio.run(object_under_test.execute(plan, SPEC)) is None

0 commit comments

Comments
 (0)