Skip to content

Commit f13e16d

Browse files
more threaded config fixes
1 parent 20044ae commit f13e16d

9 files changed

Lines changed: 99 additions & 68 deletions

File tree

packages/bigframes/bigframes/core/blocks.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -277,9 +277,7 @@ def shape(self) -> typing.Tuple[int, int]:
277277
row_count = (
278278
self.session._executor.execute(
279279
self.expr.row_count(),
280-
execution_spec.ExecutionSpec(
281-
promise_under_10gb=True, ordered=False
282-
).with_current_configuration(),
280+
execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
283281
)
284282
.batches()
285283
.to_py_scalar()
@@ -592,7 +590,7 @@ def to_arrow(
592590
execution_spec.ExecutionSpec(
593591
promise_under_10gb=under_10gb,
594592
ordered=ordered,
595-
).with_current_configuration(),
593+
),
596594
)
597595
pa_table = execute_result.batches().to_arrow_table()
598596

@@ -686,9 +684,7 @@ def try_peek(
686684
)
687685
result = self.session._executor.execute(
688686
self.expr,
689-
execution_spec.ExecutionSpec(
690-
promise_under_10gb=under_10gb, peek=n
691-
).with_current_configuration(),
687+
execution_spec.ExecutionSpec(promise_under_10gb=under_10gb, peek=n),
692688
)
693689
df = result.batches().to_pandas()
694690
return self._copy_index_to_pandas(df)
@@ -717,7 +713,7 @@ def to_pandas_batches(
717713
execution_spec.ExecutionSpec(
718714
promise_under_10gb=under_10gb,
719715
ordered=True,
720-
).with_current_configuration(),
716+
),
721717
)
722718
result_batches = execution_result.batches()
723719

@@ -781,7 +777,7 @@ def _materialize_local(
781777
execution_spec.ExecutionSpec(
782778
promise_under_10gb=under_10gb,
783779
ordered=materialize_options.ordered,
784-
).with_current_configuration(),
780+
),
785781
)
786782
result_batches = execute_result.batches()
787783

@@ -1625,15 +1621,15 @@ def retrieve_repr_request_results(
16251621
execution_spec.ExecutionSpec(
16261622
promise_under_10gb=True,
16271623
ordered=True,
1628-
).with_current_configuration(),
1624+
),
16291625
)
16301626
row_count = (
16311627
self.session._executor.execute(
16321628
self.expr.row_count(),
16331629
execution_spec.ExecutionSpec(
16341630
promise_under_10gb=True,
16351631
ordered=False,
1636-
).with_current_configuration(),
1632+
),
16371633
)
16381634
.batches()
16391635
.to_py_scalar()

