Skip to content
This repository was archived by the owner on May 7, 2026. It is now read-only.

Commit cfc1869

Browse files
feat: Initial support for biglake iceberg tables
1 parent 6fef9be commit cfc1869

18 files changed

Lines changed: 516 additions & 241 deletions

File tree

bigframes/core/array_value.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
import datetime
1818
import functools
1919
import typing
20-
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
20+
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple, Union
2121

22-
import google.cloud.bigquery
2322
import pandas
2423
import pyarrow as pa
2524

@@ -91,7 +90,7 @@ def from_range(cls, start, end, step):
9190
@classmethod
9291
def from_table(
9392
cls,
94-
table: google.cloud.bigquery.Table,
93+
table: Union[bq_data.BiglakeIcebergTable, bq_data.GbqNativeTable],
9594
session: Session,
9695
*,
9796
columns: Optional[Sequence[str]] = None,
@@ -103,8 +102,6 @@ def from_table(
103102
):
104103
if offsets_col and primary_key:
105104
raise ValueError("must set at most one of 'offests', 'primary_key'")
106-
# define data source only for needed columns, this makes row-hashing cheaper
107-
table_def = bq_data.GbqTable.from_table(table, columns=columns or ())
108105

109106
# create ordering from info
110107
ordering = None
@@ -115,7 +112,9 @@ def from_table(
115112
[ids.ColumnId(key_part) for key_part in primary_key]
116113
)
117114

118-
bf_schema = schemata.ArraySchema.from_bq_table(table, columns=columns)
115+
bf_schema = schemata.ArraySchema.from_bq_schema(
116+
table.physical_schema, columns=columns
117+
)
119118
# Scan all columns by default, we define this list as it can be pruned while preserving source_def
120119
scan_list = nodes.ScanList(
121120
tuple(
@@ -124,7 +123,7 @@ def from_table(
124123
)
125124
)
126125
source_def = bq_data.BigqueryDataSource(
127-
table=table_def,
126+
table=table,
128127
schema=bf_schema,
129128
at_time=at_time,
130129
sql_predicate=predicate,

bigframes/core/bq_data.py

Lines changed: 91 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import queue
2323
import threading
2424
import typing
25-
from typing import Any, Iterator, Optional, Sequence, Tuple
25+
from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union
2626

2727
from google.cloud import bigquery_storage_v1
2828
import google.cloud.bigquery as bq
@@ -37,23 +37,48 @@
3737
import bigframes.core.ordering as orderings
3838

3939

40+
# What is the line between metadata and core fields? Mostly metadata fields are optional or unreliable, but it's fuzzy
4041
@dataclasses.dataclass(frozen=True)
41-
class GbqTable:
42+
class TableMetadata:
43+
# this size metadata might be stale, don't use where strict correctness is needed
44+
numBytes: Optional[int] = None
45+
numRows: Optional[int] = None
46+
location: Optional[str] = None
47+
type: Optional[str] = None
48+
created_time: Optional[datetime.datetime] = None
49+
modified_time: Optional[datetime.datetime] = None
50+
51+
52+
@dataclasses.dataclass(frozen=True)
53+
class GbqNativeTable:
4254
project_id: str = dataclasses.field()
4355
dataset_id: str = dataclasses.field()
4456
table_id: str = dataclasses.field()
4557
physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
4658
is_physically_stored: bool = dataclasses.field()
47-
cluster_cols: typing.Optional[Tuple[str, ...]]
59+
partition_col: Optional[str] = None
60+
cluster_cols: typing.Optional[Tuple[str, ...]] = None
61+
primary_key: Optional[Tuple[str, ...]] = None
62+
metadata: TableMetadata = TableMetadata()
4863

4964
@staticmethod
50-
def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
65+
def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqNativeTable:
5166
# Subsetting fields with columns can reduce cost of row-hash default ordering
5267
if columns:
5368
schema = tuple(item for item in table.schema if item.name in columns)
5469
else:
5570
schema = tuple(table.schema)
56-
return GbqTable(
71+
72+
metadata = TableMetadata(
73+
numBytes=table.num_bytes,
74+
numRows=table.num_rows,
75+
location=table.location, # type: ignore
76+
type=table.table_type, # type: ignore
77+
created_time=table.created,
78+
modified_time=table.modified,
79+
)
80+
81+
return GbqNativeTable(
5782
project_id=table.project,
5883
dataset_id=table.dataset_id,
5984
table_id=table.table_id,
@@ -62,15 +87,17 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
6287
cluster_cols=None
6388
if table.clustering_fields is None
6489
else tuple(table.clustering_fields),
90+
primary_key=tuple(_get_primary_keys(table)),
91+
metadata=metadata,
6592
)
6693

6794
@staticmethod
6895
def from_ref_and_schema(
6996
table_ref: bq.TableReference,
7097
schema: Sequence[bq.SchemaField],
7198
cluster_cols: Optional[Sequence[str]] = None,
72-
) -> GbqTable:
73-
return GbqTable(
99+
) -> GbqNativeTable:
100+
return GbqNativeTable(
74101
project_id=table_ref.project,
75102
dataset_id=table_ref.dataset_id,
76103
table_id=table_ref.table_id,
@@ -84,12 +111,48 @@ def get_table_ref(self) -> bq.TableReference:
84111
bq.DatasetReference(self.project_id, self.dataset_id), self.table_id
85112
)
86113

114+
def get_full_id(self, quoted: bool = False) -> str:
115+
if quoted:
116+
return f"`{self.project_id}`.`{self.dataset_id}`.`{self.table_id}`"
117+
return f"{self.project_id}.{self.dataset_id}.{self.table_id}"
118+
87119
@property
88120
@functools.cache
89121
def schema_by_id(self):
90122
return {col.name: col for col in self.physical_schema}
91123

92124

125+
@dataclasses.dataclass(frozen=True)
126+
class BiglakeIcebergTable:
127+
project_id: str = dataclasses.field()
128+
catalog_id: str = dataclasses.field()
129+
namespace_id: str = dataclasses.field()
130+
table_id: str = dataclasses.field()
131+
physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
132+
cluster_cols: typing.Optional[Tuple[str, ...]]
133+
metadata: TableMetadata
134+
135+
def get_full_id(self, quoted: bool = False) -> str:
136+
if quoted:
137+
return f"`{self.project_id}`.`{self.catalog_id}`.`{self.namespace_id}`.`{self.table_id}`"
138+
return (
139+
f"{self.project_id}.{self.catalog_id}.{self.namespace_id}.{self.table_id}"
140+
)
141+
142+
@property
143+
@functools.cache
144+
def schema_by_id(self):
145+
return {col.name: col for col in self.physical_schema}
146+
147+
@property
148+
def partition_col(self) -> Optional[str]:
149+
return None
150+
151+
@property
152+
def primary_key(self) -> Optional[Tuple[str, ...]]:
153+
return None
154+
155+
93156
@dataclasses.dataclass(frozen=True)
94157
class BigqueryDataSource:
95158
"""
@@ -104,7 +167,7 @@ def __post_init__(self):
104167
self.schema.names
105168
)
106169

107-
table: GbqTable
170+
table: Union[GbqNativeTable, BiglakeIcebergTable]
108171
schema: bigframes.core.schema.ArraySchema
109172
at_time: typing.Optional[datetime.datetime] = None
110173
# Added for backwards compatibility, not validated
@@ -188,6 +251,8 @@ def get_arrow_batches(
188251
project_id: str,
189252
sample_rate: Optional[float] = None,
190253
) -> ReadResult:
254+
assert isinstance(data.table, GbqNativeTable)
255+
191256
table_mod_options = {}
192257
read_options_dict: dict[str, Any] = {"selected_fields": list(columns)}
193258

@@ -245,3 +310,21 @@ def process_batch(pa_batch):
245310
return ReadResult(
246311
batches, session.estimated_row_count, session.estimated_total_bytes_scanned
247312
)
313+
314+
315+
def _get_primary_keys(
316+
table: bq.Table,
317+
) -> List[str]:
318+
"""Get primary keys from table if they are set."""
319+
320+
primary_keys: List[str] = []
321+
if (
322+
(table_constraints := getattr(table, "table_constraints", None)) is not None
323+
and (primary_key := table_constraints.primary_key) is not None
324+
# This will be False for either None or empty list.
325+
# We want primary_keys = None if no primary keys are set.
326+
and (columns := primary_key.columns)
327+
):
328+
primary_keys = columns if columns is not None else []
329+
330+
return primary_keys

bigframes/core/compile/ibis_compiler/ibis_compiler.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,7 @@ def _table_to_ibis(
207207
source: bq_data.BigqueryDataSource,
208208
scan_cols: typing.Sequence[str],
209209
) -> ibis_types.Table:
210-
full_table_name = (
211-
f"{source.table.project_id}.{source.table.dataset_id}.{source.table.table_id}"
212-
)
210+
full_table_name = source.table.get_full_id(quoted=False)
213211
# Physical schema might include unused columns, unsupported datatypes like JSON
214212
physical_schema = ibis_bigquery.BigQuerySchema.to_ibis(
215213
list(source.table.physical_schema)

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
from bigframes.core import (
2323
agg_expressions,
24+
bq_data,
2425
expression,
2526
guid,
2627
identifiers,
@@ -173,10 +174,21 @@ def compile_readlocal(node: nodes.ReadLocalNode, child: ir.SQLGlotIR) -> ir.SQLG
173174
@_compile_node.register
174175
def compile_readtable(node: nodes.ReadTableNode, child: ir.SQLGlotIR):
175176
table = node.source.table
177+
if isinstance(table, bq_data.GbqNativeTable):
178+
project, dataset, table_id = table.project_id, table.dataset_id, table.table_id
179+
elif isinstance(table, bq_data.BiglakeIcebergTable):
180+
project, dataset, table_id = (
181+
table.project_id,
182+
table.catalog_id,
183+
f"{table.namespace_id}.{table.table_id}",
184+
)
185+
186+
else:
187+
raise ValueError(f"Unrecognized table type: {table}")
176188
return ir.SQLGlotIR.from_table(
177-
table.project_id,
178-
table.dataset_id,
179-
table.table_id,
189+
project,
190+
dataset,
191+
table_id,
180192
col_names=[col.source_id for col in node.scan_list.items],
181193
alias_names=[col.id.sql for col in node.scan_list.items],
182194
uid_gen=child.uid_gen,

bigframes/core/nodes.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -825,9 +825,7 @@ def variables_introduced(self) -> int:
825825

826826
@property
827827
def row_count(self) -> typing.Optional[int]:
828-
if self.source.sql_predicate is None and self.source.table.is_physically_stored:
829-
return self.source.n_rows
830-
return None
828+
return self.source.n_rows
831829

832830
@property
833831
def node_defined_ids(self) -> Tuple[identifiers.ColumnId, ...]:

bigframes/core/schema.py

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from dataclasses import dataclass
1818
import functools
1919
import typing
20-
from typing import Dict, List, Optional, Sequence
20+
from typing import Dict, Optional, Sequence
2121

2222
import google.cloud.bigquery
2323
import pyarrow
@@ -40,31 +40,16 @@ class ArraySchema:
4040
def __iter__(self):
4141
yield from self.items
4242

43-
@classmethod
44-
def from_bq_table(
45-
cls,
46-
table: google.cloud.bigquery.Table,
47-
column_type_overrides: Optional[
48-
typing.Dict[str, bigframes.dtypes.Dtype]
49-
] = None,
50-
columns: Optional[Sequence[str]] = None,
51-
):
52-
if not columns:
53-
fields = table.schema
54-
else:
55-
lookup = {field.name: field for field in table.schema}
56-
fields = [lookup[col] for col in columns]
57-
58-
return ArraySchema.from_bq_schema(
59-
fields, column_type_overrides=column_type_overrides
60-
)
61-
6243
@classmethod
6344
def from_bq_schema(
6445
cls,
65-
schema: List[google.cloud.bigquery.SchemaField],
46+
schema: Sequence[google.cloud.bigquery.SchemaField],
6647
column_type_overrides: Optional[Dict[str, bigframes.dtypes.Dtype]] = None,
48+
columns: Optional[Sequence[str]] = None,
6749
):
50+
if columns:
51+
lookup = {field.name: field for field in schema}
52+
schema = [lookup[col] for col in columns]
6853
if column_type_overrides is None:
6954
column_type_overrides = {}
7055
items = tuple(

bigframes/core/sql/__init__.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import bigframes.core.compile.googlesql as googlesql
2929

3030
if TYPE_CHECKING:
31-
import google.cloud.bigquery as bigquery
3231

3332
import bigframes.core.ordering
3433

@@ -131,7 +130,7 @@ def infix_op(opname: str, left_arg: str, right_arg: str):
131130
return f"{left_arg} {opname} {right_arg}"
132131

133132

134-
def is_distinct_sql(columns: Iterable[str], table_ref: bigquery.TableReference) -> str:
133+
def is_distinct_sql(columns: Iterable[str], table_ref) -> str:
135134
is_unique_sql = f"""WITH full_table AS (
136135
{googlesql.Select().from_(table_ref).select(columns).sql()}
137136
),

bigframes/dtypes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -800,7 +800,7 @@ def convert_to_schema_field(
800800
name, inner_field.field_type, mode="REPEATED", fields=inner_field.fields
801801
)
802802
if pa.types.is_struct(bigframes_dtype.pyarrow_dtype):
803-
inner_fields: list[pa.Field] = []
803+
inner_fields: list[google.cloud.bigquery.SchemaField] = []
804804
struct_type = typing.cast(pa.StructType, bigframes_dtype.pyarrow_dtype)
805805
for i in range(struct_type.num_fields):
806806
field = struct_type.field(i)
@@ -823,7 +823,7 @@ def convert_to_schema_field(
823823

824824

825825
def bf_type_from_type_kind(
826-
bq_schema: list[google.cloud.bigquery.SchemaField],
826+
bq_schema: Sequence[google.cloud.bigquery.SchemaField],
827827
) -> typing.Dict[str, Dtype]:
828828
"""Converts bigquery sql type to the default bigframes dtype."""
829829
return {name: dtype for name, dtype in map(convert_schema_field, bq_schema)}

bigframes/operations/aggregations.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
205205
return dtypes.TIMEDELTA_DTYPE
206206

207207
if dtypes.is_numeric(input_types[0]):
208-
if pd.api.types.is_bool_dtype(input_types[0]):
208+
if pd.api.types.is_bool_dtype(input_types[0]): # type: ignore
209209
return dtypes.INT_DTYPE
210210
return input_types[0]
211211

@@ -224,7 +224,7 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
224224
# These will change if median is changed to exact implementation.
225225
if not dtypes.is_orderable(input_types[0]):
226226
raise TypeError(f"Type {input_types[0]} is not orderable")
227-
if pd.api.types.is_bool_dtype(input_types[0]):
227+
if pd.api.types.is_bool_dtype(input_types[0]): # type: ignore
228228
return dtypes.INT_DTYPE
229229
else:
230230
return input_types[0]

0 commit comments

Comments
 (0)