-
Notifications
You must be signed in to change notification settings - Fork 37
Add array data type support for Python #474
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6cd7777
26c0714
bff423e
5994ee2
d812d39
9071cf8
7dec9da
b16a9b0
6a0fdf2
5315521
c81de63
0925a40
87ab426
6fcc002
a2fdfb5
5210fe5
704bcae
29f43f1
4c3cda8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| import time | ||
|
|
||
| import pyarrow as pa | ||
| import pytest | ||
|
|
||
| import fluss | ||
|
|
||
|
|
@@ -744,8 +745,6 @@ def _poll_records(scanner, expected_count, timeout_s=10): | |
| return collected | ||
|
|
||
|
|
||
|
|
||
|
|
||
| def _poll_arrow_ids(scanner, expected_count, timeout_s=10): | ||
| """Poll a batch scanner and extract 'id' column values.""" | ||
| all_ids = [] | ||
|
|
@@ -755,3 +754,203 @@ def _poll_arrow_ids(scanner, expected_count, timeout_s=10): | |
| if arrow_table.num_rows > 0: | ||
| all_ids.extend(arrow_table.column("id").to_pylist()) | ||
| return all_ids | ||
|
|
||
|
|
||
| async def test_append_and_scan_with_array(connection, admin): | ||
| """Test appending and scanning with array columns.""" | ||
| table_path = fluss.TablePath("fluss", "py_test_append_and_scan_with_array") | ||
| await admin.drop_table(table_path, ignore_if_not_exists=True) | ||
|
|
||
| pa_schema = pa.schema( | ||
| [ | ||
| pa.field("id", pa.int32()), | ||
| pa.field("tags", pa.list_(pa.string())), | ||
| pa.field("scores", pa.list_(pa.int32())), | ||
| ] | ||
| ) | ||
| schema = fluss.Schema(pa_schema) | ||
| table_descriptor = fluss.TableDescriptor(schema) | ||
| await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) | ||
|
|
||
| table = await connection.get_table(table_path) | ||
| append_writer = table.new_append().create_writer() | ||
|
|
||
| # Batch 1: Testing standard lists | ||
| batch1 = pa.RecordBatch.from_arrays( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shall we test null values inside arrays as well?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @fresh-borzoni, added null tests here b16a9b0. |
||
| [ | ||
| pa.array([1, 2], type=pa.int32()), | ||
| pa.array([["a", "b"], ["c"]], type=pa.list_(pa.string())), | ||
| pa.array([[10, 20], [30]], type=pa.list_(pa.int32())), | ||
| ], | ||
| schema=pa_schema, | ||
| ) | ||
| append_writer.write_arrow_batch(batch1) | ||
|
|
||
| # Batch 2: Testing null values inside arrays and null arrays | ||
| batch2 = pa.RecordBatch.from_arrays( | ||
| [ | ||
| pa.array([3, 4, 5, 6], type=pa.int32()), | ||
| pa.array([["d", None], None, [], [None]], type=pa.list_(pa.string())), | ||
| pa.array([[40, 50], [60], None, []], type=pa.list_(pa.int32())), | ||
| ], | ||
| schema=pa_schema, | ||
| ) | ||
| append_writer.write_arrow_batch(batch2) | ||
| await append_writer.flush() | ||
|
|
||
| # Verify via LogScanner (record-by-record) | ||
| scanner = await table.new_scan().create_log_scanner() | ||
| scanner.subscribe_buckets({0: fluss.EARLIEST_OFFSET}) | ||
| records = _poll_records(scanner, expected_count=6) | ||
|
|
||
| assert len(records) == 6 | ||
| records.sort(key=lambda r: r.row["id"]) | ||
|
|
||
| # Verify Batch 1 | ||
| assert records[0].row["tags"] == ["a", "b"] | ||
| assert records[0].row["scores"] == [10, 20] | ||
| assert records[1].row["tags"] == ["c"] | ||
| assert records[1].row["scores"] == [30] | ||
|
|
||
| # Verify Batch 2 | ||
| assert records[2].row["tags"] == ["d", None] | ||
| assert records[2].row["scores"] == [40, 50] | ||
| assert records[3].row["tags"] is None | ||
| assert records[3].row["scores"] == [60] | ||
| assert records[4].row["tags"] == [] | ||
| assert records[4].row["scores"] is None | ||
| assert records[5].row["tags"] == [None] | ||
| assert records[5].row["scores"] == [] | ||
|
|
||
| # Verify via to_arrow (batch-based) | ||
| scanner2 = await table.new_scan().create_record_batch_log_scanner() | ||
| scanner2.subscribe_buckets({0: fluss.EARLIEST_OFFSET}) | ||
| result_table = scanner2.to_arrow() | ||
|
|
||
| assert result_table.num_rows == 6 | ||
| assert result_table.column("tags").to_pylist() == [ | ||
| ["a", "b"], | ||
| ["c"], | ||
| ["d", None], | ||
| None, | ||
| [], | ||
| [None], | ||
| ] | ||
| assert result_table.column("scores").to_pylist() == [ | ||
| [10, 20], | ||
| [30], | ||
| [40, 50], | ||
| [60], | ||
| None, | ||
| [], | ||
| ] | ||
|
|
||
|
|
||
|
|
||
|
|
||
| async def test_append_rows_with_array(connection, admin): | ||
| """Test appending rows with array data as Python lists and scanning.""" | ||
| table_path = fluss.TablePath("fluss", "py_test_append_rows_with_array") | ||
| await admin.drop_table(table_path, ignore_if_not_exists=True) | ||
|
|
||
| pa_schema = pa.schema( | ||
| [ | ||
| pa.field("id", pa.int32()), | ||
| pa.field("tags", pa.list_(pa.string())), | ||
| pa.field("scores", pa.list_(pa.int32())), | ||
| ] | ||
| ) | ||
| schema = fluss.Schema(pa_schema) | ||
| table_descriptor = fluss.TableDescriptor(schema) | ||
| await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) | ||
|
|
||
| table = await connection.get_table(table_path) | ||
| append_writer = table.new_append().create_writer() | ||
|
|
||
| # Append rows using dicts with lists | ||
| append_writer.append({"id": 1, "tags": ["a", "b"], "scores": [10, 20]}) | ||
| append_writer.append({"id": 2, "tags": ["c"], "scores": [30]}) | ||
| # Append row using list with nested list (null handling) | ||
| append_writer.append([3, None, [40, None, 60]]) | ||
|
|
||
| await append_writer.flush() | ||
|
|
||
| scanner = await table.new_scan().create_log_scanner() | ||
| num_buckets = (await admin.get_table_info(table_path)).num_buckets | ||
| scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) | ||
|
|
||
| records = _poll_records(scanner, expected_count=3) | ||
| assert len(records) == 3 | ||
|
|
||
| rows = sorted([r.row for r in records], key=lambda r: r["id"]) | ||
| assert rows[0] == {"id": 1, "tags": ["a", "b"], "scores": [10, 20]} | ||
| assert rows[1] == {"id": 2, "tags": ["c"], "scores": [30]} | ||
| # Note: records[2].row["tags"] will be None, records[2].row["scores"] will be [40, None, 60] | ||
| assert rows[2]["id"] == 3 | ||
| assert rows[2]["tags"] is None | ||
| assert rows[2]["scores"] == [40, None, 60] | ||
|
|
||
| await admin.drop_table(table_path, ignore_if_not_exists=False) | ||
|
|
||
|
|
||
| async def test_append_rows_with_nested_array(connection, admin): | ||
| """Test appending rows with nested array data (ARRAY<ARRAY<INT>>) and scanning.""" | ||
| table_path = fluss.TablePath("fluss", "py_test_append_rows_with_nested_array") | ||
| await admin.drop_table(table_path, ignore_if_not_exists=True) | ||
|
|
||
| pa_schema = pa.schema([ | ||
| pa.field("id", pa.int32()), | ||
| pa.field("matrix", pa.list_(pa.list_(pa.int32()))), | ||
| ]) | ||
| schema = fluss.Schema(pa_schema) | ||
| await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=False) | ||
|
|
||
| table = await connection.get_table(table_path) | ||
| append_writer = table.new_append().create_writer() | ||
|
|
||
| # Append nested lists | ||
| append_writer.append({"id": 1, "matrix": [[1, 2], [3, 4]]}) | ||
| append_writer.append({"id": 2, "matrix": [[], [5], [6, 7, 8]]}) | ||
| append_writer.append({"id": 3, "matrix": None}) | ||
| append_writer.append({"id": 4, "matrix": [[1, None], None, []]}) | ||
| append_writer.append({"id": 5, "matrix": [[None, None]]}) | ||
|
|
||
| await append_writer.flush() | ||
|
|
||
| scanner = await table.new_scan().create_log_scanner() | ||
| num_buckets = (await admin.get_table_info(table_path)).num_buckets | ||
| scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) | ||
|
|
||
| records = _poll_records(scanner, expected_count=5) | ||
| assert len(records) == 5 | ||
|
|
||
| rows = sorted([r.row for r in records], key=lambda r: r["id"]) | ||
| assert rows[0] == {"id": 1, "matrix": [[1, 2], [3, 4]]} | ||
| assert rows[1] == {"id": 2, "matrix": [[], [5], [6, 7, 8]]} | ||
| assert rows[2] == {"id": 3, "matrix": None} | ||
| assert rows[3] == {"id": 4, "matrix": [[1, None], None, []]} | ||
| assert rows[4] == {"id": 5, "matrix": [[None, None]]} | ||
|
|
||
| await admin.drop_table(table_path, ignore_if_not_exists=False) | ||
|
|
||
|
|
||
| async def test_append_rows_with_invalid_array(connection, admin): | ||
| """Test that appending invalid data to an array column raises an error.""" | ||
| table_path = fluss.TablePath("fluss", "py_test_append_rows_with_invalid_array") | ||
| await admin.drop_table(table_path, ignore_if_not_exists=True) | ||
|
|
||
| pa_schema = pa.schema([ | ||
| pa.field("id", pa.int32()), | ||
| pa.field("tags", pa.list_(pa.string())), | ||
| ]) | ||
| schema = fluss.Schema(pa_schema) | ||
| await admin.create_table(table_path, fluss.TableDescriptor(schema), ignore_if_exists=False) | ||
|
|
||
| table = await connection.get_table(table_path) | ||
| append_writer = table.new_append().create_writer() | ||
|
|
||
| # Appending a string instead of a list should raise an error | ||
| with pytest.raises(Exception, match="Expected sequence for Array column"): | ||
| append_writer.append({"id": 4, "tags": "not_a_list"}) | ||
|
|
||
| await admin.drop_table(table_path, ignore_if_not_exists=False) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,3 +35,16 @@ def test_get_primary_keys(): | |
| assert schema_without_pk.get_primary_keys() == [] | ||
|
|
||
|
|
||
| def test_schema_with_array(): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we need to update the documentation as well for Array data type support?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hi @leekeiabstraction, documentation updated in 5210fe5 for PyArrow list, large list, and fixed size list data types. |
||
| # Test that a schema can be constructed from a pyarrow schema containing a list | ||
| fields = pa.schema( | ||
| [ | ||
| pa.field("id", pa.int32()), | ||
| pa.field("tags", pa.list_(pa.string())), | ||
| ] | ||
| ) | ||
| schema = fluss.Schema(fields) | ||
| assert schema.get_column_names() == ["id", "tags"] | ||
| assert schema.get_column_types() == ["int", "array<string>"] | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about python_value_to_datum?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @fresh-borzoni, thanks for catching this — added functionality for
`python_value_to_datum` in 7dec9da, along with a set of tests.