packages/bigframes/bigframes/core/indexes/base.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -294,9 +294,7 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]:
294294
count_scalar = (
295295
self._block.session._executor.execute(
296296
count_result,
297-
ex_spec.ExecutionSpec(
298-
promise_under_10gb=True
299-
).with_current_configuration(),
297+
ex_spec.ExecutionSpec(promise_under_10gb=True),
300298
)
301299
.batches()
302300
.to_py_scalar()
@@ -312,9 +310,7 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]:
312310
position_scalar = (
313311
self._block.session._executor.execute(
314312
position_result,
315-
ex_spec.ExecutionSpec(
316-
promise_under_10gb=True
317-
).with_current_configuration(),
313+
ex_spec.ExecutionSpec(promise_under_10gb=True),
318314
)
319315
.batches()
320316
.to_py_scalar()
@@ -353,9 +349,7 @@ def _get_monotonic_slice(
353349
result_df = (
354350
self._block.session._executor.execute(
355351
combined_result,
356-
execution_spec=ex_spec.ExecutionSpec(
357-
promise_under_10gb=True
358-
).with_current_configuration(),
352+
execution_spec=ex_spec.ExecutionSpec(promise_under_10gb=True),
359353
)
360354
.batches()
361355
.to_pandas()

packages/bigframes/bigframes/dataframe.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4202,7 +4202,7 @@ def to_csv(
42024202
ex_spec.GcsOutputSpec(
42034203
uri=path_or_buf, format="csv", export_options=tuple(options.items())
42044204
)
4205-
).with_current_configuration(),
4205+
),
42064206
)
42074207
self._set_internal_query_job(result.query_job)
42084208
return None
@@ -4251,7 +4251,7 @@ def to_json(
42514251
export_array.rename_columns(id_overrides),
42524252
ex_spec.ExecutionSpec(
42534253
ex_spec.GcsOutputSpec(uri=path_or_buf, format="json", export_options=())
4254-
).with_current_configuration(),
4254+
),
42554255
)
42564256
self._set_internal_query_job(result.query_job)
42574257
return None
@@ -4334,7 +4334,7 @@ def to_gbq(
43344334
cluster_cols=tuple(clustering_fields),
43354335
if_exists=if_exists,
43364336
)
4337-
).with_current_configuration(),
4337+
),
43384338
)
43394339
assert result.query_job is not None
43404340
self._set_internal_query_job(result.query_job)
@@ -4414,7 +4414,7 @@ def to_parquet(
44144414
format="parquet",
44154415
export_options=tuple(export_options.items()),
44164416
)
4417-
).with_current_configuration(),
4417+
),
44184418
)
44194419
self._set_internal_query_job(result.query_job)
44204420
return None

packages/bigframes/bigframes/formatting_helpers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import google.cloud.bigquery as bigquery
2727
import humanize
2828

29+
import bigframes._config
30+
2931
if TYPE_CHECKING:
3032
import bigframes.core.events
3133

