Skip to content
This repository was archived by the owner on May 7, 2026. It is now read-only.

Commit 8a7ca8c

Browse files
feat: Add concurrency options to udf
1 parent a2f2b65 commit 8a7ca8c

5 files changed

Lines changed: 186 additions & 0 deletions

File tree

bigframes/functions/_function_client.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,8 @@ def create_cloud_function(
404404
vpc_connector_egress_settings="private-ranges-only",
405405
memory_mib=1024,
406406
ingress_settings="internal-only",
407+
workers=None,
408+
concurrency=None,
407409
):
408410
"""Create a cloud function from the given user defined function."""
409411

@@ -517,6 +519,12 @@ def create_cloud_function(
517519
function.service_config.service_account_email = (
518520
self._cloud_function_service_account
519521
)
522+
if concurrency:
523+
function.service_config.max_instance_request_concurrency = concurrency
524+
if workers:
525+
function.service_config.environment_variables = {
526+
"WORKERS": str(workers)
527+
}
520528
if ingress_settings not in _INGRESS_SETTINGS_MAP:
521529
raise bf_formatting.create_exception_with_feedback_link(
522530
ValueError,
@@ -583,6 +591,8 @@ def provision_bq_remote_function(
583591
cloud_function_memory_mib,
584592
cloud_function_ingress_settings,
585593
bq_metadata,
594+
workers,
595+
concurrency,
586596
):
587597
"""Provision a BigQuery remote function."""
588598
# Augment user package requirements with any internal package
@@ -631,6 +641,8 @@ def provision_bq_remote_function(
631641
vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings,
632642
memory_mib=cloud_function_memory_mib,
633643
ingress_settings=cloud_function_ingress_settings,
644+
workers=workers,
645+
concurrency=concurrency,
634646
)
635647
else:
636648
logger.info(f"Cloud function {cloud_function_name} already exists.")

bigframes/functions/_function_session.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ def remote_function(
253253
"all", "internal-only", "internal-and-gclb"
254254
] = "internal-only",
255255
cloud_build_service_account: Optional[str] = None,
256+
workers: Optional[int] = 0,
257+
concurrency: Optional[int] = 0,
256258
):
257259
"""Decorator to turn a user defined function into a BigQuery remote function.
258260
@@ -640,6 +642,8 @@ def wrapper(func):
640642
cloud_function_memory_mib=cloud_function_memory_mib,
641643
cloud_function_ingress_settings=cloud_function_ingress_settings,
642644
bq_metadata=bqrf_metadata,
645+
workers=workers,
646+
concurrency=concurrency,
643647
)
644648

645649
bigframes_cloud_function = (

bigframes/pandas/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ def remote_function(
9393
"all", "internal-only", "internal-and-gclb"
9494
] = "internal-only",
9595
cloud_build_service_account: Optional[str] = None,
96+
workers: Optional[int] = 0,
97+
concurrency: Optional[int] = 0,
9698
):
9799
return global_session.with_default_session(
98100
bigframes.session.Session.remote_function,
@@ -114,6 +116,8 @@ def remote_function(
114116
cloud_function_memory_mib=cloud_function_memory_mib,
115117
cloud_function_ingress_settings=cloud_function_ingress_settings,
116118
cloud_build_service_account=cloud_build_service_account,
119+
workers=workers,
120+
concurrency=concurrency,
117121
)
118122

119123

bigframes/session/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,6 +1542,8 @@ def remote_function(
15421542
"all", "internal-only", "internal-and-gclb"
15431543
] = "internal-only",
15441544
cloud_build_service_account: Optional[str] = None,
1545+
workers: Optional[int] = 0,
1546+
concurrency: Optional[int] = 0,
15451547
):
15461548
"""Decorator to turn a user defined function into a BigQuery remote function. Check out
15471549
the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes.
@@ -1770,6 +1772,8 @@ def remote_function(
17701772
cloud_function_memory_mib=cloud_function_memory_mib,
17711773
cloud_function_ingress_settings=cloud_function_ingress_settings,
17721774
cloud_build_service_account=cloud_build_service_account,
1775+
workers=workers,
1776+
concurrency=concurrency,
17731777
)
17741778

17751779
def deploy_udf(
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": 15,
6+
"id": "2ad860c6",
7+
"metadata": {},
8+
"outputs": [],
9+
"source": [
10+
"import bigframes.pandas as bpd\n",
11+
"\n",
12+
"bpd.options.bigquery.ordering_mode = 'partial'\n"
13+
]
14+
},
15+
{
16+
"cell_type": "code",
17+
"execution_count": 16,
18+
"id": "03a64e81",
19+
"metadata": {},
20+
"outputs": [],
21+
"source": [
22+
"df = bpd.read_gbq(\"bigquery-public-data.baseball.schedules\")[[\"homeTeamName\", \"awayTeamName\", \"duration_minutes\"]]\n"
23+
]
24+
},
25+
{
26+
"cell_type": "code",
27+
"execution_count": 17,
28+
"id": "b71fd6b4",
29+
"metadata": {},
30+
"outputs": [],
31+
"source": [
32+
"import math\n",
33+
"\n",
34+
"def moderate_cpu_work(n: int) -> int:\n",
35+
" \"\"\"\n",
36+
" Performs a fixed amount of CPU-bound math to test UDF overhead.\n",
37+
" \"\"\"\n",
38+
" if n is None:\n",
39+
" return 0\n",
40+
" \n",
41+
" # Using a fixed iteration count ensures the 'work' is consistent \n",
42+
" # regardless of the input value, which helps in benchmarking.\n",
43+
" iterations = 100_000 \n",
44+
" aggregate = float(n)\n",
45+
" \n",
46+
" for i in range(iterations):\n",
47+
" # A mix of trig and power functions to keep the CPU busy\n",
48+
" # We use i to ensure the calculation doesn't get optimized away\n",
49+
" aggregate += math.sqrt(abs(math.sin(i) * math.cos(aggregate)))\n",
50+
" \n",
51+
" # Return a valid integer (modulo-ed to keep it clean)\n",
52+
" return int(aggregate) % 1_000_000"
53+
]
54+
},
55+
{
56+
"cell_type": "code",
57+
"execution_count": null,
58+
"id": "651095ed",
59+
"metadata": {},
60+
"outputs": [],
61+
"source": [
62+
"classic_func = bpd.remote_function(reuse=False, cloud_function_service_account=\"default\", cloud_function_memory_mib=16384)(moderate_cpu_work)\n",
63+
"concurrent_func = bpd.remote_function(reuse=False, cloud_function_service_account=\"default\", cloud_function_memory_mib=16384, workers=33, concurrency=32)(moderate_cpu_work)\n"
64+
]
65+
},
66+
{
67+
"cell_type": "code",
68+
"execution_count": null,
69+
"id": "608f3659",
70+
"metadata": {},
71+
"outputs": [
72+
{
73+
"name": "stderr",
74+
"output_type": "stream",
75+
"text": [
76+
"/usr/local/google/home/tbergeron/src/bigframes/bigframes/core/logging/log_adapter.py:183: TimeTravelCacheWarning: Reading cached table from 2026-02-24 19:15:25.384225+00:00 to avoid\n",
77+
"incompatibilies with previous reads of this table. To read the latest\n",
78+
"version, set `use_cache=False` or close the current session with\n",
79+
"Session.close() or bigframes.pandas.close_session().\n",
80+
" return method(*args, **kwargs)\n"
81+
]
82+
},
83+
{
84+
"data": {
85+
"text/html": [
86+
"\n",
87+
" Query processed 62.6 kB in 27 seconds of slot time. [<a target=\"_blank\" href=\"https://console.cloud.google.com/bigquery?project=bigframes-dev&j=bq:US:71a54816-1dba-4943-9537-9f8322fd4a11&page=queryresults\">Job bigframes-dev:US.71a54816-1dba-4943-9537-9f8322fd4a11 details</a>]\n",
88+
" "
89+
],
90+
"text/plain": [
91+
"<IPython.core.display.HTML object>"
92+
]
93+
},
94+
"metadata": {},
95+
"output_type": "display_data"
96+
}
97+
],
98+
"source": [
99+
"df1 = bpd.read_gbq(\"bigquery-public-data.bitcoin_blockchain.blockss\")[[\"block_id\"]]\n",
100+
"r1 = df1.assign(result_val=df1[\"block_id\"].apply(classic_func)).to_gbq()\n"
101+
]
102+
},
103+
{
104+
"cell_type": "code",
105+
"execution_count": null,
106+
"id": "54fb7a54",
107+
"metadata": {},
108+
"outputs": [
109+
{
110+
"name": "stderr",
111+
"output_type": "stream",
112+
"text": [
113+
"/usr/local/google/home/tbergeron/src/bigframes/bigframes/core/logging/log_adapter.py:183: TimeTravelCacheWarning: Reading cached table from 2026-02-24 19:15:25.384225+00:00 to avoid\n",
114+
"incompatibilies with previous reads of this table. To read the latest\n",
115+
"version, set `use_cache=False` or close the current session with\n",
116+
"Session.close() or bigframes.pandas.close_session().\n",
117+
" return method(*args, **kwargs)\n"
118+
]
119+
},
120+
{
121+
"data": {
122+
"text/html": [
123+
"\n",
124+
" Query processed 62.6 kB in 25 seconds of slot time. [<a target=\"_blank\" href=\"https://console.cloud.google.com/bigquery?project=bigframes-dev&j=bq:US:cd16b23e-a47a-4c16-9cfe-0ba230a324dd&page=queryresults\">Job bigframes-dev:US.cd16b23e-a47a-4c16-9cfe-0ba230a324dd details</a>]\n",
125+
" "
126+
],
127+
"text/plain": [
128+
"<IPython.core.display.HTML object>"
129+
]
130+
},
131+
"metadata": {},
132+
"output_type": "display_data"
133+
}
134+
],
135+
"source": [
136+
"df2 = bpd.read_gbq(\"bigquery-public-data.bitcoin_blockchain.blocks\")[[\"block_id\"]]\n",
137+
"r2 = df2.assign(result_val=df2[\"block_id\"].apply(concurrent_func)).to_gbq()\n"
138+
]
139+
}
140+
],
141+
"metadata": {
142+
"kernelspec": {
143+
"display_name": "venv (3.12.6)",
144+
"language": "python",
145+
"name": "python3"
146+
},
147+
"language_info": {
148+
"codemirror_mode": {
149+
"name": "ipython",
150+
"version": 3
151+
},
152+
"file_extension": ".py",
153+
"mimetype": "text/x-python",
154+
"name": "python",
155+
"nbconvert_exporter": "python",
156+
"pygments_lexer": "ipython3",
157+
"version": "3.12.6"
158+
}
159+
},
160+
"nbformat": 4,
161+
"nbformat_minor": 5
162+
}

0 commit comments

Comments
 (0)