packages/bigframes/bigframes/session/bq_caching_executor.py

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import bigframes.session.metrics
4242
import bigframes.session.planner
4343
import bigframes.session.temporary_storage
44+
from bigframes._config import ComputeOptions
4445
from bigframes.core import bq_data, compile, guid, identifiers, local_data, rewrite
4546
from bigframes.core.compile.sqlglot import sql as sg_sql
4647
from bigframes.core.compile.sqlglot import sqlglot_ir
@@ -183,6 +184,8 @@ def execute(
183184
array_value: bigframes.core.ArrayValue,
184185
execution_spec: ex_spec.ExecutionSpec,
185186
) -> executor.ExecuteResult:
187+
# Need to grab thread local before starting async execution.
188+
execution_spec = execution_spec.with_compute_options(bigframes.options.compute)
186189
return _run_sync(
187190
self._execute_async(
188191
array_value,
@@ -319,7 +322,9 @@ async def _execute_gbq_query_only(
319322
array_value: bigframes.core.ArrayValue,
320323
execution_spec: ex_spec.ExecutionSpec,
321324
) -> executor.ExecuteResult:
322-
gbq_plan = await self._prepare_plan_bq_execution(array_value.node)
325+
gbq_plan = await self._prepare_plan_bq_execution(
326+
array_value.node, execution_spec.bigquery_config
327+
)
323328
result = await self._gbq_executor.execute(gbq_plan, execution_spec)
324329
if result is None:
325330
raise ValueError(
@@ -395,10 +400,13 @@ def dry_run(
395400
def cached(
396401
self, array_value: bigframes.core.ArrayValue, *, config: executor.CacheConfig
397402
) -> None:
403+
# Get compute options before passing to async method, can be thread-local
404+
bq_compute_options = ex_spec.BqComputeOptions.from_compute_options(
405+
bigframes.options.compute
406+
)
398407
return _run_sync(
399408
self._cached_async(
400-
array_value,
401-
config=config,
409+
array_value, config=config, compute_options=bq_compute_options
402410
)
403411
)
404412

@@ -407,6 +415,7 @@ async def _cached_async(
407415
array_value: bigframes.core.ArrayValue,
408416
*,
409417
config: executor.CacheConfig,
418+
compute_options: ex_spec.BqComputeOptions,
410419
) -> None:
411420
"""Write the block to a session table."""
412421
# First, see if we can reuse the existing cache
@@ -429,25 +438,23 @@ async def _cached_async(
429438

430439
if config.optimize_for == "auto":
431440
await self._cache_with_session_awareness(
432-
array_value, enable_multi_query_execution=enable_multi_query_execution
441+
array_value, compute_options=compute_options
433442
)
434443
elif config.optimize_for == "head":
435-
await self._cache_with_offsets(
436-
array_value, enable_multi_query_execution=enable_multi_query_execution
437-
)
444+
await self._cache_with_offsets(array_value, compute_options=compute_options)
438445
else:
439446
assert isinstance(config.optimize_for, executor.HierarchicalKey)
440447
await self._cache_with_cluster_cols(
441448
array_value,
442449
cluster_cols=config.optimize_for.columns,
443-
enable_multi_query_execution=enable_multi_query_execution,
450+
compute_options=compute_options,
444451
)
445452

446453
async def _execute_to_cached_table(
447454
self,
448455
plan: nodes.BigFrameNode,
449456
cache_spec: ex_spec.CacheSpec,
450-
enable_multi_query_execution: bool,
457+
compute_options: ex_spec.BqComputeOptions,
451458
) -> executor.ExecuteResult:
452459
# "ephemeral" temp tables created in the course of exeuction, don't need to be allocated
453460
# materialized ordering only really makes sense for internal temp tables used by caching
@@ -474,7 +481,7 @@ async def _execute_to_cached_table(
474481
cluster_cols=cluster_cols,
475482
if_exists="replace",
476483
),
477-
enable_multi_query_execution=enable_multi_query_execution,
484+
bigquery_config=compute_options,
478485
)
479486
# We don't use _execute_gbq_table_export, as this result is internal, not exported.
480487
result = await self._execute_gbq_query_only(
@@ -506,13 +513,13 @@ def _prepare_plan_simplify(self, plan: nodes.BigFrameNode) -> nodes.BigFrameNode
506513
return plan
507514

508515
async def _prepare_plan_bq_execution(
509-
self, plan: nodes.BigFrameNode, enable_multi_query_execution: bool
516+
self,
517+
plan: nodes.BigFrameNode,
518+
compute_options: ex_spec.BqComputeOptions,
510519
) -> nodes.BigFrameNode:
511520
"""Prepare the plan for BigQuery execution by caching subtrees and uploading large local sources."""
512-
if enable_multi_query_execution:
513-
await self._simplify_with_caching(
514-
plan, enable_multi_query_execution=enable_multi_query_execution
515-
)
521+
if compute_options.enable_multi_query_execution:
522+
await self._simplify_with_caching(plan, compute_options=compute_options)
516523
plan = self._prepare_plan_simplify(plan)
517524
plan = await self._substitute_large_local_sources(plan)
518525
return plan
@@ -521,7 +528,7 @@ async def _cache_with_cluster_cols(
521528
self,
522529
array_value: bigframes.core.ArrayValue,
523530
cluster_cols: Sequence[str],
524-
enable_multi_query_execution: bool,
531+
compute_options: ex_spec.BqComputeOptions,
525532
):
526533
"""Executes the query and uses the resulting table to rewrite future executions."""
527534
cluster_cols = [
@@ -533,7 +540,7 @@ async def _cache_with_cluster_cols(
533540
result = await self._execute_to_cached_table(
534541
array_value.node,
535542
ex_spec.CacheSpec(cluster_cols=tuple(cluster_cols), ordering="order_key"),
536-
enable_multi_query_execution=enable_multi_query_execution,
543+
compute_options=compute_options,
537544
)
538545
assert isinstance(result, executor.BQTableExecuteResult)
539546
assert result._data.ordering is not None
@@ -542,13 +549,13 @@ async def _cache_with_cluster_cols(
542549
async def _cache_with_offsets(
543550
self,
544551
array_value: bigframes.core.ArrayValue,
545-
enable_multi_query_execution: bool,
552+
compute_options: ex_spec.BqComputeOptions,
546553
):
547554
"""Executes the query and uses the resulting table to rewrite future executions."""
548555
result = await self._execute_to_cached_table(
549556
array_value.node,
550557
ex_spec.CacheSpec(ordering="offsets_col"),
551-
enable_multi_query_execution=enable_multi_query_execution,
558+
compute_options=compute_options,
552559
)
553560
assert isinstance(result, executor.BQTableExecuteResult)
554561
assert result._data.ordering is not None
@@ -557,7 +564,7 @@ async def _cache_with_offsets(
557564
async def _cache_with_session_awareness(
558565
self,
559566
array_value: bigframes.core.ArrayValue,
560-
enable_multi_query_execution: bool,
567+
compute_options: ex_spec.BqComputeOptions,
561568
) -> None:
562569
session_forest = [obj._block._expr.node for obj in array_value.session.objects]
563570
# These node types are cheap to re-compute
@@ -569,22 +576,22 @@ async def _cache_with_session_awareness(
569576
await self._cache_with_cluster_cols(
570577
bigframes.core.ArrayValue(target),
571578
cluster_cols_sql_names,
572-
enable_multi_query_execution=enable_multi_query_execution,
579+
compute_options=compute_options,
573580
)
574581
elif not target.order_ambiguous:
575582
await self._cache_with_offsets(
576583
bigframes.core.ArrayValue(target),
577-
enable_multi_query_execution=enable_multi_query_execution,
584+
compute_options=compute_options,
578585
)
579586
else:
580587
await self._cache_with_cluster_cols(
581588
bigframes.core.ArrayValue(target),
582589
[],
583-
enable_multi_query_execution=enable_multi_query_execution,
590+
compute_options=compute_options,
584591
)
585592

586593
async def _simplify_with_caching(
587-
self, plan: nodes.BigFrameNode, enable_multi_query_execution: bool
594+
self, plan: nodes.BigFrameNode, compute_options: ex_spec.BqComputeOptions
588595
):
589596
"""Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces."""
590597
# Apply existing caching first
@@ -596,13 +603,13 @@ async def _simplify_with_caching(
596603
return
597604

598605
did_cache = await self._cache_most_complex_subtree(
599-
plan, enable_multi_query_execution=enable_multi_query_execution
606+
plan, compute_options=compute_options
600607
)
601608
if not did_cache:
602609
return
603610

604611
async def _cache_most_complex_subtree(
605-
self, node: nodes.BigFrameNode, enable_multi_query_execution: bool
612+
self, node: nodes.BigFrameNode, compute_options: ex_spec.BqComputeOptions
606613
) -> bool:
607614
# TODO: If query fails, retry with lower complexity limit
608615
selection = tree_properties.select_cache_target(
@@ -622,7 +629,7 @@ async def _cache_most_complex_subtree(
622629
await self._cache_with_cluster_cols(
623630
bigframes.core.ArrayValue(selection),
624631
[],
625-
enable_multi_query_execution=enable_multi_query_execution,
632+
compute_options=compute_options,
626633
)
627634
return True
628635

packages/bigframes/bigframes/session/direct_gbq_execution.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,13 @@ async def execute(
9595
job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
9696
if self._labels:
9797
job_config.labels.update(self._labels)
98-
if spec.labels:
99-
job_config.labels.update(spec.labels)
100-
if spec.maximum_bytes_billed is not None:
101-
job_config.maximum_bytes_billed = spec.maximum_bytes_billed
98+
if spec.bigquery_config is not None:
99+
if spec.bigquery_config.extra_query_labels:
100+
job_config.labels.update(spec.bigquery_config.extra_query_labels)
101+
if spec.bigquery_config.maximum_bytes_billed is not None:
102+
job_config.maximum_bytes_billed = (
103+
spec.bigquery_config.maximum_bytes_billed
104+
)
102105

103106
iterator, query_job = await asyncio.to_thread(
104107
self._run_execute_query,

0 commit comments

Comments
 (0)